struct ConnectionPool [src]
A set of linked lists of connections that can be reused.
Fields
mutex: std.Thread.Mutex = .{}
used: Queue = .{} — Open connections that are currently in use.
free: Queue = .{} — Open connections that are not currently in use.
free_len: usize = 0
free_size: usize = 32
Members
- acquire (Function)
- acquireUnsafe (Function)
- addUsed (Function)
- Criteria (struct)
- deinit (Function)
- findConnection (Function)
- Node (Constant)
- release (Function)
- resize (Function)
Source
/// A mutex-guarded pair of linked lists holding connections that can be reused.
///
/// Every node lives in exactly one of two lists: `used` (handed out to a caller)
/// or `free` (idle and available for reuse). At most `free_size` idle connections
/// are retained; surplus ones are closed and their nodes destroyed.
pub const ConnectionPool = struct {
    /// Guards every field below. The functions documented as threadsafe take it themselves.
    mutex: std.Thread.Mutex = .{},
    /// Open connections that are currently in use.
    used: Queue = .{},
    /// Open connections that are not currently in use.
    free: Queue = .{},
    /// Number of nodes currently linked into `free`.
    free_len: usize = 0,
    /// Maximum number of idle connections kept in `free`.
    /// When this is 0, `release` closes every connection instead of pooling it.
    free_size: usize = 32,

    /// The criteria for a connection to be considered a match.
    pub const Criteria = struct {
        host: []const u8,
        port: u16,
        protocol: Connection.Protocol,
    };

    const Queue = std.DoublyLinkedList(Connection);
    pub const Node = Queue.Node;

    /// Finds and acquires a connection from the connection pool matching the criteria. This function is threadsafe.
    /// If no connection is found, null is returned.
    ///
    /// The free list is scanned from `last` towards `first`, and the first node whose
    /// protocol, port and host all match is moved to the `used` list and returned.
    pub fn findConnection(pool: *ConnectionPool, criteria: Criteria) ?*Connection {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        var next = pool.free.last;
        while (next) |node| : (next = node.prev) {
            if (node.data.protocol != criteria.protocol) continue;
            if (node.data.port != criteria.port) continue;

            // Domain names are case-insensitive (RFC 5890, Section 2.3.2.4)
            if (!std.ascii.eqlIgnoreCase(node.data.host, criteria.host)) continue;

            // The mutex is already held, so use the non-locking variant.
            pool.acquireUnsafe(node);
            return &node.data;
        }

        return null;
    }

    /// Acquires an existing connection from the connection pool. This function is not threadsafe.
    ///
    /// `node` is unlinked from the free list and appended to the used list, so it must
    /// currently be linked into `free`.
    pub fn acquireUnsafe(pool: *ConnectionPool, node: *Node) void {
        pool.free.remove(node);
        pool.free_len -= 1;

        pool.used.append(node);
    }

    /// Acquires an existing connection from the connection pool. This function is threadsafe.
    ///
    /// Locks the pool and then behaves exactly like `acquireUnsafe`.
    pub fn acquire(pool: *ConnectionPool, node: *Node) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        return pool.acquireUnsafe(node);
    }

    /// Tries to release a connection back to the connection pool. This function is threadsafe.
    /// If the connection is marked as closing, it will be closed instead.
    /// It is also closed when the pool keeps no idle connections (`free_size == 0`).
    ///
    /// `connection` must be the `data` field of a `Node` that is linked into `used`.
    ///
    /// The allocator must be the owner of all nodes in this pool.
    /// The allocator must be the owner of all resources associated with the connection.
    pub fn release(pool: *ConnectionPool, allocator: Allocator, connection: *Connection) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        // Recover the list node that embeds this connection.
        const node: *Node = @fieldParentPtr("data", connection);

        pool.used.remove(node);

        if (node.data.closing or pool.free_size == 0) {
            node.data.close(allocator);
            return allocator.destroy(node);
        }

        if (pool.free_len >= pool.free_size) {
            // Make room by evicting the node at the front of the free list.
            // `free_size` is nonzero here, so `free_len >= 1` and the list cannot be empty.
            const popped = pool.free.popFirst() orelse unreachable;
            pool.free_len -= 1;

            popped.data.close(allocator);
            allocator.destroy(popped);
        }

        if (node.data.proxied) {
            // `findConnection` scans from `last` to `first`, so a prepended node is
            // considered last: direct connections are always tried before proxied ones.
            pool.free.prepend(node);
        } else {
            pool.free.append(node);
        }

        pool.free_len += 1;
    }

    /// Adds a newly created node to the pool of used connections. This function is threadsafe.
    pub fn addUsed(pool: *ConnectionPool, node: *Node) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        pool.used.append(node);
    }

    /// Resizes the connection pool. This function is threadsafe.
    ///
    /// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size.
    /// Connections are evicted from the front of the free list. Connections in use are not affected.
    pub fn resize(pool: *ConnectionPool, allocator: Allocator, new_size: usize) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        while (pool.free_len > new_size) {
            // `free_len > new_size >= 0` implies the free list is non-empty.
            const popped = pool.free.popFirst() orelse unreachable;
            pool.free_len -= 1;

            popped.data.close(allocator);
            allocator.destroy(popped);
        }

        pool.free_size = new_size;
    }

    /// Frees the connection pool and closes all connections within, both idle and in use.
    ///
    /// The mutex is acquired and never released, and the pool is then overwritten with
    /// `undefined`. The pool must not be used again after this call.
    ///
    /// The allocator must be the owner of all nodes in this pool.
    pub fn deinit(pool: *ConnectionPool, allocator: Allocator) void {
        pool.mutex.lock();

        var next = pool.free.first;
        while (next) |node| {
            // Destroy the node at the end of the iteration, after `next` has been read from it.
            defer allocator.destroy(node);
            next = node.next;

            node.data.close(allocator);
        }

        next = pool.used.first;
        while (next) |node| {
            defer allocator.destroy(node);
            next = node.next;

            node.data.close(allocator);
        }

        pool.* = undefined;
    }
};