package uring

import "core:math"
import "core:sync"
import "core:sys/linux"

DEFAULT_THREAD_IDLE_MS :: 1000
DEFAULT_ENTRIES :: 32
MAX_ENTRIES :: 4096

Ring :: struct {
	fd: linux.Fd,
	sq: Submission_Queue,
	cq: Completion_Queue,
	flags: linux.IO_Uring_Setup_Flags,
	features: linux.IO_Uring_Features,
}

DEFAULT_PARAMS :: linux.IO_Uring_Params {
	sq_thread_idle = DEFAULT_THREAD_IDLE_MS,
}

// Initializes and sets up a uring; `entries` must be a power of two between 1 and 4096.
init :: proc(ring: ^Ring, params: ^linux.IO_Uring_Params, entries: u32 = DEFAULT_ENTRIES) -> (err: linux.Errno) {
	assert(entries <= MAX_ENTRIES, "too many entries")
	assert(entries != 0, "entries must be positive")
	assert(math.is_power_of_two(int(entries)), "entries must be a power of two")

	fd := linux.io_uring_setup(entries, params) or_return
	defer if err != nil { linux.close(fd) }

	if .SINGLE_MMAP not_in params.features {
		// NOTE: Could be supported, but currently isn't.
		err = .ENOSYS
		return
	}

	assert(.CQE32 not_in params.flags, "unsupported flag")  // NOTE: Could support this by making IO_Uring generic.
	assert(.SQE128 not_in params.flags, "unsupported flag") // NOTE: Could support this by making IO_Uring generic.

	sq := submission_queue_make(fd, params) or_return

	ring.fd = fd
	ring.sq = sq
	ring.cq = completion_queue_make(fd, params, &sq)
	ring.flags = params.flags
	ring.features = params.features

	return
}

destroy :: proc(ring: ^Ring) {
	assert(ring.fd >= 0)
	submission_queue_destroy(&ring.sq)
	linux.close(ring.fd)
	ring.fd = -1
}

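// Illustrative usage sketch, not part of the uring API: the typical lifecycle under
// default parameters. The proc name is hypothetical.
example_lifecycle :: proc() -> (err: linux.Errno) {
	ring: Ring
	params := DEFAULT_PARAMS
	init(&ring, &params) or_return
	defer destroy(&ring)
	// Acquire SQEs with get_sqe(), flush them with submit(), and reap completions with copy_cqes().
	return
}
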
// Returns a pointer to a vacant submission queue entry, or nil if the submission queue is full.
// NOTE: `extra` lets you make sure there is space left for related entries; it defaults to 1 so
// that a link timeout op can always be added after another entry.
get_sqe :: proc(ring: ^Ring, extra: int = 1) -> (sqe: ^linux.IO_Uring_SQE, ok: bool) {
	sq := &ring.sq
	head: u32 = sync.atomic_load_explicit(sq.head, .Acquire)
	next := sq.sqe_tail + 1

	if int(next - head) > len(sq.sqes) - extra {
		sqe = nil
		ok = false
		return
	}

	sqe = &sq.sqes[sq.sqe_tail & sq.mask]
	sqe^ = {}

	sq.sqe_tail = next
	ok = true
	return
}

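// Illustrative sketch, not part of the uring API: reserving an entry and tagging it for later
// identification. The proc name is hypothetical, and `user_data` is assumed to mirror the
// kernel's struct io_uring_sqe; the remaining fields depend on the operation being prepared.
example_queue_entry :: proc(ring: ^Ring, user_data: u64) -> (ok: bool) {
	sqe := get_sqe(ring) or_return // fails when the submission queue is full
	// Fill in the operation-specific fields (opcode, fd, addr, len, ...) here.
	sqe.user_data = user_data
	return true
}
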
// Returns how much space is left in the submission queue, computed the same way as the full check in get_sqe().
free_space :: proc(ring: ^Ring) -> int {
	sq := &ring.sq
	head := sync.atomic_load_explicit(sq.head, .Acquire)
	next := sq.sqe_tail + 1
	free := len(sq.sqes) - int(next - head)
	assert(free >= 0)
	return free
}

// Syncs internal state with the kernel ring state on the submission queue side.
// Returns the number of pending entries in the submission queue.
// The result tells the caller whether an enter call is needed.
flush_sq :: proc(ring: ^Ring) -> (n_pending: u32) {
	sq := &ring.sq
	to_submit := sq.sqe_tail - sq.sqe_head
	if to_submit != 0 {
		tail := sq.tail^
		i: u32 = 0
		for ; i < to_submit; i += 1 {
			sq.array[tail & sq.mask] = sq.sqe_head & sq.mask
			tail += 1
			sq.sqe_head += 1
		}
		sync.atomic_store_explicit(sq.tail, tail, .Release)
	}
	n_pending = sq_ready(ring)
	return
}

// Returns true if we are not using an SQ thread (thus nobody submits but us),
// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
// For the latter case, we set the SQ thread wakeup flag.
// Matches the implementation of sq_ring_needs_enter() in liburing.
sq_ring_needs_enter :: proc(ring: ^Ring, flags: ^linux.IO_Uring_Enter_Flags) -> bool {
	assert(flags^ == {})
	if .SQPOLL not_in ring.flags { return true }
	if .NEED_WAKEUP in sync.atomic_load_explicit(ring.sq.flags, .Relaxed) {
		flags^ += {.SQ_WAKEUP}
		return true
	}
	return false
}

// Submits the submission queue entries acquired via get_sqe().
// Returns the number of entries submitted.
// Optionally waits for a number of completions by setting `wait_nr`, and/or bounds the wait time by setting `timeout`.
submit :: proc(ring: ^Ring, wait_nr: u32 = 0, timeout: ^linux.Time_Spec = nil) -> (n_submitted: u32, err: linux.Errno) {
	n_submitted = flush_sq(ring)
	flags: linux.IO_Uring_Enter_Flags
	if sq_ring_needs_enter(ring, &flags) || wait_nr > 0 {
		if wait_nr > 0 || .IOPOLL in ring.flags {
			flags += {.GETEVENTS}
		}

		flags += {.EXT_ARG}
		ext: linux.IO_Uring_Getevents_Arg
		ext.ts = timeout

		n_submitted_: int
		n_submitted_, err = linux.io_uring_enter2(ring.fd, n_submitted, wait_nr, flags, &ext)
		assert(n_submitted_ >= 0)
		n_submitted = u32(n_submitted_)
	}
	return
}

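// Illustrative sketch, not part of the uring API: flush everything queued via get_sqe() and
// block until at least one completion is available. The proc name is hypothetical; a
// ^linux.Time_Spec could be passed as the third argument to bound the wait.
example_submit_and_wait_one :: proc(ring: ^Ring) -> (n_submitted: u32, err: linux.Errno) {
	return submit(ring, 1)
}
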
// Returns the number of entries pending in the submission queue.
sq_ready :: proc(ring: ^Ring) -> u32 {
	// Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
	// see https://github.com/axboe/liburing/issues/92.
	return ring.sq.sqe_tail - sync.atomic_load_explicit(ring.sq.head, .Acquire)
}

// Returns the number of completion queue entries that are ready to be consumed.
cq_ready :: proc(ring: ^Ring) -> (n_ready: u32) {
	return sync.atomic_load_explicit(ring.cq.tail, .Acquire) - ring.cq.head^
}

// Copies as many CQEs as are ready and fit into the destination `cqes` slice.
// If none are available, enters the kernel and waits until at least `wait_nr` CQEs are ready.
// Returns the number of CQEs copied, advancing the CQ ring.
// Provides all the wait/peek methods found in liburing, but with batching and as a single method.
// TODO: allow for a timeout.
copy_cqes :: proc(ring: ^Ring, cqes: []linux.IO_Uring_CQE, wait_nr: u32) -> (n_copied: u32, err: linux.Errno) {
	n_copied = copy_cqes_ready(ring, cqes)
	if n_copied > 0 { return }
	if wait_nr > 0 || cq_ring_needs_flush(ring) {
		_ = linux.io_uring_enter(ring.fd, 0, wait_nr, {.GETEVENTS}, nil) or_return
		n_copied = copy_cqes_ready(ring, cqes)
	}
	return
}

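// Illustrative sketch, not part of the uring API: drain ready completions into a small stack
// buffer, waiting for at least one if none are pending. The proc name is hypothetical.
example_reap :: proc(ring: ^Ring) -> (err: linux.Errno) {
	buf: [32]linux.IO_Uring_CQE
	n := copy_cqes(ring, buf[:], 1) or_return
	for cqe in buf[:n] {
		_ = cqe // inspect the completion (e.g. its user_data and result) here
	}
	return
}
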
// Copies the CQEs that are already ready, without entering the kernel, and advances the CQ ring.
copy_cqes_ready :: proc(ring: ^Ring, cqes: []linux.IO_Uring_CQE) -> (n_copied: u32) {
	n_ready := cq_ready(ring)
	n_copied = min(u32(len(cqes)), n_ready)
	head := ring.cq.head^
	tail := head + n_copied
	shift := u32(.CQE32 in ring.flags)

	i := 0
	for head != tail {
		cqes[i] = ring.cq.cqes[(head & ring.cq.mask) << shift]
		head += 1
		i += 1
	}
	cq_advance(ring, n_copied)
	return
}

// Returns true if the kernel has flagged a CQ ring overflow, meaning completions are waiting to be flushed.
cq_ring_needs_flush :: proc(ring: ^Ring) -> bool {
	return .CQ_OVERFLOW in sync.atomic_load_explicit(ring.sq.flags, .Relaxed)
}

// For advanced use cases only, where custom completion queue handling is implemented.
// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
// Must be called exactly once after a zero-copy CQE has been processed by your application.
// Not idempotent; calling it more than once will result in other CQEs being lost.
// Matches the implementation of cqe_seen() in liburing.
cqe_seen :: proc(ring: ^Ring) {
	cq_advance(ring, 1)
}

// For advanced use cases only, where custom completion queue handling is implemented.
// Matches the implementation of cq_advance() in liburing.
cq_advance :: proc(ring: ^Ring, count: u32) {
	if count == 0 { return }
	sync.atomic_store_explicit(ring.cq.head, ring.cq.head^ + count, .Release)
}

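// Illustrative sketch, not part of the uring API: a zero-copy alternative to copy_cqes() that
// processes CQEs in place and releases them with cq_advance(). The proc name is hypothetical;
// do not mix this with copy_cqes() for the same entries.
example_peek_in_place :: proc(ring: ^Ring) {
	n := cq_ready(ring)
	head := ring.cq.head^
	for i in 0 ..< n {
		cqe := &ring.cq.cqes[(head + i) & ring.cq.mask]
		_ = cqe // process the completion in place
	}
	cq_advance(ring, n)
}
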
Submission_Queue :: struct {
	head: ^u32,
	tail: ^u32,
	mask: u32,
	flags: ^linux.IO_Uring_Submission_Queue_Flags,
	dropped: ^u32,
	array: []u32,
	sqes: []linux.IO_Uring_SQE,
	mmap: []u8,
	mmap_sqes: []u8,

	// We use `sqe_head` and `sqe_tail` in the same way as liburing:
	// We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
	// We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
	// This allows us to amortize the cost of the atomic store to `tail` across multiple SQEs.
	sqe_head: u32,
	sqe_tail: u32,
}

submission_queue_make :: proc(fd: linux.Fd, params: ^linux.IO_Uring_Params) -> (sq: Submission_Queue, err: linux.Errno) {
	assert(fd >= 0, "uninitialized queue fd")
	assert(.SINGLE_MMAP in params.features, "required feature SINGLE_MMAP not supported") // NOTE: Could be supported, but currently isn't.

	sq_size := params.sq_off.array + params.sq_entries * size_of(u32)
	cq_size := params.cq_off.cqes + params.cq_entries * size_of(linux.IO_Uring_CQE)
	size := max(sq_size, cq_size)

	// PERF: .POPULATE commits all pages right away; is that desired?

	cqe_map := cast([^]byte)(linux.mmap(0, uint(size), {.READ, .WRITE}, {.SHARED, .POPULATE}, fd, linux.IORING_OFF_SQ_RING) or_return)
	defer if err != nil { linux.munmap(cqe_map, uint(size)) }

	size_sqes := params.sq_entries * size_of(linux.IO_Uring_SQE)
	sqe_map := cast([^]byte)(linux.mmap(0, uint(size_sqes), {.READ, .WRITE}, {.SHARED, .POPULATE}, fd, linux.IORING_OFF_SQES) or_return)

	array := cast([^]u32)cqe_map[params.sq_off.array:]
	sqes := cast([^]linux.IO_Uring_SQE)sqe_map

	sq.head = cast(^u32)&cqe_map[params.sq_off.head]
	sq.tail = cast(^u32)&cqe_map[params.sq_off.tail]
	sq.mask = (cast(^u32)&cqe_map[params.sq_off.ring_mask])^
	sq.flags = cast(^linux.IO_Uring_Submission_Queue_Flags)&cqe_map[params.sq_off.flags]
	sq.dropped = cast(^u32)&cqe_map[params.sq_off.dropped]
	sq.array = array[:params.sq_entries]
	sq.sqes = sqes[:params.sq_entries]
	sq.mmap = cqe_map[:size]
	sq.mmap_sqes = sqe_map[:size_sqes]

	return
}

submission_queue_destroy :: proc(sq: ^Submission_Queue) -> (err: linux.Errno) {
	err = linux.munmap(raw_data(sq.mmap), uint(len(sq.mmap)))
	err2 := linux.munmap(raw_data(sq.mmap_sqes), uint(len(sq.mmap_sqes)))
	if err == nil { err = err2 }
	return
}

Completion_Queue :: struct {
	head: ^u32,
	tail: ^u32,
	mask: u32,
	overflow: ^u32,
	cqes: []linux.IO_Uring_CQE,
}

completion_queue_make :: proc(fd: linux.Fd, params: ^linux.IO_Uring_Params, sq: ^Submission_Queue) -> Completion_Queue {
	assert(fd >= 0, "uninitialized queue fd")
	assert(.SINGLE_MMAP in params.features, "required feature SINGLE_MMAP not supported")

	mmap := sq.mmap
	cqes := cast([^]linux.IO_Uring_CQE)&mmap[params.cq_off.cqes]

	return Completion_Queue{
		head = cast(^u32)&mmap[params.cq_off.head],
		tail = cast(^u32)&mmap[params.cq_off.tail],
		mask = (cast(^u32)&mmap[params.cq_off.ring_mask])^,
		overflow = cast(^u32)&mmap[params.cq_off.overflow],
		cqes = cqes[:params.cq_entries],
	}
}