Files
Odin/core/nbio/mpsc.odin
2026-01-17 21:03:25 +01:00

63 lines
1.7 KiB
Odin

#+private
package nbio
import "base:runtime"
import "core:sync"
// Fixed-capacity ring of `rawptr` items together with the bookkeeping used by
// `mpsc_enqueue` (callable from several threads) and `mpsc_dequeue`.
//
// A slot that holds `nil` is treated as empty by `mpsc_dequeue`.
Multi_Producer_Single_Consumer :: struct {
	// Number of reserved slots; modified atomically by enqueue and dequeue.
	count:  int,
	// Ever-increasing enqueue ticket, incremented atomically;
	// `head & mask` selects the slot a producer writes to.
	head:   int,
	// Index of the next slot to take; read and written without atomics by
	// `mpsc_dequeue`, so only one thread may dequeue.
	tail:   int,
	// Slot storage, allocated by `mpsc_init` and freed by `mpsc_destroy`.
	buffer: []rawptr,
	// Set to `cap-1` by `mpsc_init`; wraps `head` into `buffer`.
	mask:   int,
}
// Allocates the slot buffer of `mpscq` and puts the queue into its empty state.
//
// Inputs:
// - mpscq: the queue to initialize
// - cap: number of slots; must be a power of two (asserted)
// - allocator: allocator used for the slot buffer
//
// Returns:
// - the allocator error when the buffer could not be allocated, nil otherwise
mpsc_init :: proc(mpscq: ^Multi_Producer_Single_Consumer, cap: int, allocator: runtime.Allocator) -> runtime.Allocator_Error {
	assert(runtime.is_power_of_two_int(cap), "cap must be a power of 2")

	mpscq.buffer = make([]rawptr, cap, allocator) or_return
	// With a power-of-two capacity, `index & mask` wraps an index into the buffer.
	mpscq.mask   = cap-1

	// Reset the bookkeeping as well, so a struct that was used before (or was
	// not zero-initialized) does not keep a stale `tail` that may lie outside
	// the new buffer, or a `head` that no longer lines up with `tail`.
	mpscq.count = 0
	mpscq.head  = 0
	mpscq.tail  = 0

	// Release fence after the plain writes above; presumably so the initialized
	// fields are published before the queue is shared with other threads.
	sync.atomic_thread_fence(.Release)
	return nil
}
// Frees the slot buffer of `mpscq`.
//
// Inputs:
// - mpscq: the queue whose buffer is released
// - allocator: allocator handed to `delete` for the buffer
//
// The fields of `mpscq` are not modified by this procedure.
mpsc_destroy :: proc(mpscq: ^Multi_Producer_Single_Consumer, allocator: runtime.Allocator) {
	slots := mpscq.buffer
	delete(slots, allocator)
}
// Tries to store `obj` in the queue.
//
// Inputs:
// - mpscq: the queue
// - obj: the item to store; must not be nil, because nil marks an empty slot
//
// Returns:
// - true when the item was stored, false when every slot was already reserved
mpsc_enqueue :: proc(mpscq: ^Multi_Producer_Single_Consumer, obj: rawptr) -> bool {
	// `mpsc_dequeue` reads a nil slot as "nothing to take" and does not advance
	// past it, so a stored nil would occupy its slot and its `count` forever.
	assert(obj != nil, "cannot enqueue nil, nil marks an empty slot")

	// Reserve a slot first; hand the reservation back when the queue is full.
	reserved := sync.atomic_add_explicit(&mpscq.count, 1, .Acquire)
	if reserved >= len(mpscq.buffer) {
		sync.atomic_sub_explicit(&mpscq.count, 1, .Release)
		return false
	}

	// Take the next ticket and wrap it into the buffer to find our slot.
	ticket := sync.atomic_add_explicit(&mpscq.head, 1, .Acquire)
	slot   := &mpscq.buffer[ticket & mpscq.mask]

	// The reserved slot is expected to be empty, both before and at the
	// moment the item is swapped in.
	assert(slot^ == nil)
	previous := sync.atomic_exchange_explicit(slot, obj, .Release)
	assert(previous == nil)
	return true
}
// Takes the item stored in the slot at `tail`, if there is one.
//
// Inputs:
// - mpscq: the queue
//
// Returns:
// - the stored item, or nil when the slot at `tail` holds nothing (yet)
//
// `tail` is read and written without atomics here.
mpsc_dequeue :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> rawptr {
	// Swap nil into the slot so it is empty again for a later enqueue.
	slot := &mpscq.buffer[mpscq.tail]
	item := sync.atomic_exchange_explicit(slot, nil, .Acquire)

	if item != nil {
		// Step to the following slot, wrapping around at the end of the buffer.
		next := mpscq.tail + 1
		mpscq.tail = next if next < len(mpscq.buffer) else 0

		// Give the reservation back only after the slot has been cleared.
		before := sync.atomic_sub_explicit(&mpscq.count, 1, .Release)
		assert(before > 0)
	}

	return item
}
// Reports the current value of the queue's `count` field.
//
// The value is read with a relaxed atomic load.
//
// Inputs:
// - mpscq: the queue
//
// Returns:
// - the value of `count` at the time of the load
mpsc_count :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int {
	current := sync.atomic_load_explicit(&mpscq.count, .Relaxed)
	return current
}
// Reports how many slots the queue's buffer has.
//
// Inputs:
// - mpscq: the queue
//
// Returns:
// - the length of `buffer`
mpsc_cap :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int {
	slots := mpscq.buffer
	return len(slots)
}