Files
Odin/core/nbio/impl.odin
2026-01-30 23:37:31 +01:00

319 lines
7.5 KiB
Odin

#+private
package nbio
import "base:runtime"
import "base:intrinsics"
import "core:container/pool"
import "core:net"
import "core:reflect"
import "core:slice"
import "core:strings"
import "core:time"
// Registers a per-thread cleanup hook at program init so that a thread which
// exits without releasing all of its event-loop references still tears the
// loop down exactly once.
@(init, private)
init_thread_local_cleaner :: proc "contextless" () {
	runtime.add_thread_local_cleaner(proc() {
		loop := &_tls_event_loop
		if loop.refs == 0 {
			return
		}
		// Collapse any outstanding references into one and force the final
		// release, which performs the actual destruction.
		loop.refs = 1
		_release_thread_event_loop()
	})
}
// Per-thread event loop instance. Lazily initialized and reference counted by
// `_acquire_thread_event_loop` / `_release_thread_event_loop`.
@(thread_local)
_tls_event_loop: Event_Loop
// Lazily initializes this thread's event loop on first acquisition and bumps
// its reference count. The initialization error is sticky: once setup fails,
// every subsequent acquire returns the same error without retrying.
_acquire_thread_event_loop :: proc() -> General_Error {
	l := &_tls_event_loop
	if l.err == nil && l.refs == 0 {
		// On WASM targets (except Orca, which provides a regular heap) the
		// default heap allocator is unavailable, so use the WASM allocator.
		// FIX: parenthesized explicitly — `&&` binds tighter than `||`, so the
		// previous unparenthesized form parsed as
		// `wasm32 || (wasm64p32 && OS != Orca)`, making the Orca exclusion
		// dead (Orca runs on wasm32) and sending Orca down the wasm path.
		when (ODIN_ARCH == .wasm32 || ODIN_ARCH == .wasm64p32) && ODIN_OS != .Orca {
			allocator := runtime.default_wasm_allocator()
		} else {
			allocator := runtime.heap_allocator()
		}
		l.allocator = allocator

		// Cross-thread submission queue, drained by `_tick`.
		if alloc_err := mpsc_init(&l.queue, 128, l.allocator); alloc_err != nil {
			l.err = .Allocation_Failed
			return l.err
		}
		defer if l.err != nil { mpsc_destroy(&l.queue, l.allocator) }

		// Pool of Operation records, intrusively linked via `_pool_link`.
		if pool_err := pool.init(&l.operation_pool, "_pool_link"); pool_err != nil {
			l.err = .Allocation_Failed
			return l.err
		}
		defer if l.err != nil { pool.destroy(&l.operation_pool) }

		// Platform-specific setup; on failure the defers above undo the
		// generic setup because `l.err` is then non-nil.
		l.err = _init(l, allocator)
		l.now = time.now()
	}
	if l.err != nil {
		return l.err
	}
	l.refs += 1
	return nil
}
// Drops one reference on this thread's event loop and destroys it when the
// count reaches zero. Safe to call on a loop that failed to initialize or was
// never acquired (both are no-ops).
_release_thread_event_loop :: proc() {
	loop := &_tls_event_loop

	// A loop whose initialization failed never handed out references.
	if loop.err != nil {
		assert(loop.refs == 0)
		return
	}

	if loop.refs == 0 {
		return
	}

	loop.refs -= 1
	if loop.refs > 0 {
		return
	}

	// Last reference released: tear everything down and zero the struct so
	// the thread could acquire a fresh loop later.
	mpsc_destroy(&loop.queue, loop.allocator)
	pool.destroy(&loop.operation_pool)
	_destroy(loop)
	loop^ = {}
}
// Returns this thread's event loop, or nil when it has not been acquired.
// The branch is hinted as taken since callers normally hold a reference.
_current_thread_event_loop :: #force_inline proc(loc := #caller_location) -> (^Event_Loop) {
	loop := &_tls_event_loop
	if intrinsics.expect(loop.refs > 0, true) {
		return loop
	}
	return nil
}
// Runs one iteration of the event loop: first executes every operation that
// other threads queued, then defers to the platform tick with `timeout`.
_tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
	// Drain cross-thread submissions before the platform-specific tick.
	for op := (^Operation)(mpsc_dequeue(&l.queue)); op != nil; op = (^Operation)(mpsc_dequeue(&l.queue)) {
		_exec(op)
	}
	return __tick(l, timeout)
}
// Creates a TCP socket for `endpoint`'s address family, binds it, and starts
// listening with the given `backlog`. On any failure after creation the
// socket is closed before the error is returned.
_listen_tcp :: proc(
	l: ^Event_Loop,
	endpoint: Endpoint,
	backlog := 1000,
	loc := #caller_location,
) -> (
	socket: TCP_Socket,
	err: Network_Error,
) {
	family := family_from_endpoint(endpoint)
	socket = create_tcp_socket(family, l, loc) or_return
	// Ensure the socket does not leak if bind/listen below fail.
	defer if err != nil { close(socket, l=l) }
	// NOTE(review): the set_option result is ignored — looks like deliberate
	// best-effort SO_REUSEADDR; confirm failures really are safe to drop.
	net.set_option(socket, .Reuse_Address, true)
	bind(socket, endpoint) or_return
	_listen(socket, backlog) or_return
	return
}
// Asynchronously reads the whole file at `path` into a buffer allocated from
// `allocator` and passes it to `cb`. Chains three async operations:
// open -> stat -> read. On any failure the handle (if open) is closed, any
// allocated buffer is freed, and `cb` receives nil plus the failing step and
// its error.
_read_entire_file :: proc(l: ^Event_Loop, path: string, user_data: rawptr, cb: Read_Entire_File_Callback, allocator := context.allocator, dir := CWD) {
	open_poly3(path, user_data, cb, allocator, on_open, dir=dir, l=l)
	// Step 1: file opened; request its metadata to size the buffer.
	on_open :: proc(op: ^Operation, user_data: rawptr, cb: Read_Entire_File_Callback, allocator: runtime.Allocator) {
		if op.open.err != nil {
			cb(user_data, nil, {.Open, op.open.err})
			return
		}
		stat_poly3(op.open.handle, user_data, cb, allocator, on_stat)
	}
	// Step 2: metadata received; allocate a buffer of exactly the file size
	// and start a read-all. Only regular files are supported.
	on_stat :: proc(op: ^Operation, user_data: rawptr, cb: Read_Entire_File_Callback, allocator: runtime.Allocator) {
		if op.stat.err != nil {
			close(op.stat.handle)
			cb(user_data, nil, {.Stat, op.stat.err})
			return
		}
		if op.stat.type != .Regular {
			close(op.stat.handle)
			cb(user_data, nil, {.Stat, .Unsupported})
			return
		}
		buf, err := make([]byte, op.stat.size, allocator)
		if err != nil {
			close(op.stat.handle)
			cb(user_data, nil, {.Read, .Allocation_Failed})
			return
		}
		// all=true keeps reading until the buffer is full or an error occurs.
		read_poly3(op.stat.handle, 0, buf, user_data, cb, allocator, on_read, all=true)
	}
	// Step 3: read finished; the handle is no longer needed either way.
	on_read :: proc(op: ^Operation, user_data: rawptr, cb: Read_Entire_File_Callback, allocator: runtime.Allocator) {
		close(op.read.handle)
		if op.read.err != nil {
			delete(op.read.buf, allocator)
			cb(user_data, nil, {.Read, op.read.err})
			return
		}
		// With all=true a successful read must have filled the buffer.
		assert(op.read.read == len(op.read.buf))
		cb(user_data, op.read.buf, {})
	}
}
// Compile-time switch for the `debug` logging helper below; enable with
// `-define:NBIO_DEBUG=true`. When false, `debug` calls are compiled out.
NBIO_DEBUG :: #config(NBIO_DEBUG, false)
// The value kinds `debug` knows how to format.
Debuggable :: union {
	Operation_Type,
	string,
	int,
	time.Time,
	time.Duration,
}
// Formats `contents` space-separated into a temp-allocated string prefixed
// with "[nbio] " and logs it at Debug level. The whole proc (including calls
// to it) is compiled out unless NBIO_DEBUG is set.
@(disabled=!NBIO_DEBUG)
debug :: proc(contents: ..Debuggable, location := #caller_location) {
	// Skip all formatting work when no logger is installed or Debug is filtered.
	if context.logger.procedure == nil || .Debug < context.logger.lowest_level {
		return
	}
	runtime.DEFAULT_TEMP_ALLOCATOR_TEMP_GUARD()
	b: strings.Builder
	b.buf.allocator = context.temp_allocator
	strings.write_string(&b, "[nbio] ")
	for content, i in contents {
		switch val in content {
		case Operation_Type:
			name, _ := reflect.enum_name_from_value(val)
			strings.write_string(&b, name)
		case string:
			strings.write_string(&b, val)
		case int:
			strings.write_int(&b, val)
		case time.Duration:
			ms := time.duration_milliseconds(val)
			strings.write_f64(&b, ms, 'f')
			strings.write_string(&b, "ms")
		case time.Time:
			// Hand-rolled "HH:MM:SS." formatting into a fixed buffer
			// (fills digits least-significant first), followed by the raw
			// nanosecond remainder as an integer.
			buf: [time.MIN_HMS_LEN+1]byte
			h, m, s, ns := time.precise_clock_from_time(val)
			buf[8] = '.'
			buf[7] = '0' + u8(s % 10); s /= 10
			buf[6] = '0' + u8(s)
			buf[5] = ':'
			buf[4] = '0' + u8(m % 10); m /= 10
			buf[3] = '0' + u8(m)
			buf[2] = ':'
			buf[1] = '0' + u8(h % 10); h /= 10
			buf[0] = '0' + u8(h)
			strings.write_string(&b, string(buf[:]))
			strings.write_int(&b, ns)
		}
		// Single space between entries, none after the last.
		if i < len(contents)-1 {
			strings.write_byte(&b, ' ')
		}
	}
	context.logger.procedure(context.logger.data, .Debug, strings.to_string(b), context.logger.options, location)
}
// Logs `text` at Warning level through the context logger, doing nothing when
// no logger is installed or warnings are filtered out.
warn :: proc(text: string, location := #caller_location) {
	logger := context.logger
	if logger.procedure == nil || logger.lowest_level > .Warning {
		return
	}
	logger.procedure(logger.data, .Warning, text, logger.options, location)
}
/*
In order to:
1. Not require the caller to allocate their buffers (`op.send.bufs` and `op.recv.bufs` can be stack allocated)
2. Have `op.send.bufs` and `op.recv.bufs` be valid and the same content in the callback as when called
3. Be able to facilitate the `all` option, which requires mutating the slices (advancing them)
4. Constrain single send/recv syscalls to MAX_RW bytes
We need to copy the input buffers twice: once for a stable copy returned to the user,
and once for the working copy that we mutate with `all` set.
*/
// Storage for the stable and working copies of a vectored-I/O buffer list.
Bufs :: struct {
	// Inline stable storage for the common single-buffer case (no allocation).
	backing: [1][]byte,
	// Working copy that gets reset, advanced, and constrained per attempt.
	working: struct #raw_union {
		// Used when the operation has exactly one buffer.
		small: [1][]byte,
		// Used for more than one buffer; points into the second half of the
		// allocation made by `bufs_init`.
		big: [][]byte,
	},
}
// Copies the caller's buffer list so the operation owns both a stable copy
// (to which `orig` is redirected) and a mutable working copy.
// For more than one buffer a single allocation of len(orig)*2 slice headers
// is made: the first half becomes the stable copy, the second half the
// working copy. For a single buffer the inline `backing` storage is used and
// nothing is allocated.
// NOTE(review): assumes len(orig) >= 1 — an empty list would index out of
// bounds at `orig[0]`; confirm callers guarantee this.
bufs_init :: proc(bufs: ^Bufs, orig: ^[][]byte, allocator: runtime.Allocator) -> runtime.Allocator_Error {
	if len(orig) > 1 {
		backing := make([][]byte, len(orig)*2, allocator) or_return
		bufs.working.big = backing[len(orig):]
		copy(bufs.working.big, orig^)
		copy(backing, orig^)
		orig^ = backing[:len(orig)]
		return nil
	}
	bufs.backing = {orig[0]}
	orig^ = bufs.backing[:]
	return nil
}
// Frees the allocation made by `bufs_init`. `orig` must be the redirected
// stable copy it produced; the single-buffer case allocated nothing.
bufs_delete :: proc(bufs: ^Bufs, orig: [][]byte, allocator: runtime.Allocator) {
	if len(orig) > 1 {
		// `orig` is the first half of the len*2 allocation; reconstruct the
		// full backing slice from its data pointer to free it in one go.
		backing := raw_data(orig)[:len(orig)*2]
		delete(backing, allocator)
	}
}
// Produces the buffer list for the next send/recv attempt: resets the working
// copy from the stable copy in `orig` (undoing any advance from a previous
// attempt), skips the `processed` bytes already transferred, and constrains
// the result to MAX_RW bytes. `total` is the combined size of the advanced
// buffers before constraining (see `constraint_bufs_to_max_rw`).
@(require_results)
bufs_to_process :: proc(bufs: ^Bufs, orig: [][]byte, processed: int) -> (working: [][]byte, total: int) {
	if len(orig) > 1 {
		// Reset to length and contents of backing, so a previous modification is removed.
		(^runtime.Raw_Slice)(&bufs.working.big).len = len(orig)
		copy(bufs.working.big, orig)
		working = bufs.working.big
	} else {
		bufs.working.small = {orig[0]}
		working = bufs.working.small[:]
	}
	working = slice.advance_slices(working, processed)
	working, total = constraint_bufs_to_max_rw(working)
	return
}
// Constrains `bufs` so the summed length of the returned slices is at most
// MAX_RW, trimming bytes off the tail buffers and dropping buffers that
// become empty. `bufs` is mutated in place; `constrained` aliases it.
//
// Returns:
// - constrained: view of `bufs` holding at most MAX_RW bytes.
// - total: combined length of the input buffers before constraining.
@(require_results)
constraint_bufs_to_max_rw :: proc(bufs: [][]byte) -> (constrained: [][]byte, total: int) {
	for buf in bufs {
		total += len(buf)
	}
	constrained = bufs
	for n := total; n > MAX_RW; {
		last := &constrained[len(constrained)-1]
		// Number of excess bytes to trim from the last buffer this round.
		take := min(len(last), n-MAX_RW)
		// FIX: keep the untrimmed prefix. The previous `last^ = last[:take]`
		// kept `take` bytes instead of removing them, so the result could
		// still exceed MAX_RW (e.g. a single 10-byte buffer with MAX_RW = 4
		// was "constrained" to 6 bytes instead of 4).
		last^ = last[:len(last)-take]
		if len(last) == 0 {
			constrained = constrained[:len(constrained)-1]
		}
		n -= take
	}
	return
}