struct Stream [src]
Fields
handle: HandleUnderlying platform-defined type which may or may not be
interchangeable with a file system file descriptor.
Members
Source
pub const Stream = struct {
/// Underlying platform-defined type which may or may not be
/// interchangeable with a file system file descriptor.
handle: Handle,
pub const Handle = switch (native_os) {
.windows => windows.ws2_32.SOCKET,
else => posix.fd_t,
};
pub fn close(s: Stream) void {
switch (native_os) {
.windows => windows.closesocket(s.handle) catch unreachable,
else => posix.close(s.handle),
}
}
pub const ReadError = posix.ReadError || error{
SocketNotBound,
MessageTooBig,
NetworkSubsystemFailed,
ConnectionResetByPeer,
SocketNotConnected,
};
pub const WriteError = posix.SendMsgError || error{
ConnectionResetByPeer,
SocketNotBound,
MessageTooBig,
NetworkSubsystemFailed,
SystemResources,
SocketNotConnected,
Unexpected,
};
pub const Reader = switch (native_os) {
.windows => struct {
/// Use `interface` for portable code.
interface_state: Io.Reader,
/// Use `getStream` for portable code.
net_stream: Stream,
/// Use `getError` for portable code.
error_state: ?Error,
pub const Error = ReadError;
pub fn getStream(r: *const Reader) Stream {
return r.net_stream;
}
pub fn getError(r: *const Reader) ?Error {
return r.error_state;
}
pub fn interface(r: *Reader) *Io.Reader {
return &r.interface_state;
}
pub fn init(net_stream: Stream, buffer: []u8) Reader {
return .{
.interface_state = .{
.vtable = &.{
.stream = stream,
.readVec = readVec,
},
.buffer = buffer,
.seek = 0,
.end = 0,
},
.net_stream = net_stream,
.error_state = null,
};
}
fn stream(io_r: *Io.Reader, io_w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize {
const dest = limit.slice(try io_w.writableSliceGreedy(1));
var bufs: [1][]u8 = .{dest};
const n = try readVec(io_r, &bufs);
io_w.advance(n);
return n;
}
fn readVec(io_r: *std.Io.Reader, data: [][]u8) Io.Reader.Error!usize {
const r: *Reader = @alignCast(@fieldParentPtr("interface_state", io_r));
var iovecs: [max_buffers_len]windows.ws2_32.WSABUF = undefined;
const bufs_n, const data_size = try io_r.writableVectorWsa(&iovecs, data);
const bufs = iovecs[0..bufs_n];
assert(bufs[0].len != 0);
const n = streamBufs(r, bufs) catch |err| {
r.error_state = err;
return error.ReadFailed;
};
if (n == 0) return error.EndOfStream;
if (n > data_size) {
io_r.end += n - data_size;
return data_size;
}
return n;
}
fn handleRecvError(winsock_error: windows.ws2_32.WinsockError) Error!void {
switch (winsock_error) {
.WSAECONNRESET => return error.ConnectionResetByPeer,
.WSAEFAULT => unreachable, // a pointer is not completely contained in user address space.
.WSAEINPROGRESS, .WSAEINTR => unreachable, // deprecated and removed in WSA 2.2
.WSAEINVAL => return error.SocketNotBound,
.WSAEMSGSIZE => return error.MessageTooBig,
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup must be called before this function
.WSA_IO_PENDING => unreachable,
.WSA_OPERATION_ABORTED => unreachable, // not using overlapped I/O
else => |err| return windows.unexpectedWSAError(err),
}
}
fn streamBufs(r: *Reader, bufs: []windows.ws2_32.WSABUF) Error!u32 {
var flags: u32 = 0;
var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED);
var n: u32 = undefined;
if (windows.ws2_32.WSARecv(
r.net_stream.handle,
bufs.ptr,
@intCast(bufs.len),
&n,
&flags,
&overlapped,
null,
) == windows.ws2_32.SOCKET_ERROR) switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING => {
var result_flags: u32 = undefined;
if (windows.ws2_32.WSAGetOverlappedResult(
r.net_stream.handle,
&overlapped,
&n,
windows.TRUE,
&result_flags,
) == windows.FALSE) try handleRecvError(windows.ws2_32.WSAGetLastError());
},
else => |winsock_error| try handleRecvError(winsock_error),
};
return n;
}
},
else => struct {
/// Use `getStream`, `interface`, and `getError` for portable code.
file_reader: File.Reader,
pub const Error = ReadError;
pub fn interface(r: *Reader) *Io.Reader {
return &r.file_reader.interface;
}
pub fn init(net_stream: Stream, buffer: []u8) Reader {
return .{
.file_reader = .{
.interface = File.Reader.initInterface(buffer),
.file = .{ .handle = net_stream.handle },
.mode = .streaming,
.seek_err = error.Unseekable,
.size_err = error.Streaming,
},
};
}
pub fn getStream(r: *const Reader) Stream {
return .{ .handle = r.file_reader.file.handle };
}
pub fn getError(r: *const Reader) ?Error {
return r.file_reader.err;
}
},
};
pub const Writer = switch (native_os) {
.windows => struct {
/// This field is present on all systems.
interface: Io.Writer,
/// Use `getStream` for cross-platform support.
stream: Stream,
/// This field is present on all systems.
err: ?Error = null,
pub const Error = WriteError;
pub fn init(stream: Stream, buffer: []u8) Writer {
return .{
.stream = stream,
.interface = .{
.vtable = &.{ .drain = drain },
.buffer = buffer,
},
};
}
pub fn getStream(w: *const Writer) Stream {
return w.stream;
}
fn addWsaBuf(v: []windows.ws2_32.WSABUF, i: *u32, bytes: []const u8) void {
const cap = std.math.maxInt(u32);
var remaining = bytes;
while (remaining.len > cap) {
if (v.len - i.* == 0) return;
v[i.*] = .{ .buf = @constCast(remaining.ptr), .len = cap };
i.* += 1;
remaining = remaining[cap..];
} else {
@branchHint(.likely);
if (v.len - i.* == 0) return;
v[i.*] = .{ .buf = @constCast(remaining.ptr), .len = @intCast(remaining.len) };
i.* += 1;
}
}
fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const buffered = io_w.buffered();
comptime assert(native_os == .windows);
var iovecs: [max_buffers_len]windows.ws2_32.WSABUF = undefined;
var len: u32 = 0;
addWsaBuf(&iovecs, &len, buffered);
for (data[0 .. data.len - 1]) |bytes| addWsaBuf(&iovecs, &len, bytes);
const pattern = data[data.len - 1];
if (iovecs.len - len != 0) switch (splat) {
0 => {},
1 => addWsaBuf(&iovecs, &len, pattern),
else => switch (pattern.len) {
0 => {},
1 => {
const splat_buffer_candidate = io_w.buffer[io_w.end..];
var backup_buffer: [64]u8 = undefined;
const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len)
splat_buffer_candidate
else
&backup_buffer;
const memset_len = @min(splat_buffer.len, splat);
const buf = splat_buffer[0..memset_len];
@memset(buf, pattern[0]);
addWsaBuf(&iovecs, &len, buf);
var remaining_splat = splat - buf.len;
while (remaining_splat > splat_buffer.len and len < iovecs.len) {
addWsaBuf(&iovecs, &len, splat_buffer);
remaining_splat -= splat_buffer.len;
}
addWsaBuf(&iovecs, &len, splat_buffer[0..remaining_splat]);
},
else => for (0..@min(splat, iovecs.len - len)) |_| {
addWsaBuf(&iovecs, &len, pattern);
},
},
};
const n = sendBufs(w.stream.handle, iovecs[0..len]) catch |err| {
w.err = err;
return error.WriteFailed;
};
return io_w.consume(n);
}
fn handleSendError(winsock_error: windows.ws2_32.WinsockError) Error!void {
switch (winsock_error) {
.WSAECONNABORTED => return error.ConnectionResetByPeer,
.WSAECONNRESET => return error.ConnectionResetByPeer,
.WSAEFAULT => unreachable, // a pointer is not completely contained in user address space.
.WSAEINPROGRESS, .WSAEINTR => unreachable, // deprecated and removed in WSA 2.2
.WSAEINVAL => return error.SocketNotBound,
.WSAEMSGSIZE => return error.MessageTooBig,
.WSAENETDOWN => return error.NetworkSubsystemFailed,
.WSAENETRESET => return error.ConnectionResetByPeer,
.WSAENOBUFS => return error.SystemResources,
.WSAENOTCONN => return error.SocketNotConnected,
.WSAENOTSOCK => unreachable, // not a socket
.WSAEOPNOTSUPP => unreachable, // only for message-oriented sockets
.WSAESHUTDOWN => unreachable, // cannot send on a socket after write shutdown
.WSAEWOULDBLOCK => return error.WouldBlock,
.WSANOTINITIALISED => unreachable, // WSAStartup must be called before this function
.WSA_IO_PENDING => unreachable,
.WSA_OPERATION_ABORTED => unreachable, // not using overlapped I/O
else => |err| return windows.unexpectedWSAError(err),
}
}
fn sendBufs(handle: Stream.Handle, bufs: []windows.ws2_32.WSABUF) Error!u32 {
var n: u32 = undefined;
var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED);
if (windows.ws2_32.WSASend(
handle,
bufs.ptr,
@intCast(bufs.len),
&n,
0,
&overlapped,
null,
) == windows.ws2_32.SOCKET_ERROR) switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING => {
var result_flags: u32 = undefined;
if (windows.ws2_32.WSAGetOverlappedResult(
handle,
&overlapped,
&n,
windows.TRUE,
&result_flags,
) == windows.FALSE) try handleSendError(windows.ws2_32.WSAGetLastError());
},
else => |winsock_error| try handleSendError(winsock_error),
};
return n;
}
},
else => struct {
/// This field is present on all systems.
interface: Io.Writer,
err: ?Error = null,
file_writer: File.Writer,
pub const Error = WriteError;
pub fn init(stream: Stream, buffer: []u8) Writer {
return .{
.interface = .{
.vtable = &.{
.drain = drain,
.sendFile = sendFile,
},
.buffer = buffer,
},
.file_writer = .initStreaming(.{ .handle = stream.handle }, &.{}),
};
}
pub fn getStream(w: *const Writer) Stream {
return .{ .handle = w.file_writer.file.handle };
}
fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
// OS checks ptr addr before length so zero length vectors must be omitted.
if (bytes.len == 0) return;
if (v.len - i.* == 0) return;
v[i.*] = .{ .base = bytes.ptr, .len = bytes.len };
i.* += 1;
}
fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const buffered = io_w.buffered();
var iovecs: [max_buffers_len]posix.iovec_const = undefined;
var msg: posix.msghdr_const = .{
.name = null,
.namelen = 0,
.iov = &iovecs,
.iovlen = 0,
.control = null,
.controllen = 0,
.flags = 0,
};
addBuf(&iovecs, &msg.iovlen, buffered);
for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes);
const pattern = data[data.len - 1];
if (iovecs.len - msg.iovlen != 0) switch (splat) {
0 => {},
1 => addBuf(&iovecs, &msg.iovlen, pattern),
else => switch (pattern.len) {
0 => {},
1 => {
const splat_buffer_candidate = io_w.buffer[io_w.end..];
var backup_buffer: [64]u8 = undefined;
const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len)
splat_buffer_candidate
else
&backup_buffer;
const memset_len = @min(splat_buffer.len, splat);
const buf = splat_buffer[0..memset_len];
@memset(buf, pattern[0]);
addBuf(&iovecs, &msg.iovlen, buf);
var remaining_splat = splat - buf.len;
while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
assert(buf.len == splat_buffer.len);
addBuf(&iovecs, &msg.iovlen, splat_buffer);
remaining_splat -= splat_buffer.len;
}
addBuf(&iovecs, &msg.iovlen, splat_buffer[0..remaining_splat]);
},
else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| {
addBuf(&iovecs, &msg.iovlen, pattern);
},
},
};
const flags = posix.MSG.NOSIGNAL;
return io_w.consume(posix.sendmsg(w.file_writer.file.handle, &msg, flags) catch |err| {
w.err = err;
return error.WriteFailed;
});
}
fn sendFile(io_w: *Io.Writer, file_reader: *File.Reader, limit: Io.Limit) Io.Writer.FileError!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const n = try w.file_writer.interface.sendFileHeader(io_w.buffered(), file_reader, limit);
return io_w.consume(n);
}
},
};
pub fn reader(stream: Stream, buffer: []u8) Reader {
return .init(stream, buffer);
}
pub fn writer(stream: Stream, buffer: []u8) Writer {
return .init(stream, buffer);
}
const max_buffers_len = 8;
/// Deprecated in favor of `Reader`.
pub fn read(self: Stream, buffer: []u8) ReadError!usize {
if (native_os == .windows) {
return windows.ReadFile(self.handle, buffer, null);
}
return posix.read(self.handle, buffer);
}
/// Deprecated in favor of `Reader`.
pub fn readv(s: Stream, iovecs: []const posix.iovec) ReadError!usize {
if (native_os == .windows) {
if (iovecs.len == 0) return 0;
const first = iovecs[0];
return windows.ReadFile(s.handle, first.base[0..first.len], null);
}
return posix.readv(s.handle, iovecs);
}
/// Deprecated in favor of `Reader`.
pub fn readAtLeast(s: Stream, buffer: []u8, len: usize) ReadError!usize {
assert(len <= buffer.len);
var index: usize = 0;
while (index < len) {
const amt = try s.read(buffer[index..]);
if (amt == 0) break;
index += amt;
}
return index;
}
/// Deprecated in favor of `Writer`.
pub fn write(self: Stream, buffer: []const u8) WriteError!usize {
var stream_writer = self.writer(&.{});
return stream_writer.interface.writeVec(&.{buffer}) catch return stream_writer.err.?;
}
/// Deprecated in favor of `Writer`.
pub fn writeAll(self: Stream, bytes: []const u8) WriteError!void {
var index: usize = 0;
while (index < bytes.len) {
index += try self.write(bytes[index..]);
}
}
/// Deprecated in favor of `Writer`.
pub fn writev(self: Stream, iovecs: []const posix.iovec_const) WriteError!usize {
return @errorCast(posix.writev(self.handle, iovecs));
}
/// Deprecated in favor of `Writer`.
pub fn writevAll(self: Stream, iovecs: []posix.iovec_const) WriteError!void {
if (iovecs.len == 0) return;
var i: usize = 0;
while (true) {
var amt = try self.writev(iovecs[i..]);
while (amt >= iovecs[i].len) {
amt -= iovecs[i].len;
i += 1;
if (i >= iovecs.len) return;
}
iovecs[i].base += amt;
iovecs[i].len -= amt;
}
}
}