struct Reader
Fields
in: *std.Io.Reader

interface: std.Io.Reader
This is preallocated memory that might be used by bodyReader. That
function might return a pointer to this field, or a different
*std.Io.Reader. It is advisable not to access this field directly.

state: State
Keeps track of whether the stream is ready to accept a new request,
making invalid API usage cause assertion failures rather than HTTP
protocol violations.

trailers: []const u8 = &.{}
HTTP trailer bytes. These are at the end of a transfer-encoding:
chunked message. This data is available only after calling one of the
"end" functions and points to data inside the buffer of in, and is
therefore invalidated on the next call to receiveHead, or any other
read from in. (See the example before the Source section.)

body_err: ?BodyError = null

max_head_len: usize
Members
- BodyError (Error Set)
- bodyReader (Function)
- bodyReaderDecompressing (Function)
- HeadError (Error Set)
- receiveHead (Function)
- RemainingChunkLen (enum)
- State (union)
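
Example

A minimal usage sketch, not part of this API's documentation. It assumes
reader was initialized by its owning connection with in, state = .ready, and
max_head_len set, that the peer sent "transfer-encoding: chunked", and that
sink is a *std.Io.Writer to drain the body into. Head parsing is elided.

const std = @import("std");

fn readChunkedBody(reader: *std.http.Reader, sink: *std.Io.Writer) !void {
    var transfer_buffer: [64]u8 = undefined;

    // The returned head aliases the buffer of `in` and is invalidated by
    // any subsequent read from the stream.
    const head = try reader.receiveHead();
    _ = head; // Parse the head here to learn the actual framing.

    // Assumption: chunked framing was negotiated. For identity framing,
    // pass `.none` plus the parsed content-length instead.
    const body = reader.bodyReader(&transfer_buffer, .chunked, null);

    // Drain the body. `stream` returns error.EndOfStream once the body,
    // including the terminating zero-length chunk, has been consumed.
    while (true) {
        _ = body.stream(sink, .unlimited) catch |err| switch (err) {
            error.EndOfStream => break,
            else => |e| return e,
        };
    }

    // Only now does `trailers` hold any trailer bytes; the slice aliases
    // the buffer of `in` until the next call to `receiveHead`.
    std.debug.print("trailers: {s}\n", .{reader.trailers});
}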
Source
pub const Reader = struct {
in: *std.Io.Reader,
/// This is preallocated memory that might be used by `bodyReader`. That
/// function might return a pointer to this field, or a different
/// `*std.Io.Reader`. It is advisable not to access this field directly.
interface: std.Io.Reader,
/// Keeps track of whether the stream is ready to accept a new request,
/// making invalid API usage cause assertion failures rather than HTTP
/// protocol violations.
state: State,
/// HTTP trailer bytes. These are at the end of a transfer-encoding:
/// chunked message. This data is available only after calling one of the
/// "end" functions and points to data inside the buffer of `in`, and is
/// therefore invalidated on the next call to `receiveHead`, or any other
/// read from `in`.
trailers: []const u8 = &.{},
body_err: ?BodyError = null,
max_head_len: usize,
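/// Tracks how much of the current chunk remains to be consumed, counting
/// the chunk's trailing "\r\n": `head` means the next bytes are a chunk
/// header, `n` means only the '\n' of the previous chunk's CRLF remains,
/// `rn` means the full CRLF remains, and any larger value is the number of
/// remaining data bytes plus 2 for the CRLF.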
pub const RemainingChunkLen = enum(u64) {
head = 0,
n = 1,
rn = 2,
_,
pub fn init(integer: u64) RemainingChunkLen {
return @enumFromInt(integer);
}
pub fn int(rcl: RemainingChunkLen) u64 {
return @intFromEnum(rcl);
}
};
pub const State = union(enum) {
/// The stream is available to be used for the first time, or reused.
ready,
/// A head has been received; the body has not yet been consumed.
received_head,
/// The stream goes until the connection is closed.
body_none,
/// Number of body bytes remaining, for content-length framing.
body_remaining_content_length: u64,
/// Progress through the current chunk, for chunked framing.
body_remaining_chunk_len: RemainingChunkLen,
/// The stream would be eligible for another HTTP request, however the
/// client and server did not negotiate a persistent connection.
closing,
};
pub const BodyError = error{
HttpChunkInvalid,
HttpChunkTruncated,
HttpHeadersOversize,
};
pub const HeadError = error{
/// Too many bytes of HTTP headers.
///
/// The HTTP specification suggests responding with a 431 status code
/// before closing the connection.
HttpHeadersOversize,
/// Partial HTTP request was received but the connection was closed
/// before fully receiving the headers.
HttpRequestTruncated,
/// The client sent 0 bytes of headers before closing the stream. This
/// happens when a keep-alive connection is finally closed.
HttpConnectionClosing,
/// Transitive error occurred reading from `in`.
ReadFailed,
};
/// Buffers the entire head inside `in`.
///
/// The resulting memory is invalidated by any subsequent consumption of
/// the input stream.
pub fn receiveHead(reader: *Reader) HeadError![]const u8 {
reader.trailers = &.{};
const in = reader.in;
const max_head_len = reader.max_head_len;
var hp: HeadParser = .{};
var head_len: usize = 0;
while (true) {
if (head_len >= max_head_len) return error.HttpHeadersOversize;
const remaining = in.buffered()[head_len..];
if (remaining.len == 0) {
in.fillMore() catch |err| switch (err) {
error.EndOfStream => switch (head_len) {
0 => return error.HttpConnectionClosing,
else => return error.HttpRequestTruncated,
},
error.ReadFailed => return error.ReadFailed,
};
continue;
}
head_len += hp.feed(remaining);
if (hp.state == .finished) {
reader.state = .received_head;
const head_buffer = in.buffered()[0..head_len];
in.toss(head_len);
return head_buffer;
}
}
}
/// If a compressed body has been negotiated, this returns compressed bytes.
///
/// Asserts it is called at most once, and only after `receiveHead`.
///
/// See also:
/// * `bodyReaderDecompressing`
pub fn bodyReader(
reader: *Reader,
transfer_buffer: []u8,
transfer_encoding: TransferEncoding,
content_length: ?u64,
) *std.Io.Reader {
assert(reader.state == .received_head);
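// Install framing-specific stream/discard implementations on
// `reader.interface`, except when there is no framing at all, in which
// case the connection's reader can be returned directly.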
switch (transfer_encoding) {
.chunked => {
reader.state = .{ .body_remaining_chunk_len = .head };
reader.interface = .{
.buffer = transfer_buffer,
.seek = 0,
.end = 0,
.vtable = &.{
.stream = chunkedStream,
.discard = chunkedDiscard,
},
};
return &reader.interface;
},
.none => {
if (content_length) |len| {
reader.state = .{ .body_remaining_content_length = len };
reader.interface = .{
.buffer = transfer_buffer,
.seek = 0,
.end = 0,
.vtable = &.{
.stream = contentLengthStream,
.discard = contentLengthDiscard,
},
};
return &reader.interface;
} else {
reader.state = .body_none;
return reader.in;
}
},
}
}
/// If a compressed body has been negotiated, this returns decompressed bytes.
///
/// Asserts it is called at most once, and only after `receiveHead`.
///
/// See also:
/// * `bodyReader`
pub fn bodyReaderDecompressing(
reader: *Reader,
transfer_buffer: []u8,
transfer_encoding: TransferEncoding,
content_length: ?u64,
content_encoding: ContentEncoding,
decompress: *Decompress,
decompress_buffer: []u8,
) *std.Io.Reader {
if (transfer_encoding == .none and content_length == null) {
assert(reader.state == .received_head);
reader.state = .body_none;
switch (content_encoding) {
.identity => {
return reader.in;
},
.deflate => {
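// The HTTP "deflate" coding is zlib-wrapped DEFLATE data, hence `.zlib`.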
decompress.* = .{ .flate = .init(reader.in, .zlib, decompress_buffer) };
return &decompress.flate.reader;
},
.gzip => {
decompress.* = .{ .flate = .init(reader.in, .gzip, decompress_buffer) };
return &decompress.flate.reader;
},
.zstd => {
decompress.* = .{ .zstd = .init(reader.in, decompress_buffer, .{ .verify_checksum = false }) };
return &decompress.zstd.reader;
},
.compress => unreachable,
}
}
const transfer_reader = bodyReader(reader, transfer_buffer, transfer_encoding, content_length);
return decompress.init(transfer_reader, decompress_buffer, content_encoding);
}
fn contentLengthStream(
io_r: *std.Io.Reader,
w: *Writer,
limit: std.Io.Limit,
) std.Io.Reader.StreamError!usize {
const reader: *Reader = @alignCast(@fieldParentPtr("interface", io_r));
const remaining_content_length = &reader.state.body_remaining_content_length;
const remaining = remaining_content_length.*;
if (remaining == 0) {
reader.state = .ready;
return error.EndOfStream;
}
const n = try reader.in.stream(w, limit.min(.limited64(remaining)));
remaining_content_length.* = remaining - n;
return n;
}
fn contentLengthDiscard(io_r: *std.Io.Reader, limit: std.Io.Limit) std.Io.Reader.Error!usize {
const reader: *Reader = @alignCast(@fieldParentPtr("interface", io_r));
const remaining_content_length = &reader.state.body_remaining_content_length;
const remaining = remaining_content_length.*;
if (remaining == 0) {
reader.state = .ready;
return error.EndOfStream;
}
const n = try reader.in.discard(limit.min(.limited64(remaining)));
remaining_content_length.* = remaining - n;
return n;
}
fn chunkedStream(io_r: *std.Io.Reader, w: *Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize {
const reader: *Reader = @alignCast(@fieldParentPtr("interface", io_r));
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
return chunkedReadEndless(reader, w, limit, chunk_len_ptr) catch |err| switch (err) {
error.ReadFailed => return error.ReadFailed,
error.WriteFailed => return error.WriteFailed,
error.EndOfStream => {
reader.body_err = error.HttpChunkTruncated;
return error.ReadFailed;
},
else => |e| {
reader.body_err = e;
return error.ReadFailed;
},
};
}
fn chunkedReadEndless(
reader: *Reader,
w: *Writer,
limit: std.Io.Limit,
chunk_len_ptr: *RemainingChunkLen,
) (BodyError || std.Io.Reader.StreamError)!usize {
const in = reader.in;
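// Labeled switch over the chunk state: `continue :len .head` re-enters
// the `.head` case once a chunk's CRLF terminator has been consumed.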
len: switch (chunk_len_ptr.*) {
.head => {
var cp: ChunkParser = .init;
while (true) {
const i = cp.feed(in.buffered());
switch (cp.state) {
.invalid => return error.HttpChunkInvalid,
.data => {
in.toss(i);
break;
},
else => {
in.toss(i);
try in.fillMore();
continue;
},
}
}
if (cp.chunk_len == 0) return parseTrailers(reader, 0);
const n = try in.stream(w, limit.min(.limited64(cp.chunk_len)));
chunk_len_ptr.* = .init(cp.chunk_len + 2 - n);
return n;
},
.n => {
if ((try in.peekByte()) != '\n') return error.HttpChunkInvalid;
in.toss(1);
continue :len .head;
},
.rn => {
const rn = try in.peekArray(2);
if (rn[0] != '\r' or rn[1] != '\n') return error.HttpChunkInvalid;
in.toss(2);
continue :len .head;
},
else => |remaining_chunk_len| {
const n = try in.stream(w, limit.min(.limited64(remaining_chunk_len.int() - 2)));
chunk_len_ptr.* = .init(remaining_chunk_len.int() - n);
return n;
},
}
}
fn chunkedDiscard(io_r: *std.Io.Reader, limit: std.Io.Limit) std.Io.Reader.Error!usize {
const reader: *Reader = @alignCast(@fieldParentPtr("interface", io_r));
const chunk_len_ptr = switch (reader.state) {
.ready => return error.EndOfStream,
.body_remaining_chunk_len => |*x| x,
else => unreachable,
};
return chunkedDiscardEndless(reader, limit, chunk_len_ptr) catch |err| switch (err) {
error.ReadFailed => return error.ReadFailed,
error.EndOfStream => {
reader.body_err = error.HttpChunkTruncated;
return error.ReadFailed;
},
else => |e| {
reader.body_err = e;
return error.ReadFailed;
},
};
}
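/// Mirrors `chunkedReadEndless`, but discards bytes instead of streaming
/// them to a writer.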
fn chunkedDiscardEndless(
reader: *Reader,
limit: std.Io.Limit,
chunk_len_ptr: *RemainingChunkLen,
) (BodyError || std.Io.Reader.Error)!usize {
const in = reader.in;
len: switch (chunk_len_ptr.*) {
.head => {
var cp: ChunkParser = .init;
while (true) {
const i = cp.feed(in.buffered());
switch (cp.state) {
.invalid => return error.HttpChunkInvalid,
.data => {
in.toss(i);
break;
},
else => {
in.toss(i);
try in.fillMore();
continue;
},
}
}
if (cp.chunk_len == 0) return parseTrailers(reader, 0);
const n = try in.discard(limit.min(.limited64(cp.chunk_len)));
chunk_len_ptr.* = .init(cp.chunk_len + 2 - n);
return n;
},
.n => {
if ((try in.peekByte()) != '\n') return error.HttpChunkInvalid;
in.toss(1);
continue :len .head;
},
.rn => {
const rn = try in.peekArray(2);
if (rn[0] != '\r' or rn[1] != '\n') return error.HttpChunkInvalid;
in.toss(2);
continue :len .head;
},
else => |remaining_chunk_len| {
const n = try in.discard(limit.min(.limited64(remaining_chunk_len.int() - 2)));
chunk_len_ptr.* = .init(remaining_chunk_len.int() - n);
return n;
},
}
}
/// Called when the next bytes in the stream are trailers, or "\r\n" to
/// indicate the end of the chunked body.
fn parseTrailers(reader: *Reader, amt_read: usize) (BodyError || std.Io.Reader.Error)!usize {
const in = reader.in;
const rn = try in.peekArray(2);
if (rn[0] == '\r' and rn[1] == '\n') {
in.toss(2);
reader.state = .ready;
assert(reader.trailers.len == 0);
return amt_read;
}
var hp: HeadParser = .{ .state = .seen_rn };
var trailers_len: usize = 2;
while (true) {
if (in.buffer.len - trailers_len == 0) return error.HttpHeadersOversize;
const remaining = in.buffered()[trailers_len..];
if (remaining.len == 0) {
try in.fillMore();
continue;
}
trailers_len += hp.feed(remaining);
if (hp.state == .finished) {
reader.state = .ready;
reader.trailers = in.buffered()[0..trailers_len];
in.toss(trailers_len);
return amt_read;
}
}
}
};
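
A similar sketch for the decompressing variant, again not taken from this
API's documentation. The framing and coding values would come from parsing
the head; the buffer sizes here are illustrative assumptions, not
requirements stated by this API.

const std = @import("std");

fn readDecompressedBody(
    reader: *std.http.Reader,
    transfer_encoding: std.http.TransferEncoding,
    content_length: ?u64,
    content_encoding: std.http.ContentEncoding,
    sink: *std.Io.Writer,
) !void {
    var transfer_buffer: [64]u8 = undefined;
    var decompress: std.http.Decompress = undefined;
    var decompress_buffer: [64 * 1024]u8 = undefined;

    const body = reader.bodyReaderDecompressing(
        &transfer_buffer,
        transfer_encoding,
        content_length,
        content_encoding,
        &decompress,
        &decompress_buffer,
    );

    // `body` yields decompressed bytes regardless of the negotiated coding.
    while (true) {
        _ = body.stream(sink, .unlimited) catch |err| switch (err) {
            error.EndOfStream => break,
            else => |e| return e,
        };
    }
}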