struct BufferGroup [src]
Group of application provided buffers. Uses newer type, called ring mapped
buffers, supported since kernel 5.19. Buffers are identified by a buffer
group ID, and within that group, a buffer ID. IO_Uring can have multiple
buffer groups, each with unique group ID.
In init the group allocates, using the supplied allocator, a contiguous block
of memory holding buffers_count buffers of buffer_size bytes each, and adds
all of them to the buffer ring. The application can then submit a recv
operation without providing a buffer upfront. Once the operation is
ready to receive data, a buffer is picked automatically and the resulting
CQE will contain the buffer ID in cqe.buffer_id(). Use the get method to obtain
the buffer identified by the CQE. Once the application has processed
the buffer, it may hand ownership back to the kernel by calling put,
allowing the cycle to repeat.
Depending on the rate of arrival of data, it is possible that a given buffer
group will run out of buffers before those in CQEs can be put back to the
kernel. If this happens, cqe.err() will return ENOBUFS as the error value.
Fields
ring: *IoUring
Parent ring for which this group is registered.
br: *align(page_size_min) linux.io_uring_buf_ring
Pointer to the memory shared by the kernel.
buffers_count of io_uring_buf structures are shared by the kernel.
First io_uring_buf is overlaid by io_uring_buf_ring struct.
buffers: []u8
Contiguous block of memory of size (buffers_count * buffer_size).
buffer_size: u32
Size of each buffer in buffers.
buffers_count: u16
Number of buffers in buffers, number of io_uring_buf structures in br.
heads: []u32
Head of unconsumed part of each buffer, if incremental consumption is enabled.
group_id: u16
ID of this group, must be unique in ring.
Members
Source
pub const BufferGroup = struct {
    /// Parent ring for which this group is registered.
    ring: *IoUring,
    /// Pointer to the memory shared by the kernel.
    /// `buffers_count` of `io_uring_buf` structures are shared by the kernel.
    /// First `io_uring_buf` is overlaid by `io_uring_buf_ring` struct.
    br: *align(page_size_min) linux.io_uring_buf_ring,
    /// Contiguous block of memory of size (buffers_count * buffer_size).
    buffers: []u8,
    /// Size of each buffer in buffers.
    buffer_size: u32,
    /// Number of buffers in `buffers`, number of `io_uring_buf` structures in `br`.
    buffers_count: u16,
    /// Head of unconsumed part of each buffer, if incremental consumption is enabled.
    heads: []u32,
    /// ID of this group, must be unique in ring.
    group_id: u16,

    /// Allocates `buffers_count` buffers of `buffer_size` bytes each (plus one
    /// head offset per buffer) with `allocator`, sets up a buffer ring for
    /// `group_id` on `ring` with the `.inc` flag, and adds every buffer to it.
    ///
    /// Returns `error.OutOfMemory` if the total byte size does not fit in
    /// `usize` or an allocation fails, and forwards any error from
    /// `setup_buf_ring`. Both allocations are released on failure.
    /// The caller must release the group with `deinit`, passing the same allocator.
    pub fn init(
        ring: *IoUring,
        allocator: mem.Allocator,
        group_id: u16,
        buffer_size: u32,
        buffers_count: u16,
    ) !BufferGroup {
        // Compute the total size in usize with an explicit overflow check:
        // a u32 product would trap (or be illegal behavior in unsafe builds)
        // as soon as the group exceeds 4 GiB.
        const total_size = @mulWithOverflow(@as(usize, buffer_size), @as(usize, buffers_count));
        if (total_size[1] != 0) return error.OutOfMemory;

        const buffers = try allocator.alloc(u8, total_size[0]);
        errdefer allocator.free(buffers);
        const heads = try allocator.alloc(u32, buffers_count);
        errdefer allocator.free(heads);

        // Last fallible step; nothing below can fail, so no errdefer is needed for `br`.
        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
        buf_ring_init(br);

        const mask = buf_ring_mask(buffers_count);
        var i: u16 = 0;
        while (i < buffers_count) : (i += 1) {
            // Widen before multiplying so the byte offset cannot overflow u32.
            const pos = @as(usize, buffer_size) * i;
            const buf = buffers[pos .. pos + buffer_size];
            heads[i] = 0;
            buf_ring_add(br, buf, i, mask, i);
        }
        // Publish all added buffers at once.
        buf_ring_advance(br, buffers_count);

        return BufferGroup{
            .ring = ring,
            .group_id = group_id,
            .br = br,
            .buffers = buffers,
            .heads = heads,
            .buffer_size = buffer_size,
            .buffers_count = buffers_count,
        };
    }

    /// Frees the buffer ring and then the buffer and head storage.
    /// `allocator` must be the allocator that was passed to `init`.
    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
        allocator.free(self.buffers);
        allocator.free(self.heads);
    }

    /// Prepare recv operation which will select buffer from this group.
    /// Takes an SQE from the parent ring, sets `IOSQE_BUFFER_SELECT` and
    /// stores this group's ID in `buf_index`. Fails if `get_sqe` fails.
    pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
        const sqe = try self.ring.get_sqe();
        sqe.prep_rw(.RECV, fd, 0, 0, 0);
        sqe.rw_flags = flags;
        sqe.flags |= linux.IOSQE_BUFFER_SELECT;
        sqe.buf_index = self.group_id;
        sqe.user_data = user_data;
        return sqe;
    }

    /// Prepare multishot recv operation which will select buffer from this group.
    /// Same as `recv`, with `IORING_RECV_MULTISHOT` additionally set in `ioprio`.
    pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
        const sqe = try self.recv(user_data, fd, flags);
        sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
        return sqe;
    }

    // Get buffer by id.
    // Returns the part of buffer `buffer_id` that starts at its current head
    // offset and runs to the end of that buffer.
    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
        // Widen before multiplying so the byte offset cannot overflow u32.
        const pos = @as(usize, self.buffer_size) * buffer_id;
        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
    }

    /// Get buffer by CQE.
    /// Returns the `cqe.res` bytes of the selected buffer starting at its
    /// current head offset. Fails with the error from `cqe.buffer_id()` when
    /// the CQE does not identify a buffer.
    /// `cqe.res` must be non-negative and must not exceed the remaining part
    /// of the buffer; the cast and the slice are checked illegal behavior
    /// otherwise, so inspect `cqe.err()` before calling this.
    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
        const buffer_id = try cqe.buffer_id();
        const used_len = @as(usize, @intCast(cqe.res));
        return self.get_by_id(buffer_id)[0..used_len];
    }

    /// Release buffer from CQE to the kernel.
    /// When the CQE carries `IORING_CQE_F_BUF_MORE` the buffer is not given
    /// back; only its head offset is advanced by `cqe.res`. Otherwise the head
    /// is reset and the whole buffer is added back to the ring.
    /// Fails with the error from `cqe.buffer_id()` when the CQE does not
    /// identify a buffer.
    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
        const buffer_id = try cqe.buffer_id();
        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
            // Incremental consumption active, kernel will write to this buffer again.
            const used_len = @as(u32, @intCast(cqe.res));
            // Track what part of the buffer is used.
            self.heads[buffer_id] += used_len;
            return;
        }
        // Head must be reset first: get_by_id below then yields the full buffer.
        self.heads[buffer_id] = 0;

        // Release buffer to the kernel.
        const mask = buf_ring_mask(self.buffers_count);
        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
        buf_ring_advance(self.br, 1);
    }
};