move datastructures to dedicated "datastruct" package

This commit is contained in:
Mitchell Hashimoto
2024-11-07 14:38:54 -08:00
parent aed51fd0b0
commit a436bd0af6
18 changed files with 29 additions and 21 deletions

src/datastruct/blocking_queue.zig Normal file

@@ -0,0 +1,250 @@
//! Blocking queue implementation aimed primarily for message passing
//! between threads.
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
/// Returns a blocking queue implementation for type T.
///
/// This is tailor-made for ghostty usage so it isn't meant to be maximally
/// generic, but I'm happy to make it more generic over time. Traits of this
/// queue that are specific to our usage:
///
/// - Fixed size. We expect our queue to quickly drain and also not be
/// too large so we prefer a fixed size queue for now.
/// - No blocking pop. We use an external event loop mechanism such as
/// eventfd to notify our waiter that there is no data available so
/// we don't need to implement a blocking pop.
/// - Drain function. Most queues usually pop one at a time. We have
/// a mechanism for draining since on every IO loop our TTY drains
/// the full queue so we can get rid of the overhead of a ton of
/// locks and bounds checking and do a one-time drain.
///
/// One key usage pattern is that our blocking queues are single producer
/// single consumer (SPSC). This should let us do some interesting optimizations
/// in the future. At the time of writing this, the blocking queue implementation
/// is purposely naive to build something quickly, but we should benchmark
/// and make this more optimized as necessary.
pub fn BlockingQueue(
comptime T: type,
comptime capacity: usize,
) type {
return struct {
const Self = @This();
/// The type we use for queue sizes. We can optimize this
/// in the future to be the correct bit-size for our preallocated
/// size for this queue.
pub const Size = u32;
// The bounds of this queue. We recast this to Size so we can do math.
const bounds: Size = @intCast(capacity);
/// Specifies the timeout for an operation.
pub const Timeout = union(enum) {
/// Fail instantly (non-blocking).
instant: void,
/// Run forever or until interrupted
forever: void,
/// Nanoseconds
ns: u64,
};
/// Our data. The values are undefined until they are written.
data: [bounds]T = undefined,
/// The next location to write (next empty loc) and next location
/// to read (next non-empty loc).
write: Size = 0,
read: Size = 0,
/// The number of written (unread) elements.
len: Size = 0,
/// The big mutex that must be held to read/write.
mutex: std.Thread.Mutex = .{},
/// A CV for being notified when the queue is no longer full. This is
/// used for writing. Note we DON'T have a CV for waiting on the
/// queue not being EMPTY because we use external notifiers for that.
cond_not_full: std.Thread.Condition = .{},
not_full_waiters: usize = 0,
/// Allocate the blocking queue on the heap.
pub fn create(alloc: Allocator) !*Self {
const ptr = try alloc.create(Self);
errdefer alloc.destroy(ptr);
ptr.* = .{
.data = undefined,
.len = 0,
.write = 0,
.read = 0,
.mutex = .{},
.cond_not_full = .{},
.not_full_waiters = 0,
};
return ptr;
}
/// Free all the resources for this queue. This should only be
/// called once all producers and consumers have quit.
pub fn destroy(self: *Self, alloc: Allocator) void {
self.* = undefined;
alloc.destroy(self);
}
/// Push a value to the queue. This returns the total size of the
/// queue (unread items) after the push. A return value of zero
/// means that the push failed.
pub fn push(self: *Self, value: T, timeout: Timeout) Size {
self.mutex.lock();
defer self.mutex.unlock();
// If we're full, handle the timeout behavior before we write.
if (self.full()) {
switch (timeout) {
// If we're not waiting, then we failed to write.
.instant => return 0,
.forever => {
self.not_full_waiters += 1;
defer self.not_full_waiters -= 1;
self.cond_not_full.wait(&self.mutex);
},
.ns => |ns| {
self.not_full_waiters += 1;
defer self.not_full_waiters -= 1;
self.cond_not_full.timedWait(&self.mutex, ns) catch return 0;
},
}
// If we're still full, then we failed to write. This can
// happen in situations where we are interrupted.
if (self.full()) return 0;
}
// Add our data and update our accounting
self.data[self.write] = value;
self.write += 1;
if (self.write >= bounds) self.write -= bounds;
self.len += 1;
return self.len;
}
/// Pop a value from the queue without blocking.
pub fn pop(self: *Self) ?T {
self.mutex.lock();
defer self.mutex.unlock();
// If we're empty we have nothing
if (self.len == 0) return null;
// Get the index we're going to read data from and do some
// accounting. We don't copy the value here to avoid copying twice.
const n = self.read;
self.read += 1;
if (self.read >= bounds) self.read -= bounds;
self.len -= 1;
// If we have producers waiting on a full queue, notify.
if (self.not_full_waiters > 0) self.cond_not_full.signal();
return self.data[n];
}
/// Pop all values from the queue. This will hold the big mutex
/// until `deinit` is called on the return value. This is used if
/// you know you're going to "pop" and utilize all the values
/// quickly to avoid many locks, bounds checks, and cv signals.
pub fn drain(self: *Self) DrainIterator {
self.mutex.lock();
return .{ .queue = self };
}
pub const DrainIterator = struct {
queue: *Self,
pub fn next(self: *DrainIterator) ?T {
if (self.queue.len == 0) return null;
// Read and account
const n = self.queue.read;
self.queue.read += 1;
if (self.queue.read >= bounds) self.queue.read -= bounds;
self.queue.len -= 1;
return self.queue.data[n];
}
pub fn deinit(self: *DrainIterator) void {
// If we have producers waiting on a full queue, notify.
if (self.queue.not_full_waiters > 0) self.queue.cond_not_full.signal();
// Unlock
self.queue.mutex.unlock();
}
};
/// Returns true if the queue is full. This is not public because
/// it requires the lock to be held.
inline fn full(self: *Self) bool {
return self.len == bounds;
}
};
}
test "basic push and pop" {
const testing = std.testing;
const alloc = testing.allocator;
const Q = BlockingQueue(u64, 4);
const q = try Q.create(alloc);
defer q.destroy(alloc);
// Should have no values
try testing.expect(q.pop() == null);
// Push until we're full
try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} }));
try testing.expectEqual(@as(Q.Size, 2), q.push(2, .{ .instant = {} }));
try testing.expectEqual(@as(Q.Size, 3), q.push(3, .{ .instant = {} }));
try testing.expectEqual(@as(Q.Size, 4), q.push(4, .{ .instant = {} }));
try testing.expectEqual(@as(Q.Size, 0), q.push(5, .{ .instant = {} }));
// Pop!
try testing.expect(q.pop().? == 1);
try testing.expect(q.pop().? == 2);
try testing.expect(q.pop().? == 3);
try testing.expect(q.pop().? == 4);
try testing.expect(q.pop() == null);
// Drain does nothing
var it = q.drain();
try testing.expect(it.next() == null);
it.deinit();
// Verify we can still push
try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} }));
}
test "timed push" {
const testing = std.testing;
const alloc = testing.allocator;
const Q = BlockingQueue(u64, 1);
const q = try Q.create(alloc);
defer q.destroy(alloc);
// Push
try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} }));
try testing.expectEqual(@as(Q.Size, 0), q.push(2, .{ .instant = {} }));
// Timed push should fail
try testing.expectEqual(@as(Q.Size, 0), q.push(2, .{ .ns = 1000 }));
}
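// An illustrative sketch (not part of the original file) of the drain
// pattern described in the doc comment above: the consumer takes the lock
// once and pops everything, instead of paying a lock round trip per pop.
test "drain usage sketch" {
    const testing = std.testing;
    const alloc = testing.allocator;
    const Q = BlockingQueue(u64, 4);
    const q = try Q.create(alloc);
    defer q.destroy(alloc);
    _ = q.push(1, .{ .instant = {} });
    _ = q.push(2, .{ .instant = {} });
    _ = q.push(3, .{ .instant = {} });
    // Drain holds the mutex until deinit and signals any blocked
    // producers exactly once when it finishes.
    var it = q.drain();
    defer it.deinit();
    try testing.expect(it.next().? == 1);
    try testing.expect(it.next().? == 2);
    try testing.expect(it.next().? == 3);
    try testing.expect(it.next() == null);
}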

src/datastruct/cache_table.zig Normal file

@@ -0,0 +1,181 @@
const fastmem = @import("../fastmem.zig");
const std = @import("std");
const assert = std.debug.assert;
/// An associative data structure used for efficiently storing and
/// retrieving values which are able to be recomputed if necessary.
///
/// This structure is effectively a hash table with fixed-sized buckets.
///
/// When inserting an item into a full bucket, the least recently used
/// item is replaced.
///
/// To achieve this, when an item is accessed, it's moved to the end of
/// the bucket, and the rest of the items are moved over to fill the gap.
///
/// This should provide very good query performance and keep frequently
/// accessed items cached indefinitely.
///
/// Parameters:
///
/// `Context`
/// A type containing methods to define CacheTable behaviors.
/// - `fn hash(*Context, K) u64` - Return a hash for a key.
/// - `fn eql(*Context, K, K) bool` - Check two keys for equality.
///
/// - `fn evicted(*Context, K, V) void` - [OPTIONAL] Eviction callback.
/// If present, called whenever an item is evicted from the cache.
///
/// `bucket_count`
/// Should ideally be close to the median number of important items that
/// you expect to be cached at any given point. This is required to be a
/// power of 2 since performance suffers if it's not and there's no good
/// reason to allow it to be anything else.
///
/// `bucket_size`
/// Should be larger if you expect a large number of unimportant items to
/// enter the cache at a time. Having larger buckets will avoid important
/// items being dropped from the cache prematurely.
///
pub fn CacheTable(
comptime K: type,
comptime V: type,
comptime Context: type,
comptime bucket_count: usize,
comptime bucket_size: u8,
) type {
return struct {
const Self = @This();
const KV = struct {
key: K,
value: V,
};
comptime {
assert(std.math.isPowerOfTwo(bucket_count));
assert(bucket_count <= std.math.maxInt(usize));
}
/// `bucket_count` buckets containing `bucket_size` KV pairs each.
///
/// We don't need to initialize this memory because we don't use it
/// unless it's within a bucket's stored length, which will guarantee
/// that we put actual items there.
buckets: [bucket_count][bucket_size]KV = undefined,
/// We use this array to keep track of how many slots in each bucket
/// have actual items in them. Once all the buckets fill up this will
/// become a pointless check, but hopefully branch prediction picks
/// up on it at that point. The memory cost isn't too bad since it's
/// just bytes, so it should be a fraction of the size of the main table.
lengths: [bucket_count]u8 = [_]u8{0} ** bucket_count,
/// An instance of the context structure.
/// Must be initialized before calling any operations.
context: Context,
/// Adds an item to the cache table. If an old value was removed to
/// make room then it is returned in a struct with its key and value.
pub fn put(self: *Self, key: K, value: V) ?KV {
const kv: KV = .{ .key = key, .value = value };
const idx: usize = @intCast(self.context.hash(key) % bucket_count);
// If we have space available in the bucket then we just append
if (self.lengths[idx] < bucket_size) {
self.buckets[idx][self.lengths[idx]] = kv;
self.lengths[idx] += 1;
return null;
}
assert(self.lengths[idx] == bucket_size);
// Append our new item and return the oldest
const evicted = fastmem.rotateIn(KV, &self.buckets[idx], kv);
// The Context is allowed to register an eviction hook.
if (comptime @hasDecl(Context, "evicted")) self.context.evicted(
evicted.key,
evicted.value,
);
return evicted;
}
/// Retrieves an item from the cache table.
///
/// Returns null if no item is found with the provided key.
pub fn get(self: *Self, key: K) ?V {
const idx: usize = @intCast(self.context.hash(key) % bucket_count);
const len = self.lengths[idx];
var i: usize = len;
while (i > 0) {
i -= 1;
if (self.context.eql(key, self.buckets[idx][i].key)) {
defer fastmem.rotateOnce(KV, self.buckets[idx][i..len]);
return self.buckets[idx][i].value;
}
}
return null;
}
/// Removes all items from the cache table.
///
/// If your `Context` has an `evicted` method,
/// it will be called with all removed items.
pub fn clear(self: *Self) void {
if (comptime @hasDecl(Context, "evicted")) {
for (self.buckets, self.lengths) |b, l| {
for (b[0..l]) |kv| {
self.context.evicted(kv.key, kv.value);
}
}
}
@memset(&self.lengths, 0);
}
};
}
/// Creates a Context automatically for the given key type. This uses the
/// same logic as std.hash_map.AutoContext today since the API matches.
fn AutoContext(comptime K: type) type {
return std.hash_map.AutoContext(K);
}
test CacheTable {
const testing = std.testing;
// Construct a table that purposely has a predictable hash so we can
// test all edge cases.
const T = CacheTable(u32, u32, struct {
pub fn hash(self: *const @This(), key: u32) u64 {
_ = self;
return @intCast(key);
}
pub fn eql(self: *const @This(), a: u32, b: u32) bool {
_ = self;
return a == b;
}
}, 2, 2);
var t: T = .{ .context = .{} };
// Fill the table
try testing.expect(t.put(0, 0) == null);
try testing.expect(t.put(1, 0) == null);
try testing.expect(t.put(2, 0) == null);
try testing.expect(t.put(3, 0) == null);
// It should now be full, so any insert should evict the oldest item.
// NOTE: For the sake of this test, we're assuming that the first item
// is evicted but we don't need to promise this.
try testing.expectEqual(T.KV{
.key = 0,
.value = 0,
}, t.put(4, 0).?);
// The first item should now be gone
try testing.expect(t.get(0) == null);
}
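// An illustrative sketch (not from the original file) of the optional
// `evicted` hook described in the Context docs above: a context that
// counts evictions, reusing the predictable hash from the test so the
// eviction is deterministic.
test "CacheTable evicted hook sketch" {
    const testing = std.testing;
    const Ctx = struct {
        evictions: usize = 0,
        pub fn hash(self: *@This(), key: u32) u64 {
            _ = self;
            return @intCast(key);
        }
        pub fn eql(self: *@This(), a: u32, b: u32) bool {
            _ = self;
            return a == b;
        }
        pub fn evicted(self: *@This(), key: u32, value: u32) void {
            _ = key;
            _ = value;
            self.evictions += 1;
        }
    };
    const T = CacheTable(u32, u32, Ctx, 2, 2);
    var t: T = .{ .context = .{} };
    // Keys 0, 2, and 4 all hash to bucket 0, which holds two items.
    try testing.expect(t.put(0, 0) == null);
    try testing.expect(t.put(2, 0) == null);
    _ = t.put(4, 0); // bucket full: evicts key 0 and fires the hook
    try testing.expectEqual(@as(usize, 1), t.context.evictions);
    // clear() fires the hook for the two items still in the table.
    t.clear();
    try testing.expectEqual(@as(usize, 3), t.context.evictions);
}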

src/datastruct/circ_buf.zig Normal file

@@ -0,0 +1,615 @@
const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const fastmem = @import("../fastmem.zig");
/// Returns a circular buffer containing type T.
pub fn CircBuf(comptime T: type, comptime default: T) type {
return struct {
const Self = @This();
// Implementation note: there's a lot of unchecked usize addition in
// this implementation that can technically overflow. If someone wants
// to fix this and make it overflow-safe (use subtractions for checks
// prior to additions) then I welcome it. In reality, we'd need a
// really, really large terminal screen to even worry about this, so
// I'm punting on it.
storage: []T,
head: usize,
tail: usize,
// We could remove this and just use math with head/tail to figure
// it out, but our usage of circular buffers stores so much data that
// this minor overhead is not worth optimizing out.
full: bool,
pub const Iterator = struct {
buf: Self,
idx: usize,
direction: Direction,
pub const Direction = enum { forward, reverse };
pub fn next(self: *Iterator) ?*T {
if (self.idx >= self.buf.len()) return null;
// Get our index from the tail
const tail_idx = switch (self.direction) {
.forward => self.idx,
.reverse => self.buf.len() - self.idx - 1,
};
// Translate the tail index to a storage index
const storage_idx = (self.buf.tail + tail_idx) % self.buf.capacity();
self.idx += 1;
return &self.buf.storage[storage_idx];
}
};
/// Initialize a new circular buffer that can store size elements.
pub fn init(alloc: Allocator, size: usize) !Self {
const buf = try alloc.alloc(T, size);
@memset(buf, default);
return Self{
.storage = buf,
.head = 0,
.tail = 0,
.full = false,
};
}
pub fn deinit(self: *Self, alloc: Allocator) void {
alloc.free(self.storage);
self.* = undefined;
}
/// Append a single value to the buffer. If the buffer is full,
/// an error will be returned.
pub fn append(self: *Self, v: T) !void {
if (self.full) return error.OutOfMemory;
self.storage[self.head] = v;
self.head += 1;
if (self.head >= self.storage.len) self.head = 0;
self.full = self.head == self.tail;
}
/// Clear the buffer.
pub fn clear(self: *Self) void {
self.head = 0;
self.tail = 0;
self.full = false;
}
/// Iterate over the circular buffer.
pub fn iterator(self: Self, direction: Iterator.Direction) Iterator {
return Iterator{
.buf = self,
.idx = 0,
.direction = direction,
};
}
/// Resize the buffer to the given size (larger or smaller).
/// If larger, new values will be set to the default value.
pub fn resize(self: *Self, alloc: Allocator, size: usize) Allocator.Error!void {
// Rotate to zero so it is aligned.
try self.rotateToZero(alloc);
// Reallocate, this adds to the end so we're ready to go.
const prev_len = self.len();
const prev_cap = self.storage.len;
self.storage = try alloc.realloc(self.storage, size);
// If we grew, we need to set our new defaults. We can add it
// at the end since we rotated to start.
if (size > prev_cap) {
@memset(self.storage[prev_cap..], default);
// Fix up our head/tail
if (self.full) {
self.head = prev_len;
self.full = false;
}
}
}
/// Rotate the data so that it is zero-aligned.
fn rotateToZero(self: *Self, alloc: Allocator) Allocator.Error!void {
// TODO: this does this in the worst possible way by allocating.
// Rewrite to not allocate; it's possible, I'm just lazy right now.
// If we're already at zero then do nothing.
if (self.tail == 0) return;
var buf = try alloc.alloc(T, self.storage.len);
defer {
self.head = if (self.full) 0 else self.len();
self.tail = 0;
alloc.free(self.storage);
self.storage = buf;
}
if (!self.full and self.head >= self.tail) {
fastmem.copy(T, buf, self.storage[self.tail..self.head]);
return;
}
const middle = self.storage.len - self.tail;
fastmem.copy(T, buf, self.storage[self.tail..]);
fastmem.copy(T, buf[middle..], self.storage[0..self.head]);
}
/// Returns whether the buffer is currently empty. To check if it's
/// full, just check the "full" attribute.
pub fn empty(self: Self) bool {
return !self.full and self.head == self.tail;
}
/// Returns the total capacity allocated for this buffer.
pub fn capacity(self: Self) usize {
return self.storage.len;
}
/// Returns the number of elements currently used.
pub fn len(self: Self) usize {
if (self.full) return self.storage.len;
if (self.head >= self.tail) return self.head - self.tail;
return self.storage.len - (self.tail - self.head);
}
/// Delete the oldest n values from the buffer. If there are fewer
/// than n values in the buffer, it'll delete everything.
pub fn deleteOldest(self: *Self, n: usize) void {
assert(n <= self.storage.len);
// Clear the values back to default
const slices = self.getPtrSlice(0, n);
inline for (slices) |slice| @memset(slice, default);
// If we're not full, we can just advance the tail. We know
// it'll be less than the length because otherwise we'd be full.
self.tail += @min(self.len(), n);
if (self.tail >= self.storage.len) self.tail -= self.storage.len;
self.full = false;
}
/// Returns a pointer to the value at offset with the given length,
/// and considers this full amount of data "written" if it is beyond
/// the end of our buffer. This never "rotates" the buffer because
/// the offset can only be within the size of the buffer.
pub fn getPtrSlice(self: *Self, offset: usize, slice_len: usize) [2][]T {
// Note: this assertion is very important, it hints the compiler
// which generates ~10% faster code than without it.
assert(offset + slice_len <= self.capacity());
// End offset is the last offset (exclusive) for our slice.
// We use exclusive because it makes the math easier and it
// matches Zig's slicing parameterization.
const end_offset = offset + slice_len;
// If our slice can't fit it in our length, then we need to advance.
if (end_offset > self.len()) self.advance(end_offset - self.len());
// Our start and end indexes into the storage buffer
const start_idx = self.storageOffset(offset);
const end_idx = self.storageOffset(end_offset - 1);
// Optimistically, our data fits in one slice
if (end_idx >= start_idx) {
return .{
self.storage[start_idx .. end_idx + 1],
self.storage[0..0], // So there is an empty slice
};
}
return .{
self.storage[start_idx..],
self.storage[0 .. end_idx + 1],
};
}
/// Advances the head/tail so that we can store amount.
fn advance(self: *Self, amount: usize) void {
assert(amount <= self.storage.len - self.len());
// Optimistically add our amount
self.head += amount;
// If we exceeded the length of the buffer, wrap around.
if (self.head >= self.storage.len) self.head = self.head - self.storage.len;
// If we're full, we have to keep tail lined up.
if (self.full) self.tail = self.head;
// We're full if the head reached the tail. The head can never
// pass the tail because advance asserts amount is only in
// available space left.
self.full = self.head == self.tail;
}
/// For a given offset from zero, this returns the offset in the
/// storage buffer where this data can be found.
fn storageOffset(self: Self, offset: usize) usize {
assert(offset < self.storage.len);
// This should ideally be subtraction to avoid overflows, but it
// would take a really, really huge buffer to overflow.
const fits_offset = self.tail + offset;
if (fits_offset < self.storage.len) return fits_offset;
return fits_offset - self.storage.len;
}
};
}
test {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 12);
defer buf.deinit(alloc);
try testing.expect(buf.empty());
try testing.expectEqual(@as(usize, 0), buf.len());
}
test "append" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 3);
defer buf.deinit(alloc);
try buf.append(1);
try buf.append(2);
try buf.append(3);
try testing.expectError(error.OutOfMemory, buf.append(4));
buf.deleteOldest(1);
try buf.append(4);
try testing.expectError(error.OutOfMemory, buf.append(5));
}
test "forward iterator" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 3);
defer buf.deinit(alloc);
// Empty
{
var it = buf.iterator(.forward);
try testing.expect(it.next() == null);
}
// Partially full
try buf.append(1);
try buf.append(2);
{
var it = buf.iterator(.forward);
try testing.expect(it.next().?.* == 1);
try testing.expect(it.next().?.* == 2);
try testing.expect(it.next() == null);
}
// Full
try buf.append(3);
{
var it = buf.iterator(.forward);
try testing.expect(it.next().?.* == 1);
try testing.expect(it.next().?.* == 2);
try testing.expect(it.next().?.* == 3);
try testing.expect(it.next() == null);
}
// Delete and add
buf.deleteOldest(1);
try buf.append(4);
{
var it = buf.iterator(.forward);
try testing.expect(it.next().?.* == 2);
try testing.expect(it.next().?.* == 3);
try testing.expect(it.next().?.* == 4);
try testing.expect(it.next() == null);
}
}
test "reverse iterator" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 3);
defer buf.deinit(alloc);
// Empty
{
var it = buf.iterator(.reverse);
try testing.expect(it.next() == null);
}
// Partially full
try buf.append(1);
try buf.append(2);
{
var it = buf.iterator(.reverse);
try testing.expect(it.next().?.* == 2);
try testing.expect(it.next().?.* == 1);
try testing.expect(it.next() == null);
}
// Full
try buf.append(3);
{
var it = buf.iterator(.reverse);
try testing.expect(it.next().?.* == 3);
try testing.expect(it.next().?.* == 2);
try testing.expect(it.next().?.* == 1);
try testing.expect(it.next() == null);
}
// Delete and add
buf.deleteOldest(1);
try buf.append(4);
{
var it = buf.iterator(.reverse);
try testing.expect(it.next().?.* == 4);
try testing.expect(it.next().?.* == 3);
try testing.expect(it.next().?.* == 2);
try testing.expect(it.next() == null);
}
}
test "getPtrSlice fits" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 12);
defer buf.deinit(alloc);
const slices = buf.getPtrSlice(0, 11);
try testing.expectEqual(@as(usize, 11), slices[0].len);
try testing.expectEqual(@as(usize, 0), slices[1].len);
try testing.expectEqual(@as(usize, 11), buf.len());
}
test "getPtrSlice wraps" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 4);
defer buf.deinit(alloc);
// Fill the buffer
_ = buf.getPtrSlice(0, buf.capacity());
try testing.expect(buf.full);
try testing.expectEqual(@as(usize, 4), buf.len());
// Delete
buf.deleteOldest(2);
try testing.expect(!buf.full);
try testing.expectEqual(@as(usize, 2), buf.len());
// Get a slice that doesn't grow
{
const slices = buf.getPtrSlice(0, 2);
try testing.expectEqual(@as(usize, 2), slices[0].len);
try testing.expectEqual(@as(usize, 0), slices[1].len);
try testing.expectEqual(@as(usize, 2), buf.len());
slices[0][0] = 1;
slices[0][1] = 2;
}
// Get a slice that does grow, and forces wrap
{
const slices = buf.getPtrSlice(2, 2);
try testing.expectEqual(@as(usize, 2), slices[0].len);
try testing.expectEqual(@as(usize, 0), slices[1].len);
try testing.expectEqual(@as(usize, 4), buf.len());
// should be empty
try testing.expectEqual(@as(u8, 0), slices[0][0]);
try testing.expectEqual(@as(u8, 0), slices[0][1]);
slices[0][0] = 3;
slices[0][1] = 4;
}
// Get a slice across boundaries
{
const slices = buf.getPtrSlice(0, 4);
try testing.expectEqual(@as(usize, 2), slices[0].len);
try testing.expectEqual(@as(usize, 2), slices[1].len);
try testing.expectEqual(@as(usize, 4), buf.len());
try testing.expectEqual(@as(u8, 1), slices[0][0]);
try testing.expectEqual(@as(u8, 2), slices[0][1]);
try testing.expectEqual(@as(u8, 3), slices[1][0]);
try testing.expectEqual(@as(u8, 4), slices[1][1]);
}
}
test "rotateToZero" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 12);
defer buf.deinit(alloc);
_ = buf.getPtrSlice(0, 11);
try buf.rotateToZero(alloc);
}
test "rotateToZero offset" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 4);
defer buf.deinit(alloc);
// Fill the buffer
_ = buf.getPtrSlice(0, 3);
try testing.expectEqual(@as(usize, 3), buf.len());
// Delete
buf.deleteOldest(2);
try testing.expect(!buf.full);
try testing.expectEqual(@as(usize, 1), buf.len());
try testing.expect(buf.tail > 0 and buf.head >= buf.tail);
// Rotate to zero
try buf.rotateToZero(alloc);
try testing.expectEqual(@as(usize, 0), buf.tail);
try testing.expectEqual(@as(usize, 1), buf.head);
}
test "rotateToZero wraps" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 4);
defer buf.deinit(alloc);
// Fill the buffer
_ = buf.getPtrSlice(0, 3);
try testing.expectEqual(@as(usize, 3), buf.len());
try testing.expect(buf.tail == 0 and buf.head == 3);
// Delete all
buf.deleteOldest(3);
try testing.expectEqual(@as(usize, 0), buf.len());
try testing.expect(buf.tail == 3 and buf.head == 3);
// Refill to force a wrap
{
const slices = buf.getPtrSlice(0, 3);
slices[0][0] = 1;
slices[1][0] = 2;
slices[1][1] = 3;
try testing.expectEqual(@as(usize, 3), buf.len());
try testing.expect(buf.tail == 3 and buf.head == 2);
}
// Rotate to zero
try buf.rotateToZero(alloc);
try testing.expectEqual(@as(usize, 0), buf.tail);
try testing.expectEqual(@as(usize, 3), buf.head);
{
const slices = buf.getPtrSlice(0, 3);
try testing.expectEqual(@as(u8, 1), slices[0][0]);
try testing.expectEqual(@as(u8, 2), slices[0][1]);
try testing.expectEqual(@as(u8, 3), slices[0][2]);
}
}
test "rotateToZero full no wrap" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 4);
defer buf.deinit(alloc);
// Fill the buffer
_ = buf.getPtrSlice(0, 3);
// Delete all
buf.deleteOldest(3);
// Refill to force a wrap
{
const slices = buf.getPtrSlice(0, 4);
try testing.expect(buf.full);
slices[0][0] = 1;
slices[1][0] = 2;
slices[1][1] = 3;
slices[1][2] = 4;
}
// Rotate to zero
try buf.rotateToZero(alloc);
try testing.expect(buf.full);
try testing.expectEqual(@as(usize, 0), buf.tail);
try testing.expectEqual(@as(usize, 0), buf.head);
{
const slices = buf.getPtrSlice(0, 4);
try testing.expectEqual(@as(u8, 1), slices[0][0]);
try testing.expectEqual(@as(u8, 2), slices[0][1]);
try testing.expectEqual(@as(u8, 3), slices[0][2]);
try testing.expectEqual(@as(u8, 4), slices[0][3]);
}
}
test "resize grow" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 4);
defer buf.deinit(alloc);
// Fill and write
{
const slices = buf.getPtrSlice(0, 4);
try testing.expect(buf.full);
slices[0][0] = 1;
slices[0][1] = 2;
slices[0][2] = 3;
slices[0][3] = 4;
}
// Resize
try buf.resize(alloc, 6);
try testing.expect(!buf.full);
try testing.expectEqual(@as(usize, 4), buf.len());
try testing.expectEqual(@as(usize, 6), buf.capacity());
{
const slices = buf.getPtrSlice(0, 4);
try testing.expectEqual(@as(u8, 1), slices[0][0]);
try testing.expectEqual(@as(u8, 2), slices[0][1]);
try testing.expectEqual(@as(u8, 3), slices[0][2]);
try testing.expectEqual(@as(u8, 4), slices[0][3]);
}
}
test "resize shrink" {
const testing = std.testing;
const alloc = testing.allocator;
const Buf = CircBuf(u8, 0);
var buf = try Buf.init(alloc, 4);
defer buf.deinit(alloc);
// Fill and write
{
const slices = buf.getPtrSlice(0, 4);
try testing.expect(buf.full);
slices[0][0] = 1;
slices[0][1] = 2;
slices[0][2] = 3;
slices[0][3] = 4;
}
// Resize
try buf.resize(alloc, 3);
try testing.expect(buf.full);
try testing.expectEqual(@as(usize, 3), buf.len());
try testing.expectEqual(@as(usize, 3), buf.capacity());
{
const slices = buf.getPtrSlice(0, 3);
try testing.expectEqual(@as(u8, 1), slices[0][0]);
try testing.expectEqual(@as(u8, 2), slices[0][1]);
try testing.expectEqual(@as(u8, 3), slices[0][2]);
}
}
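// An illustrative sketch (not part of the original file) of the intended
// bulk-write pattern for getPtrSlice: when the requested range wraps the
// backing array, the caller writes into, and reads from, both slices.
test "getPtrSlice wrap-around write sketch" {
    const testing = std.testing;
    const alloc = testing.allocator;
    const Buf = CircBuf(u8, 0);
    var buf = try Buf.init(alloc, 4);
    defer buf.deinit(alloc);
    // Advance the tail so live data no longer starts at storage[0].
    _ = buf.getPtrSlice(0, 4);
    buf.deleteOldest(2);
    // Append two values; they land at the front of the backing array.
    const w = buf.getPtrSlice(2, 2);
    w[0][0] = 7;
    w[0][1] = 8;
    // Reading the full logical range now spans the wrap point.
    const r = buf.getPtrSlice(0, 4);
    try testing.expectEqual(@as(usize, 2), r[0].len);
    try testing.expectEqual(@as(u8, 7), r[1][0]);
    try testing.expectEqual(@as(u8, 8), r[1][1]);
}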

src/datastruct/lru.zig Normal file

@@ -0,0 +1,337 @@
const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
/// Create a HashMap for a key type that can be automatically hashed.
/// If you want finer-grained control, use HashMap directly.
pub fn AutoHashMap(comptime K: type, comptime V: type) type {
return HashMap(
K,
V,
std.hash_map.AutoContext(K),
std.hash_map.default_max_load_percentage,
);
}
/// HashMap implementation that supports least-recently-used eviction.
///
/// Beware of the Zig bug where a hashmap gets slower over time
/// (https://github.com/ziglang/zig/issues/17851). This LRU uses a hashmap
/// and evictions will cause this issue to appear. Callers should keep
/// track of eviction counts and periodically reinitialize the LRU to
/// avoid this issue. The LRU itself can't do this because it doesn't
/// know how to free values.
///
/// Note: This is a really elementary CS101 version of an LRU right now.
/// This is done initially to get something working. Once we have it working,
/// we can benchmark and improve if this ends up being a source of slowness.
pub fn HashMap(
comptime K: type,
comptime V: type,
comptime Context: type,
comptime max_load_percentage: u64,
) type {
return struct {
const Self = @This();
const Map = std.HashMapUnmanaged(K, *Queue.Node, Context, max_load_percentage);
const Queue = std.TailQueue(KV);
/// Map to maintain our entries.
map: Map,
/// Queue to maintain LRU order.
queue: Queue,
/// The capacity of our map. If this capacity is reached, cache
/// misses will begin evicting entries.
capacity: Map.Size,
pub const KV = struct {
key: K,
value: V,
};
/// The result of a getOrPut operation.
pub const GetOrPutResult = struct {
/// The entry that was retrieved. If found_existing is false,
/// then this is a pointer to allocated space to store a V.
/// If found_existing is true, the pointer value is valid, but
/// can be overwritten.
value_ptr: *V,
/// Whether an existing value was found or not.
found_existing: bool,
/// If another entry had to be evicted to make space for this
/// put operation, then this is the value that was evicted.
evicted: ?KV,
};
pub fn init(capacity: Map.Size) Self {
return .{
.map = .{},
.queue = .{},
.capacity = capacity,
};
}
pub fn deinit(self: *Self, alloc: Allocator) void {
// Important: use our queue as a source of truth for dealloc
// because we might keep items in the queue around that aren't
// present in our LRU anymore to prevent future allocations.
var it = self.queue.first;
while (it) |node| {
it = node.next;
alloc.destroy(node);
}
self.map.deinit(alloc);
self.* = undefined;
}
/// Get or put a value for a key. See GetOrPutResult on how to check
/// if an existing value was found, if an existing value was evicted,
/// etc.
pub fn getOrPut(self: *Self, allocator: Allocator, key: K) Allocator.Error!GetOrPutResult {
if (@sizeOf(Context) != 0)
@compileError("Cannot infer context " ++ @typeName(Context) ++ ", call getOrPutContext instead.");
return self.getOrPutContext(allocator, key, undefined);
}
/// See getOrPut
pub fn getOrPutContext(
self: *Self,
alloc: Allocator,
key: K,
ctx: Context,
) Allocator.Error!GetOrPutResult {
const map_gop = try self.map.getOrPutContext(alloc, key, ctx);
if (map_gop.found_existing) {
// Move to end to mark as most recently used
self.queue.remove(map_gop.value_ptr.*);
self.queue.append(map_gop.value_ptr.*);
return GetOrPutResult{
.found_existing = true,
.value_ptr = &map_gop.value_ptr.*.data.value,
.evicted = null,
};
}
errdefer _ = self.map.remove(key);
// We're evicting if our map insertion pushed the count past our capacity.
const evict = self.map.count() > self.capacity;
// Get our node. If we're not evicting then we allocate a new
// node. If we are evicting then we avoid allocation by just
// reusing the node we would've evicted.
var node = if (!evict) try alloc.create(Queue.Node) else node: {
// Our first node is the least recently used.
const least_used = self.queue.first.?;
// Move our least recently used to the end to make
// it the most recently used.
self.queue.remove(least_used);
// Remove the least used from the map
_ = self.map.remove(least_used.data.key);
break :node least_used;
};
errdefer if (!evict) alloc.destroy(node);
// Store our node in the map.
map_gop.value_ptr.* = node;
// Mark the node as most recently used
self.queue.append(node);
// Set our key
node.data.key = key;
return GetOrPutResult{
.found_existing = map_gop.found_existing,
.value_ptr = &node.data.value,
.evicted = if (!evict) null else node.data,
};
}
/// Get a value for a key.
pub fn get(self: *const Self, key: K) ?V {
if (@sizeOf(Context) != 0) {
@compileError("getContext must be used.");
}
return self.getContext(key, undefined);
}
/// See get
pub fn getContext(self: *const Self, key: K, ctx: Context) ?V {
const node = self.map.getContext(key, ctx) orelse return null;
return node.data.value;
}
/// Resize the LRU. If this shrinks the LRU then least-recently-used
/// items will be evicted; their values are returned in the slice. This
/// slice must be freed by the caller.
pub fn resize(self: *Self, alloc: Allocator, capacity: Map.Size) Allocator.Error!?[]V {
// Fast path: growing (or keeping) the capacity never evicts.
if (capacity >= self.capacity) {
self.capacity = capacity;
return null;
}
// If we're shrinking but our count is already within the new
// capacity, then we don't have to do anything.
if (self.map.count() <= capacity) {
self.capacity = capacity;
return null;
}
// We're shrinking and we have more items than the new capacity
const delta = self.map.count() - capacity;
var evicted = try alloc.alloc(V, delta);
var i: Map.Size = 0;
while (i < delta) : (i += 1) {
const node = self.queue.first.?;
evicted[i] = node.data.value;
self.queue.remove(node);
_ = self.map.remove(node.data.key);
alloc.destroy(node);
}
self.capacity = capacity;
assert(self.map.count() == capacity);
return evicted;
}
};
}
test "getOrPut" {
const testing = std.testing;
const alloc = testing.allocator;
const Map = AutoHashMap(u32, u8);
var m = Map.init(2);
defer m.deinit(alloc);
// Insert cap values, should be hits
{
const gop = try m.getOrPut(alloc, 1);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted == null);
gop.value_ptr.* = 1;
}
{
const gop = try m.getOrPut(alloc, 2);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted == null);
gop.value_ptr.* = 2;
}
// 1 is LRU
try testing.expect((try m.getOrPut(alloc, 1)).found_existing);
try testing.expect((try m.getOrPut(alloc, 2)).found_existing);
// Next should evict
{
const gop = try m.getOrPut(alloc, 3);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted != null);
try testing.expect(gop.evicted.?.value == 1);
gop.value_ptr.* = 3;
}
// Currently: 2 is LRU, let's make 3 LRU
try testing.expect((try m.getOrPut(alloc, 2)).found_existing);
// Next should evict
{
const gop = try m.getOrPut(alloc, 4);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted != null);
try testing.expect(gop.evicted.?.value == 3);
gop.value_ptr.* = 4;
}
}
test "get" {
const testing = std.testing;
const alloc = testing.allocator;
const Map = AutoHashMap(u32, u8);
var m = Map.init(2);
defer m.deinit(alloc);
// Insert cap values, should be hits
{
const gop = try m.getOrPut(alloc, 1);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted == null);
gop.value_ptr.* = 1;
}
try testing.expect(m.get(1) != null);
try testing.expect(m.get(1).? == 1);
try testing.expect(m.get(2) == null);
}
test "resize shrink without removal" {
const testing = std.testing;
const alloc = testing.allocator;
const Map = AutoHashMap(u32, u8);
var m = Map.init(2);
defer m.deinit(alloc);
// Insert cap values, LRU is 1
{
const gop = try m.getOrPut(alloc, 1);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted == null);
gop.value_ptr.* = 1;
}
// Shrink
const evicted = try m.resize(alloc, 1);
try testing.expect(evicted == null);
{
const gop = try m.getOrPut(alloc, 1);
try testing.expect(gop.found_existing);
}
}
test "resize shrink and remove" {
const testing = std.testing;
const alloc = testing.allocator;
const Map = AutoHashMap(u32, u8);
var m = Map.init(2);
defer m.deinit(alloc);
// Insert cap values, LRU is 1
{
const gop = try m.getOrPut(alloc, 1);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted == null);
gop.value_ptr.* = 1;
}
{
const gop = try m.getOrPut(alloc, 2);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted == null);
gop.value_ptr.* = 2;
}
// Shrink
const evicted = try m.resize(alloc, 1);
defer alloc.free(evicted.?);
try testing.expectEqual(@as(usize, 1), evicted.?.len);
{
const gop = try m.getOrPut(alloc, 1);
try testing.expect(!gop.found_existing);
try testing.expect(gop.evicted.?.value == 2);
gop.value_ptr.* = 1;
}
}
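// An illustrative sketch (not part of the original file) of the workaround
// suggested in the doc comment above: track evictions and periodically
// rebuild the LRU to dodge the hashmap slowdown bug. The threshold here is
// arbitrary, and a real caller would free evicted values before
// reinitializing.
test "periodic reinit sketch" {
    const testing = std.testing;
    const alloc = testing.allocator;
    const Map = AutoHashMap(u32, u8);
    var m = Map.init(2);
    defer m.deinit(alloc);
    var evictions: usize = 0;
    var i: u32 = 0;
    while (i < 8) : (i += 1) {
        const gop = try m.getOrPut(alloc, i);
        gop.value_ptr.* = @intCast(i);
        if (gop.evicted != null) evictions += 1;
        if (evictions >= 4) {
            // u8 values need no per-item cleanup here.
            m.deinit(alloc);
            m = Map.init(2);
            evictions = 0;
        }
    }
    try testing.expect(m.get(0) == null);
    try testing.expect(m.get(7) != null);
}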

src/datastruct/main.zig Normal file

@@ -0,0 +1,17 @@
//! The datastruct package contains data structures and anything
//! closely related to them.
const blocking_queue = @import("blocking_queue.zig");
const cache_table = @import("cache_table.zig");
const circ_buf = @import("circ_buf.zig");
const segmented_pool = @import("segmented_pool.zig");
pub const lru = @import("lru.zig");
pub const BlockingQueue = blocking_queue.BlockingQueue;
pub const CacheTable = cache_table.CacheTable;
pub const CircBuf = circ_buf.CircBuf;
pub const SegmentedPool = segmented_pool.SegmentedPool;
test {
@import("std").testing.refAllDecls(@This());
}
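// An illustrative sketch of how a consumer might use the package after
// this move (the exact import path depends on the caller's location):
//
//   const datastruct = @import("datastruct/main.zig");
//   const Queue = datastruct.BlockingQueue(u64, 64);
//   const Buf = datastruct.CircBuf(u8, 0);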

src/datastruct/segmented_pool.zig Normal file

@@ -0,0 +1,95 @@
const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const testing = std.testing;
/// A data structure where you can get stable (never copied) pointers to
/// a type that automatically grows if necessary. The values can be "put back"
/// but are expected to be put back IN ORDER.
///
/// This is implemented specifically for libuv write requests, since the
/// write requests must have a stable pointer and are guaranteed to be processed
/// in order for a single stream.
///
/// This is NOT thread safe.
pub fn SegmentedPool(comptime T: type, comptime prealloc: usize) type {
return struct {
const Self = @This();
i: usize = 0,
available: usize = prealloc,
list: std.SegmentedList(T, prealloc) = .{ .len = prealloc },
pub fn deinit(self: *Self, alloc: Allocator) void {
self.list.deinit(alloc);
self.* = undefined;
}
/// Get the next available value out of the list. This will not
/// grow the list.
pub fn get(self: *Self) !*T {
// Error to not have any
if (self.available == 0) return error.OutOfValues;
// The index we grab is just i % len, so we wrap around to the front.
const i = @mod(self.i, self.list.len);
self.i +%= 1; // Wrapping addition so we go back to 0 on overflow
self.available -= 1;
return self.list.at(i);
}
/// Get the next available value out of the list and grow the list
/// if necessary.
pub fn getGrow(self: *Self, alloc: Allocator) !*T {
if (self.available == 0) try self.grow(alloc);
return try self.get();
}
fn grow(self: *Self, alloc: Allocator) !void {
try self.list.growCapacity(alloc, self.list.len * 2);
self.i = self.list.len;
self.available = self.list.len;
self.list.len *= 2;
}
/// Put a value back. Values are expected to be put back in the
/// order they were retrieved.
pub fn put(self: *Self) void {
self.available += 1;
assert(self.available <= self.list.len);
}
};
}
test "SegmentedPool" {
var list: SegmentedPool(u8, 2) = .{};
defer list.deinit(testing.allocator);
try testing.expectEqual(@as(usize, 2), list.available);
// Get to capacity
const v1 = try list.get();
const v2 = try list.get();
try testing.expect(v1 != v2);
try testing.expectError(error.OutOfValues, list.get());
// Test writing for later
v1.* = 42;
// Put a value back
list.put();
const temp = try list.get();
try testing.expect(v1 == temp);
try testing.expect(temp.* == 42);
try testing.expectError(error.OutOfValues, list.get());
// Grow
const v3 = try list.getGrow(testing.allocator);
try testing.expect(v1 != v3 and v2 != v3);
_ = try list.get();
try testing.expectError(error.OutOfValues, list.get());
// Put a value back
list.put();
try testing.expect(v1 == try list.get());
try testing.expectError(error.OutOfValues, list.get());
}
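// An illustrative sketch (not part of the original file) of the intended
// in-order request pattern from the doc comment: requests are taken in
// order, completed in order, and put back in order, so their pointers
// stay stable the whole time.
test "in-order request sketch" {
    const WriteReq = struct { data: u8 = 0 };
    var pool: SegmentedPool(WriteReq, 2) = .{};
    defer pool.deinit(testing.allocator);
    // Issue two requests; the pointers could be handed to a C callback.
    const a = try pool.get();
    a.data = 1;
    const b = try pool.getGrow(testing.allocator);
    b.data = 2;
    // Completions arrive in issue order, so slots are released in order.
    pool.put();
    pool.put();
    // The next get reuses the first slot, pointer identity intact.
    try testing.expect((try pool.get()) == a);
}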