struct Writer [src]
Fields
file: File
err: ?WriteError = null
mode: Writer.Mode = .positional
pos: u64 = 0Tracks the true seek position in the file. To obtain the logical
position, add the buffer size to this value.
sendfile_err: ?SendfileError = null
copy_file_range_err: ?CopyFileRangeError = null
fcopyfile_err: ?FcopyfileError = null
seek_err: ?Writer.SeekError = null
interface: std.Io.Writer
Members
- CopyFileRangeError (Error Set)
- drain (Function)
- end (Function)
- EndError (Error Set)
- FcopyfileError (Error Set)
- init (Function)
- initInterface (Function)
- initStreaming (Function)
- Mode (enum)
- moveToReader (Function)
- SeekError (Error Set)
- seekTo (Function)
- seekToUnbuffered (Function)
- sendFile (Function)
- SendfileError (Error Set)
Source
pub const Writer = struct {
file: File,
err: ?WriteError = null,
mode: Writer.Mode = .positional,
/// Tracks the true seek position in the file. To obtain the logical
/// position, add the buffer size to this value.
pos: u64 = 0,
sendfile_err: ?SendfileError = null,
copy_file_range_err: ?CopyFileRangeError = null,
fcopyfile_err: ?FcopyfileError = null,
seek_err: ?Writer.SeekError = null,
interface: std.Io.Writer,
pub const Mode = Reader.Mode;
pub const SendfileError = error{
UnsupportedOperation,
SystemResources,
InputOutput,
BrokenPipe,
WouldBlock,
Unexpected,
};
pub const CopyFileRangeError = std.os.freebsd.CopyFileRangeError || std.os.linux.wrapped.CopyFileRangeError;
pub const FcopyfileError = error{
OperationNotSupported,
OutOfMemory,
Unexpected,
};
pub const SeekError = File.SeekError;
/// Number of slices to store on the stack, when trying to send as many byte
/// vectors through the underlying write calls as possible.
const max_buffers_len = 16;
pub fn init(file: File, buffer: []u8) Writer {
return .{
.file = file,
.interface = initInterface(buffer),
.mode = .positional,
};
}
/// Positional is more threadsafe, since the global seek position is not
/// affected, but when such syscalls are not available, preemptively
/// initializing in streaming mode will skip a failed syscall.
pub fn initStreaming(file: File, buffer: []u8) Writer {
return .{
.file = file,
.interface = initInterface(buffer),
.mode = .streaming,
};
}
pub fn initInterface(buffer: []u8) std.Io.Writer {
return .{
.vtable = &.{
.drain = drain,
.sendFile = switch (builtin.zig_backend) {
else => sendFile,
.stage2_aarch64 => std.Io.Writer.unimplementedSendFile,
},
},
.buffer = buffer,
};
}
pub fn moveToReader(w: *Writer) Reader {
defer w.* = undefined;
return .{
.file = w.file,
.mode = w.mode,
.pos = w.pos,
.interface = Reader.initInterface(w.interface.buffer),
.seek_err = w.seek_err,
};
}
pub fn drain(io_w: *std.Io.Writer, data: []const []const u8, splat: usize) std.Io.Writer.Error!usize {
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const handle = w.file.handle;
const buffered = io_w.buffered();
if (is_windows) switch (w.mode) {
.positional, .positional_reading => {
if (buffered.len != 0) {
const n = windows.WriteFile(handle, buffered, w.pos) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
}
for (data[0 .. data.len - 1]) |buf| {
if (buf.len == 0) continue;
const n = windows.WriteFile(handle, buf, w.pos) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
}
const pattern = data[data.len - 1];
if (pattern.len == 0 or splat == 0) return 0;
const n = windows.WriteFile(handle, pattern, w.pos) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
},
.streaming, .streaming_reading => {
if (buffered.len != 0) {
const n = windows.WriteFile(handle, buffered, null) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
}
for (data[0 .. data.len - 1]) |buf| {
if (buf.len == 0) continue;
const n = windows.WriteFile(handle, buf, null) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
}
const pattern = data[data.len - 1];
if (pattern.len == 0 or splat == 0) return 0;
const n = windows.WriteFile(handle, pattern, null) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
},
.failure => return error.WriteFailed,
};
var iovecs: [max_buffers_len]std.posix.iovec_const = undefined;
var len: usize = 0;
if (buffered.len > 0) {
iovecs[len] = .{ .base = buffered.ptr, .len = buffered.len };
len += 1;
}
for (data[0 .. data.len - 1]) |d| {
if (d.len == 0) continue;
iovecs[len] = .{ .base = d.ptr, .len = d.len };
len += 1;
if (iovecs.len - len == 0) break;
}
const pattern = data[data.len - 1];
if (iovecs.len - len != 0) switch (splat) {
0 => {},
1 => if (pattern.len != 0) {
iovecs[len] = .{ .base = pattern.ptr, .len = pattern.len };
len += 1;
},
else => switch (pattern.len) {
0 => {},
1 => {
const splat_buffer_candidate = io_w.buffer[io_w.end..];
var backup_buffer: [64]u8 = undefined;
const splat_buffer = if (splat_buffer_candidate.len >= backup_buffer.len)
splat_buffer_candidate
else
&backup_buffer;
const memset_len = @min(splat_buffer.len, splat);
const buf = splat_buffer[0..memset_len];
@memset(buf, pattern[0]);
iovecs[len] = .{ .base = buf.ptr, .len = buf.len };
len += 1;
var remaining_splat = splat - buf.len;
while (remaining_splat > splat_buffer.len and iovecs.len - len != 0) {
assert(buf.len == splat_buffer.len);
iovecs[len] = .{ .base = splat_buffer.ptr, .len = splat_buffer.len };
len += 1;
remaining_splat -= splat_buffer.len;
}
if (remaining_splat > 0 and iovecs.len - len != 0) {
iovecs[len] = .{ .base = splat_buffer.ptr, .len = remaining_splat };
len += 1;
}
},
else => for (0..splat) |_| {
iovecs[len] = .{ .base = pattern.ptr, .len = pattern.len };
len += 1;
if (iovecs.len - len == 0) break;
},
},
};
if (len == 0) return 0;
switch (w.mode) {
.positional, .positional_reading => {
const n = std.posix.pwritev(handle, iovecs[0..len], w.pos) catch |err| switch (err) {
error.Unseekable => {
w.mode = w.mode.toStreaming();
const pos = w.pos;
if (pos != 0) {
w.pos = 0;
w.seekTo(@intCast(pos)) catch {
w.mode = .failure;
return error.WriteFailed;
};
}
return 0;
},
else => |e| {
w.err = e;
return error.WriteFailed;
},
};
w.pos += n;
return io_w.consume(n);
},
.streaming, .streaming_reading => {
const n = std.posix.writev(handle, iovecs[0..len]) catch |err| {
w.err = err;
return error.WriteFailed;
};
w.pos += n;
return io_w.consume(n);
},
.failure => return error.WriteFailed,
}
}
pub fn sendFile(
io_w: *std.Io.Writer,
file_reader: *Reader,
limit: std.Io.Limit,
) std.Io.Writer.FileError!usize {
const reader_buffered = file_reader.interface.buffered();
if (reader_buffered.len >= @intFromEnum(limit))
return sendFileBuffered(io_w, file_reader, reader_buffered);
const writer_buffered = io_w.buffered();
const file_limit = @intFromEnum(limit) - reader_buffered.len;
const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
const out_fd = w.file.handle;
const in_fd = file_reader.file.handle;
if (file_reader.size) |size| {
if (size - file_reader.pos == 0) {
if (reader_buffered.len != 0) {
return sendFileBuffered(io_w, file_reader, reader_buffered);
} else {
return error.EndOfStream;
}
}
}
if (native_os == .freebsd and w.mode == .streaming) sf: {
// Try using sendfile on FreeBSD.
if (w.sendfile_err != null) break :sf;
const offset = std.math.cast(std.c.off_t, file_reader.pos) orelse break :sf;
var hdtr_data: std.c.sf_hdtr = undefined;
var headers: [2]posix.iovec_const = undefined;
var headers_i: u8 = 0;
if (writer_buffered.len != 0) {
headers[headers_i] = .{ .base = writer_buffered.ptr, .len = writer_buffered.len };
headers_i += 1;
}
if (reader_buffered.len != 0) {
headers[headers_i] = .{ .base = reader_buffered.ptr, .len = reader_buffered.len };
headers_i += 1;
}
const hdtr: ?*std.c.sf_hdtr = if (headers_i == 0) null else b: {
hdtr_data = .{
.headers = &headers,
.hdr_cnt = headers_i,
.trailers = null,
.trl_cnt = 0,
};
break :b &hdtr_data;
};
var sbytes: std.c.off_t = undefined;
const nbytes: usize = @min(file_limit, maxInt(usize));
const flags = 0;
switch (posix.errno(std.c.sendfile(in_fd, out_fd, offset, nbytes, hdtr, &sbytes, flags))) {
.SUCCESS, .INTR => {},
.INVAL, .OPNOTSUPP, .NOTSOCK, .NOSYS => w.sendfile_err = error.UnsupportedOperation,
.BADF => if (builtin.mode == .Debug) @panic("race condition") else {
w.sendfile_err = error.Unexpected;
},
.FAULT => if (builtin.mode == .Debug) @panic("segmentation fault") else {
w.sendfile_err = error.Unexpected;
},
.NOTCONN => w.sendfile_err = error.BrokenPipe,
.AGAIN, .BUSY => if (sbytes == 0) {
w.sendfile_err = error.WouldBlock;
},
.IO => w.sendfile_err = error.InputOutput,
.PIPE => w.sendfile_err = error.BrokenPipe,
.NOBUFS => w.sendfile_err = error.SystemResources,
else => |err| w.sendfile_err = posix.unexpectedErrno(err),
}
if (w.sendfile_err != null) {
// Give calling code chance to observe the error before trying
// something else.
return 0;
}
if (sbytes == 0) {
file_reader.size = file_reader.pos;
return error.EndOfStream;
}
const consumed = io_w.consume(@intCast(sbytes));
file_reader.seekTo(file_reader.pos + consumed) catch return error.ReadFailed;
return consumed;
}
if (native_os.isDarwin() and w.mode == .streaming) sf: {
// Try using sendfile on macOS.
if (w.sendfile_err != null) break :sf;
const offset = std.math.cast(std.c.off_t, file_reader.pos) orelse break :sf;
var hdtr_data: std.c.sf_hdtr = undefined;
var headers: [2]posix.iovec_const = undefined;
var headers_i: u8 = 0;
if (writer_buffered.len != 0) {
headers[headers_i] = .{ .base = writer_buffered.ptr, .len = writer_buffered.len };
headers_i += 1;
}
if (reader_buffered.len != 0) {
headers[headers_i] = .{ .base = reader_buffered.ptr, .len = reader_buffered.len };
headers_i += 1;
}
const hdtr: ?*std.c.sf_hdtr = if (headers_i == 0) null else b: {
hdtr_data = .{
.headers = &headers,
.hdr_cnt = headers_i,
.trailers = null,
.trl_cnt = 0,
};
break :b &hdtr_data;
};
const max_count = maxInt(i32); // Avoid EINVAL.
var len: std.c.off_t = @min(file_limit, max_count);
const flags = 0;
switch (posix.errno(std.c.sendfile(in_fd, out_fd, offset, &len, hdtr, flags))) {
.SUCCESS, .INTR => {},
.OPNOTSUPP, .NOTSOCK, .NOSYS => w.sendfile_err = error.UnsupportedOperation,
.BADF => if (builtin.mode == .Debug) @panic("race condition") else {
w.sendfile_err = error.Unexpected;
},
.FAULT => if (builtin.mode == .Debug) @panic("segmentation fault") else {
w.sendfile_err = error.Unexpected;
},
.INVAL => if (builtin.mode == .Debug) @panic("invalid API usage") else {
w.sendfile_err = error.Unexpected;
},
.NOTCONN => w.sendfile_err = error.BrokenPipe,
.AGAIN => if (len == 0) {
w.sendfile_err = error.WouldBlock;
},
.IO => w.sendfile_err = error.InputOutput,
.PIPE => w.sendfile_err = error.BrokenPipe,
else => |err| w.sendfile_err = posix.unexpectedErrno(err),
}
if (w.sendfile_err != null) {
// Give calling code chance to observe the error before trying
// something else.
return 0;
}
if (len == 0) {
file_reader.size = file_reader.pos;
return error.EndOfStream;
}
const consumed = io_w.consume(@bitCast(len));
file_reader.seekTo(file_reader.pos + consumed) catch return error.ReadFailed;
return consumed;
}
if (native_os == .linux and w.mode == .streaming) sf: {
// Try using sendfile on Linux.
if (w.sendfile_err != null) break :sf;
// Linux sendfile does not support headers.
if (writer_buffered.len != 0 or reader_buffered.len != 0)
return sendFileBuffered(io_w, file_reader, reader_buffered);
const max_count = 0x7ffff000; // Avoid EINVAL.
var off: std.os.linux.off_t = undefined;
const off_ptr: ?*std.os.linux.off_t, const count: usize = switch (file_reader.mode) {
.positional => o: {
const size = file_reader.getSize() catch return 0;
off = std.math.cast(std.os.linux.off_t, file_reader.pos) orelse return error.ReadFailed;
break :o .{ &off, @min(@intFromEnum(limit), size - file_reader.pos, max_count) };
},
.streaming => .{ null, limit.minInt(max_count) },
.streaming_reading, .positional_reading => break :sf,
.failure => return error.ReadFailed,
};
const n = std.os.linux.wrapped.sendfile(out_fd, in_fd, off_ptr, count) catch |err| switch (err) {
error.Unseekable => {
file_reader.mode = file_reader.mode.toStreaming();
const pos = file_reader.pos;
if (pos != 0) {
file_reader.pos = 0;
file_reader.seekBy(@intCast(pos)) catch {
file_reader.mode = .failure;
return error.ReadFailed;
};
}
return 0;
},
else => |e| {
w.sendfile_err = e;
return 0;
},
};
if (n == 0) {
file_reader.size = file_reader.pos;
return error.EndOfStream;
}
file_reader.pos += n;
w.pos += n;
return n;
}
const copy_file_range = switch (native_os) {
.freebsd => std.os.freebsd.copy_file_range,
.linux => if (std.c.versionCheck(if (builtin.abi.isAndroid()) .{ .major = 34, .minor = 0, .patch = 0 } else .{ .major = 2, .minor = 27, .patch = 0 })) std.os.linux.wrapped.copy_file_range else {},
else => {},
};
if (@TypeOf(copy_file_range) != void) cfr: {
if (w.copy_file_range_err != null) break :cfr;
if (writer_buffered.len != 0 or reader_buffered.len != 0)
return sendFileBuffered(io_w, file_reader, reader_buffered);
var off_in: i64 = undefined;
var off_out: i64 = undefined;
const off_in_ptr: ?*i64 = switch (file_reader.mode) {
.positional_reading, .streaming_reading => return error.Unimplemented,
.positional => p: {
off_in = @intCast(file_reader.pos);
break :p &off_in;
},
.streaming => null,
.failure => return error.WriteFailed,
};
const off_out_ptr: ?*i64 = switch (w.mode) {
.positional_reading, .streaming_reading => return error.Unimplemented,
.positional => p: {
off_out = @intCast(w.pos);
break :p &off_out;
},
.streaming => null,
.failure => return error.WriteFailed,
};
const n = copy_file_range(in_fd, off_in_ptr, out_fd, off_out_ptr, @intFromEnum(limit), 0) catch |err| {
w.copy_file_range_err = err;
return 0;
};
if (n == 0) {
file_reader.size = file_reader.pos;
return error.EndOfStream;
}
file_reader.pos += n;
w.pos += n;
return n;
}
if (builtin.os.tag.isDarwin()) fcf: {
if (w.fcopyfile_err != null) break :fcf;
if (file_reader.pos != 0) break :fcf;
if (w.pos != 0) break :fcf;
if (limit != .unlimited) break :fcf;
const size = file_reader.getSize() catch break :fcf;
if (writer_buffered.len != 0 or reader_buffered.len != 0)
return sendFileBuffered(io_w, file_reader, reader_buffered);
const rc = std.c.fcopyfile(in_fd, out_fd, null, .{ .DATA = true });
switch (posix.errno(rc)) {
.SUCCESS => {},
.INVAL => if (builtin.mode == .Debug) @panic("invalid API usage") else {
w.fcopyfile_err = error.Unexpected;
return 0;
},
.NOMEM => {
w.fcopyfile_err = error.OutOfMemory;
return 0;
},
.OPNOTSUPP => {
w.fcopyfile_err = error.OperationNotSupported;
return 0;
},
else => |err| {
w.fcopyfile_err = posix.unexpectedErrno(err);
return 0;
},
}
file_reader.pos = size;
w.pos = size;
return size;
}
return error.Unimplemented;
}
fn sendFileBuffered(
io_w: *std.Io.Writer,
file_reader: *Reader,
reader_buffered: []const u8,
) std.Io.Writer.FileError!usize {
const n = try drain(io_w, &.{reader_buffered}, 1);
file_reader.seekTo(file_reader.pos + n) catch return error.ReadFailed;
return n;
}
pub fn seekTo(w: *Writer, offset: u64) (Writer.SeekError || std.Io.Writer.Error)!void {
try w.interface.flush();
try seekToUnbuffered(w, offset);
}
/// Asserts that no data is currently buffered.
pub fn seekToUnbuffered(w: *Writer, offset: u64) Writer.SeekError!void {
assert(w.interface.buffered().len == 0);
switch (w.mode) {
.positional, .positional_reading => {
w.pos = offset;
},
.streaming, .streaming_reading => {
if (w.seek_err) |err| return err;
posix.lseek_SET(w.file.handle, offset) catch |err| {
w.seek_err = err;
return err;
};
w.pos = offset;
},
.failure => return w.seek_err.?,
}
}
pub const EndError = SetEndPosError || std.Io.Writer.Error;
/// Flushes any buffered data and sets the end position of the file.
///
/// If not overwriting existing contents, then calling `interface.flush`
/// directly is sufficient.
///
/// Flush failure is handled by setting `err` so that it can be handled
/// along with other write failures.
pub fn end(w: *Writer) EndError!void {
try w.interface.flush();
switch (w.mode) {
.positional,
.positional_reading,
=> w.file.setEndPos(w.pos) catch |err| switch (err) {
error.NonResizable => return,
else => |e| return e,
},
.streaming,
.streaming_reading,
.failure,
=> {},
}
}
}