mirror of
https://github.com/odin-lang/Odin.git
synced 2026-02-12 14:23:33 +00:00
319 lines
7.5 KiB
Odin
319 lines
7.5 KiB
Odin
#+private
|
|
package nbio
|
|
|
|
import "base:runtime"
|
|
import "base:intrinsics"
|
|
|
|
import "core:container/pool"
|
|
import "core:net"
|
|
import "core:reflect"
|
|
import "core:slice"
|
|
import "core:strings"
|
|
import "core:time"
|
|
|
|
// Runs at program initialization and registers a thread-local cleaner with the runtime.
// When the cleaner runs and the thread still holds references to its event loop,
// the reference count is collapsed to one so that a single release tears the loop down.
@(init, private)
init_thread_local_cleaner :: proc "contextless" () {
	runtime.add_thread_local_cleaner(proc() {
		loop := &_tls_event_loop
		if loop.refs <= 0 {
			// Nothing was acquired (or everything was already released) on this thread.
			return
		}

		// Make the release below the final one, regardless of how many references were left.
		loop.refs = 1
		_release_thread_event_loop()
	})
}
|
|
|
|
// The event loop state owned by each thread.
// `_acquire_thread_event_loop` and `_release_thread_event_loop` reference count it through `refs`,
// and `_current_thread_event_loop` hands out a pointer to it while `refs` is non-zero.
@(thread_local)
|
|
_tls_event_loop: Event_Loop
|
|
|
|
// Takes a reference to the calling thread's event loop, initializing it on first use.
//
// On the first acquire (no references and no recorded error) the loop's allocator is chosen,
// the cross-thread operation queue and the operation pool are created, and the
// platform specific `_init` is run. Any failure is stored in `l.err`, the parts that were
// already set up are destroyed again, and that same error is returned by every later call.
//
// Returns: nil when a reference was taken, otherwise the stored initialization error.
_acquire_thread_event_loop :: proc() -> General_Error {
	l := &_tls_event_loop
	if l.err == nil && l.refs == 0 {
		// `&&` binds tighter than `||`, so the architecture test needs its own parentheses
		// for the Orca exclusion to apply to both WASM architectures.
		// `when` does not open a scope, `allocator` stays visible below.
		when (ODIN_ARCH == .wasm32 || ODIN_ARCH == .wasm64p32) && ODIN_OS != .Orca {
			allocator := runtime.default_wasm_allocator()
		} else {
			allocator := runtime.heap_allocator()
		}

		l.allocator = allocator

		if alloc_err := mpsc_init(&l.queue, 128, l.allocator); alloc_err != nil {
			l.err = .Allocation_Failed
			return l.err
		}
		// Undo the queue if a later initialization step fails.
		defer if l.err != nil { mpsc_destroy(&l.queue, l.allocator) }

		if pool_err := pool.init(&l.operation_pool, "_pool_link"); pool_err != nil {
			l.err = .Allocation_Failed
			return l.err
		}
		// Undo the pool if the platform initialization fails.
		defer if l.err != nil { pool.destroy(&l.operation_pool) }

		l.err = _init(l, allocator)
		l.now = time.now()
	}

	// Covers both a failure just now and one remembered from an earlier attempt.
	if l.err != nil {
		return l.err
	}

	l.refs += 1
	return nil
}
|
|
|
|
// Drops one reference to the calling thread's event loop.
// When the last reference goes away the queue, the operation pool and the platform
// state are destroyed and the loop is reset to its zero value.
_release_thread_event_loop :: proc() {
	loop := &_tls_event_loop

	// A loop that failed to initialize never handed out a reference.
	if loop.err != nil {
		assert(loop.refs == 0)
		return
	}

	// Releasing without a matching acquire is a no-op.
	if loop.refs <= 0 {
		return
	}

	loop.refs -= 1
	if loop.refs != 0 {
		return
	}

	// Last reference: tear everything down.
	mpsc_destroy(&loop.queue, loop.allocator)
	pool.destroy(&loop.operation_pool)
	_destroy(loop)
	loop^ = {}
}
|
|
|
|
// Returns the calling thread's event loop, or nil when the thread holds no reference to it.
// Holding a reference is hinted to the compiler as the expected case.
_current_thread_event_loop :: #force_inline proc(loc := #caller_location) -> (^Event_Loop) {
	loop := &_tls_event_loop

	if intrinsics.expect(loop.refs != 0, true) {
		return loop
	}

	return nil
}
|
|
|
|
// Runs one iteration of the event loop: first executes every operation waiting in the
// loop's queue (operations queued from other threads), then hands over to the
// platform specific `__tick` with the given timeout.
_tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
	// Drain the queue until it reports empty.
	for queued := (^Operation)(mpsc_dequeue(&l.queue)); queued != nil; queued = (^Operation)(mpsc_dequeue(&l.queue)) {
		_exec(queued)
	}

	err = __tick(l, timeout)
	return
}
|
|
|
|
// Creates a TCP socket for the address family of `endpoint`, binds it and starts listening.
//
// Inputs:
// - l:        the event loop the socket is created on (and closed on if a later step fails)
// - endpoint: the address to bind to
// - backlog:  passed on to `_listen`
//
// Returns the listening socket, or the first error encountered.
_listen_tcp :: proc(
	l: ^Event_Loop,
	endpoint: Endpoint,
	backlog := 1000,
	loc := #caller_location,
) -> (
	socket: TCP_Socket,
	err: Network_Error,
) {
	socket = create_tcp_socket(family_from_endpoint(endpoint), l, loc) or_return

	// From here on a failure must not leak the socket.
	defer if err != nil {
		close(socket, l=l)
	}

	// The result of setting the option is not checked.
	net.set_option(socket, .Reuse_Address, true)

	bind(socket, endpoint) or_return
	_listen(socket, backlog) or_return
	return
}
|
|
|
|
// Reads a whole file by chaining three operations: open -> stat -> read.
// `cb` is invoked exactly once, either with the file contents or with the step that
// failed and its error. The contents are allocated with `allocator`; on success the
// buffer is handed to `cb`, on a read failure it is freed before `cb` is called.
// The file handle is closed on every path once it has been opened.
_read_entire_file :: proc(l: ^Event_Loop, path: string, user_data: rawptr, cb: Read_Entire_File_Callback, allocator := context.allocator, dir := CWD) {
	open_poly3(path, user_data, cb, allocator, on_open, dir=dir, l=l)

	// Open finished: report the failure, or continue by querying the file's stat.
	on_open :: proc(op: ^Operation, user_data: rawptr, cb: Read_Entire_File_Callback, allocator: runtime.Allocator) {
		if op.open.err == nil {
			stat_poly3(op.open.handle, user_data, cb, allocator, on_stat)
			return
		}

		cb(user_data, nil, {.Open, op.open.err})
	}

	// Stat finished: validate the file, allocate a buffer of its size and start the read.
	on_stat :: proc(op: ^Operation, user_data: rawptr, cb: Read_Entire_File_Callback, allocator: runtime.Allocator) {
		handle := op.stat.handle

		switch {
		case op.stat.err != nil:
			close(handle)
			cb(user_data, nil, {.Stat, op.stat.err})
			return

		case op.stat.type != .Regular:
			// Only regular files are supported.
			close(handle)
			cb(user_data, nil, {.Stat, .Unsupported})
			return
		}

		contents, alloc_err := make([]byte, op.stat.size, allocator)
		if alloc_err != nil {
			close(handle)
			cb(user_data, nil, {.Read, .Allocation_Failed})
			return
		}

		// `all=true`: keep reading until the buffer is full.
		read_poly3(handle, 0, contents, user_data, cb, allocator, on_read, all=true)
	}

	// Read finished: the handle is no longer needed either way.
	on_read :: proc(op: ^Operation, user_data: rawptr, cb: Read_Entire_File_Callback, allocator: runtime.Allocator) {
		close(op.read.handle)

		contents := op.read.buf
		if op.read.err != nil {
			delete(contents, allocator)
			cb(user_data, nil, {.Read, op.read.err})
			return
		}

		assert(op.read.read == len(contents))
		cb(user_data, contents, {})
	}
}
|
|
|
|
// Build-time configuration value, false unless overridden.
// The `debug` procedure in this file is disabled when this is false.
NBIO_DEBUG :: #config(NBIO_DEBUG, false)
|
|
|
|
// The kinds of values `debug` accepts; each variant has its own formatting there.
Debuggable :: union {
|
|
Operation_Type,
|
|
string,
|
|
int,
|
|
time.Time,
|
|
time.Duration,
|
|
}
|
|
|
|
// Logs a debug message, prefixed with "[nbio] ", built from `contents` separated by spaces.
// Only enabled when `NBIO_DEBUG` is set; also a no-op when the context has no logger
// or the logger does not accept the `.Debug` level.
//
// Formatting per kind:
// - Operation_Type: the enum value's name
// - string:         as is
// - int:            base 10
// - time.Duration:  milliseconds as a float followed by "ms"
// - time.Time:      HH:MM:SS.nnnnnnnnn (nanoseconds zero-padded to 9 digits)
//
// The message is built with the temporary allocator, guarded so it is released on return.
@(disabled=!NBIO_DEBUG)
debug :: proc(contents: ..Debuggable, location := #caller_location) {
	if context.logger.procedure == nil || .Debug < context.logger.lowest_level {
		return
	}

	runtime.DEFAULT_TEMP_ALLOCATOR_TEMP_GUARD()

	b: strings.Builder
	b.buf.allocator = context.temp_allocator

	strings.write_string(&b, "[nbio] ")

	for content, i in contents {
		switch val in content {
		case Operation_Type:
			name, _ := reflect.enum_name_from_value(val)
			strings.write_string(&b, name)
		case string:
			strings.write_string(&b, val)
		case int:
			strings.write_int(&b, val)
		case time.Duration:
			ms := time.duration_milliseconds(val)
			strings.write_f64(&b, ms, 'f')
			strings.write_string(&b, "ms")

		case time.Time:
			// The fraction must be zero-padded, otherwise 5ns would print as ".5"
			// and read as half a second.
			NS_DIGITS :: 9
			buf: [time.MIN_HMS_LEN+1+NS_DIGITS]byte
			h, m, s, ns := time.precise_clock_from_time(val)

			// Fill the nanosecond digits from the least significant one backwards.
			for j := len(buf)-1; j > time.MIN_HMS_LEN; j -= 1 {
				buf[j] = '0' + u8(ns % 10); ns /= 10
			}

			buf[8] = '.'
			buf[7] = '0' + u8(s % 10); s /= 10
			buf[6] = '0' + u8(s)
			buf[5] = ':'
			buf[4] = '0' + u8(m % 10); m /= 10
			buf[3] = '0' + u8(m)
			buf[2] = ':'
			buf[1] = '0' + u8(h % 10); h /= 10
			buf[0] = '0' + u8(h)

			strings.write_string(&b, string(buf[:]))
		}

		if i < len(contents)-1 {
			strings.write_byte(&b, ' ')
		}
	}

	context.logger.procedure(context.logger.data, .Debug, strings.to_string(b), context.logger.options, location)
}
|
|
|
|
// Sends `text` to the context's logger at the `.Warning` level.
// Does nothing when no logger procedure is set or when the logger's lowest level is above `.Warning`.
warn :: proc(text: string, location := #caller_location) {
	logger := context.logger

	if logger.procedure != nil && logger.lowest_level <= .Warning {
		logger.procedure(logger.data, .Warning, text, logger.options, location)
	}
}
|
|
|
|
/*
|
|
In order to:
|
|
1. Not require the caller to allocate their buffers (`op.send.bufs` and `op.recv.bufs` can be stack allocated)
|
|
2. Have `op.send.bufs` and `op.recv.bufs` be valid and the same content in the callback as when called
|
|
3. Be able to facilitate the `all` option, which requires mutating the slices (advancing them)
|
|
4. Constrain single send/recv syscalls to MAX_RW bytes
|
|
|
|
We need to copy the input buffers twice, once for a stable copy returned to the user,
|
|
and one for the working copy that we mutate with `all` set.
|
|
*/
|
|
|
|
// Bookkeeping for the buffer lists of an operation, filled in by `bufs_init`.
//
// - backing: holds the single buffer when only one was given; the caller's slice is
//            redirected to it by `bufs_init`.
// - working: the copy that `bufs_to_process` resets and then advances/trims.
//            `small` is used for a single buffer, `big` for more than one, in which case
//            it is the second half of one allocation whose first half is the stable copy.
Bufs :: struct {
|
|
backing: [1][]byte,
|
|
working: struct #raw_union {
|
|
small: [1][]byte,
|
|
big: [][]byte,
|
|
},
|
|
}
|
|
|
|
// Gives the operation its own copies of the caller's buffer list and redirects `orig` to
// the stable one, so the caller's list does not have to outlive the call.
//
// - A single buffer is stored inline in `bufs.backing`; nothing is allocated.
// - Multiple buffers get one allocation of twice the count: the first half becomes the
//   stable copy `orig` points to, the second half the working copy in `bufs.working.big`.
//
// Returns the allocation error, if any. `orig` must contain at least one buffer.
bufs_init :: proc(bufs: ^Bufs, orig: ^[][]byte, allocator: runtime.Allocator) -> runtime.Allocator_Error {
	count := len(orig)

	if count <= 1 {
		bufs.backing = {orig[0]}
		orig^ = bufs.backing[:]
		return nil
	}

	storage, alloc_err := make([][]byte, count*2, allocator)
	if alloc_err != nil {
		return alloc_err
	}

	stable  := storage[:count]
	scratch := storage[count:]

	copy(stable, orig^)
	copy(scratch, orig^)

	bufs.working.big = scratch
	orig^ = stable
	return nil
}
|
|
|
|
// Frees what `bufs_init` allocated. `orig` must be the slice `bufs_init` redirected.
// A single buffer lives inline and needs no freeing; for multiple buffers `orig` is the
// first half of an allocation of twice its length, which is reconstructed and deleted.
bufs_delete :: proc(bufs: ^Bufs, orig: [][]byte, allocator: runtime.Allocator) {
	if len(orig) <= 1 {
		return
	}

	whole := raw_data(orig)[:len(orig)*2]
	delete(whole, allocator)
}
|
|
|
|
// Produces the buffer list for the next call: the working copy is reset to the contents
// of `orig`, advanced past the `processed` bytes that were already handled, and then
// passed through `constraint_bufs_to_max_rw`.
//
// Returns the list to use and the `total` reported by `constraint_bufs_to_max_rw`.
@(require_results)
bufs_to_process :: proc(bufs: ^Bufs, orig: [][]byte, processed: int) -> (working: [][]byte, total: int) {
	if len(orig) <= 1 {
		bufs.working.small = {orig[0]}
		working = bufs.working.small[:]
	} else {
		// A previous round may have shortened the working slice and its elements;
		// restore the length first, then the contents.
		(^runtime.Raw_Slice)(&bufs.working.big).len = len(orig)
		copy(bufs.working.big, orig)
		working = bufs.working.big
	}

	remaining := slice.advance_slices(working, processed)
	return constraint_bufs_to_max_rw(remaining)
}
|
|
|
|
// Trims `bufs` from the back until the combined length is at most `MAX_RW`.
// Buffers that end up empty are removed from the end of the returned list; the last
// remaining buffer may be shortened. The elements of `bufs` are modified in place.
//
// Returns:
// - constrained: the (possibly shortened) list
// - total:       the combined length of `bufs` as passed in, before any trimming
@(require_results)
constraint_bufs_to_max_rw :: proc(bufs: [][]byte) -> (constrained: [][]byte, total: int) {
	for buf in bufs {
		total += len(buf)
	}

	constrained = bufs
	for n := total; n > MAX_RW; {
		last := &constrained[len(constrained)-1]

		// Number of bytes to cut off the last buffer: the whole excess, or all of it.
		drop := min(len(last), n-MAX_RW)
		last^ = last[:len(last)-drop]

		if len(last) == 0 {
			constrained = constrained[:len(constrained)-1]
		}
		n -= drop
	}

	return
}
|