struct BufferGroup

Group of application-provided buffers. Uses the newer buffer type, ring mapped buffers, supported since kernel 5.19. Buffers are identified by a buffer group ID and, within that group, a buffer ID. An IoUring can have multiple buffer groups, each with a unique group ID.

In init the application provides a contiguous block of memory (buffers) for buffers_count buffers of buffer_size bytes each. The application can then submit a recv operation without providing a buffer upfront. Once the operation is ready to receive data, a buffer is picked automatically and the resulting CQE will contain the buffer ID in cqe.buffer_id(). Use the get method to obtain the buffer identified by a CQE. Once the application has processed the buffer, it may hand ownership back to the kernel by calling put, allowing the cycle to repeat. Depending on the rate of arrival of data, it is possible that a given buffer group will run out of buffers before those in CQEs can be put back to the kernel. If this happens, cqe.err() will return ENOBUFS.
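The following is a minimal sketch of that flow, assuming an already initialized IoUring (ring), an allocator, and a connected socket file descriptor; the function name recvOnce, the group ID, and the buffer sizes are illustrative choices, not part of the API.

const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;

// Sketch: receive once using a kernel-selected buffer from a BufferGroup.
fn recvOnce(ring: *IoUring, allocator: std.mem.Allocator, socket: std.posix.fd_t) !void {
    // Register 16 buffers of 4096 bytes each under group ID 0.
    var group = try IoUring.BufferGroup.init(ring, allocator, 0, 4096, 16);
    defer group.deinit(allocator);

    // Submit a recv without providing a buffer; the kernel picks one from the group.
    _ = try group.recv(1, socket, 0);
    _ = try ring.submit();

    const cqe = try ring.copy_cqe();
    const errno = cqe.err();
    if (errno == .NOBUFS) {
        // The group ran out of buffers; put processed buffers back and retry.
        return;
    }
    if (errno != .SUCCESS) return std.posix.unexpectedErrno(errno);

    const data = try group.get(cqe); // received bytes, in the kernel-selected buffer
    std.debug.print("received {d} bytes\n", .{data.len});
    try group.put(cqe); // hand ownership of the buffer back to the kernel
}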

Fields

ring: *IoUring
    Parent ring for which this group is registered.
br: *align(page_size_min) linux.io_uring_buf_ring
    Pointer to the memory shared by the kernel. buffers_count of io_uring_buf structures are shared by the kernel. First io_uring_buf is overlaid by io_uring_buf_ring struct.
buffers: []u8
    Contiguous block of memory of size (buffers_count * buffer_size).
buffer_size: u32
    Size of each buffer in buffers.
buffers_count: u16
    Number of buffers in buffers, number of io_uring_buf structures in br.
heads: []u32
    Head of unconsumed part of each buffer, if incremental consumption is enabled.
group_id: u16
    ID of this group, must be unique in ring.

Members

init
deinit
recv
recv_multishot
get
put

Source

pub const BufferGroup = struct {
    /// Parent ring for which this group is registered.
    ring: *IoUring,
    /// Pointer to the memory shared by the kernel.
    /// `buffers_count` of `io_uring_buf` structures are shared by the kernel.
    /// First `io_uring_buf` is overlaid by `io_uring_buf_ring` struct.
    br: *align(page_size_min) linux.io_uring_buf_ring,
    /// Contiguous block of memory of size (buffers_count * buffer_size).
    buffers: []u8,
    /// Size of each buffer in buffers.
    buffer_size: u32,
    /// Number of buffers in `buffers`, number of `io_uring_buf` structures in `br`.
    buffers_count: u16,
    /// Head of unconsumed part of each buffer, if incremental consumption is enabled.
    heads: []u32,
    /// ID of this group, must be unique in ring.
    group_id: u16,

    pub fn init(
        ring: *IoUring,
        allocator: mem.Allocator,
        group_id: u16,
        buffer_size: u32,
        buffers_count: u16,
    ) !BufferGroup {
        const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
        errdefer allocator.free(buffers);
        const heads = try allocator.alloc(u32, buffers_count);
        errdefer allocator.free(heads);

        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
        buf_ring_init(br);

        const mask = buf_ring_mask(buffers_count);
        var i: u16 = 0;
        while (i < buffers_count) : (i += 1) {
            const pos = buffer_size * i;
            const buf = buffers[pos .. pos + buffer_size];
            heads[i] = 0;
            buf_ring_add(br, buf, i, mask, i);
        }
        buf_ring_advance(br, buffers_count);

        return BufferGroup{
            .ring = ring,
            .group_id = group_id,
            .br = br,
            .buffers = buffers,
            .heads = heads,
            .buffer_size = buffer_size,
            .buffers_count = buffers_count,
        };
    }

    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
        allocator.free(self.buffers);
        allocator.free(self.heads);
    }

    // Prepare recv operation which will select buffer from this group.
    pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
        var sqe = try self.ring.get_sqe();
        sqe.prep_rw(.RECV, fd, 0, 0, 0);
        sqe.rw_flags = flags;
        sqe.flags |= linux.IOSQE_BUFFER_SELECT;
        sqe.buf_index = self.group_id;
        sqe.user_data = user_data;
        return sqe;
    }

    // Prepare multishot recv operation which will select buffer from this group.
    pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
        var sqe = try self.recv(user_data, fd, flags);
        sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
        return sqe;
    }

    // Get buffer by id.
    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
        const pos = self.buffer_size * buffer_id;
        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
    }

    // Get buffer by CQE.
    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
        const buffer_id = try cqe.buffer_id();
        const used_len = @as(usize, @intCast(cqe.res));
        return self.get_by_id(buffer_id)[0..used_len];
    }

    // Release buffer from CQE to the kernel.
    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
        const buffer_id = try cqe.buffer_id();
        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
            // Incremental consumption active, kernel will write to this buffer again.
            const used_len = @as(u32, @intCast(cqe.res));
            // Track what part of the buffer is used.
            self.heads[buffer_id] += used_len;
            return;
        }
        self.heads[buffer_id] = 0;

        // Release buffer to the kernel.
        const mask = buf_ring_mask(self.buffers_count);
        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
        buf_ring_advance(self.br, 1);
    }
};
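
As a usage note, here is a hedged sketch of how recv_multishot and put might interact in a receive loop; the function name drainMultishot, the user_data value, and the termination policy are assumptions for illustration, not prescribed by this API.

const std = @import("std");
const linux = std.os.linux;
const IoUring = linux.IoUring;

// Sketch: drain completions from a single multishot recv, with `group` created
// as in the earlier example. The error handling here is deliberately minimal.
fn drainMultishot(ring: *IoUring, group: *IoUring.BufferGroup, socket: std.posix.fd_t) !void {
    _ = try group.recv_multishot(2, socket, 0);
    _ = try ring.submit();

    while (true) {
        const cqe = try ring.copy_cqe();
        const errno = cqe.err();
        if (errno == .NOBUFS) break; // out of buffers; real code would put some back and re-arm
        if (errno != .SUCCESS) return std.posix.unexpectedErrno(errno);
        if (cqe.res == 0) break; // peer closed the connection; no buffer is attached

        const data = try group.get(cqe);
        std.debug.print("got {d} bytes\n", .{data.len}); // stand-in for real processing
        try group.put(cqe); // with incremental consumption this may only advance the buffer head

        // A cleared IORING_CQE_F_MORE flag means the multishot recv has terminated
        // and must be re-armed to keep receiving.
        if (cqe.flags & linux.IORING_CQE_F_MORE == 0) break;
    }
}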