struct ConnectionPool [src]
A Least-Recently-Used cache of open connections to be reused.
Fields
mutex: std.Thread.Mutex = .{}
used: std.DoublyLinkedList = .{} — Open connections that are currently in use.
free: std.DoublyLinkedList = .{} — Open connections that are not currently in use.
free_len: usize = 0
free_size: usize = 32
Members
- acquire (Function)
- acquireUnsafe (Function)
- addUsed (Function)
- Criteria (struct)
- deinit (Function)
- findConnection (Function)
- release (Function)
- resize (Function)
Source
/// A Least-Recently-Used cache of open connections to be reused.
///
/// Connections are linked into the pool intrusively through their `pool_node`
/// field; the owning `Connection` is recovered with `@fieldParentPtr`.
pub const ConnectionPool = struct {
    /// Guards every field below.
    mutex: std.Thread.Mutex = .{},
    /// Open connections that are currently in use.
    used: std.DoublyLinkedList = .{},
    /// Open connections that are not currently in use.
    free: std.DoublyLinkedList = .{},
    /// Number of connections currently linked into `free`.
    free_len: usize = 0,
    /// Maximum number of idle connections kept in `free`.
    /// When zero, released connections are destroyed instead of being pooled.
    free_size: usize = 32,

    /// The criteria for a connection to be considered a match.
    pub const Criteria = struct {
        host: []const u8,
        port: u16,
        protocol: Protocol,
    };

    /// Finds and acquires a connection from the connection pool matching the criteria.
    /// If no connection is found, null is returned.
    ///
    /// On a match the connection is moved from the free list to the used list.
    ///
    /// Threadsafe.
    pub fn findConnection(pool: *ConnectionPool, criteria: Criteria) ?*Connection {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        // Walk from the tail towards the head, i.e. starting with the most
        // recently appended idle connection.
        var next = pool.free.last;
        while (next) |node| : (next = node.prev) {
            const connection: *Connection = @alignCast(@fieldParentPtr("pool_node", node));
            if (connection.protocol != criteria.protocol) continue;
            if (connection.port != criteria.port) continue;

            // Domain names are case-insensitive (RFC 5890, Section 2.3.2.4)
            if (!std.ascii.eqlIgnoreCase(connection.host(), criteria.host)) continue;

            pool.acquireUnsafe(connection);
            return connection;
        }

        return null;
    }

    /// Acquires an existing connection from the connection pool by moving it
    /// from the free list to the used list. This function is not threadsafe;
    /// the caller is expected to hold `mutex`.
    pub fn acquireUnsafe(pool: *ConnectionPool, connection: *Connection) void {
        pool.free.remove(&connection.pool_node);
        pool.free_len -= 1;

        pool.used.append(&connection.pool_node);
    }

    /// Acquires an existing connection from the connection pool. This function is threadsafe.
    pub fn acquire(pool: *ConnectionPool, connection: *Connection) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        return pool.acquireUnsafe(connection);
    }

    /// Tries to release a connection back to the connection pool.
    /// If the connection is marked as closing, or the pool keeps no idle
    /// connections (`free_size == 0`), it is destroyed instead.
    ///
    /// Threadsafe.
    pub fn release(pool: *ConnectionPool, connection: *Connection) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        pool.used.remove(&connection.pool_node);

        if (connection.closing or pool.free_size == 0) return connection.destroy();

        // The free list is full: evict the connection at the head to make room.
        if (pool.free_len >= pool.free_size) {
            const popped: *Connection = @alignCast(@fieldParentPtr("pool_node", pool.free.popFirst().?));
            pool.free_len -= 1;

            popped.destroy();
        }

        if (connection.proxied) {
            // proxied connections go to the end of the queue, always try direct connections first
            // (`findConnection` searches from the tail, so the head is visited last).
            pool.free.prepend(&connection.pool_node);
        } else {
            pool.free.append(&connection.pool_node);
        }

        pool.free_len += 1;
    }

    /// Adds a newly created node to the pool of used connections. This function is threadsafe.
    pub fn addUsed(pool: *ConnectionPool, connection: *Connection) void {
        pool.mutex.lock();
        defer pool.mutex.unlock();

        pool.used.append(&connection.pool_node);
    }

    /// Resizes the connection pool.
    ///
    /// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size.
    ///
    /// `allocator` is unused: idle connections are torn down through
    /// `Connection.destroy`, the same way `release` and `deinit` do it. The
    /// parameter is kept so existing callers continue to compile.
    ///
    /// Threadsafe.
    pub fn resize(pool: *ConnectionPool, allocator: Allocator, new_size: usize) void {
        _ = allocator;

        pool.mutex.lock();
        defer pool.mutex.unlock();

        while (pool.free_len > new_size) {
            // `free_len` counts the nodes in `free`, so the list cannot be empty here.
            const node = pool.free.popFirst() orelse unreachable;
            // The list is intrusive: the node is embedded in the connection,
            // so recover the owner rather than treating the node as a payload.
            const connection: *Connection = @alignCast(@fieldParentPtr("pool_node", node));
            pool.free_len -= 1;

            connection.destroy();
        }

        pool.free_size = new_size;
    }

    /// Frees the connection pool and closes all connections within.
    ///
    /// The mutex is acquired and intentionally never released, and the pool is
    /// overwritten with `undefined`; the pool must not be used after this call.
    /// All future operations on the connection pool will deadlock.
    ///
    /// Threadsafe.
    pub fn deinit(pool: *ConnectionPool) void {
        pool.mutex.lock();

        var next = pool.free.first;
        while (next) |node| {
            const connection: *Connection = @alignCast(@fieldParentPtr("pool_node", node));
            // Read the successor before destroying the connection that embeds `node`.
            next = node.next;
            connection.destroy();
        }

        next = pool.used.first;
        while (next) |node| {
            const connection: *Connection = @alignCast(@fieldParentPtr("pool_node", node));
            // Read the successor before destroying the connection that embeds `node`.
            next = node.next;
            connection.destroy();
        }

        pool.* = undefined;
    }
};