mirror of
https://github.com/odin-lang/Odin.git
synced 2026-02-12 14:23:33 +00:00
Merge pull request #6142 from laytan/nbio-improvements
nbio: improvements
This commit is contained in:
@@ -5,10 +5,8 @@ import "base:runtime"
|
||||
import "base:intrinsics"
|
||||
|
||||
import "core:container/pool"
|
||||
import "core:container/queue"
|
||||
import "core:net"
|
||||
import "core:strings"
|
||||
import "core:sync"
|
||||
import "core:time"
|
||||
import "core:reflect"
|
||||
|
||||
@@ -35,7 +33,13 @@ _acquire_thread_event_loop :: proc() -> General_Error {
|
||||
allocator := runtime.heap_allocator()
|
||||
}
|
||||
|
||||
l.queue.data.allocator = allocator
|
||||
l.allocator = allocator
|
||||
|
||||
if alloc_err := mpsc_init(&l.queue, 128, l.allocator); alloc_err != nil {
|
||||
l.err = .Allocation_Failed
|
||||
return l.err
|
||||
}
|
||||
defer if l.err != nil { mpsc_destroy(&l.queue, l.allocator) }
|
||||
|
||||
if pool_err := pool.init(&l.operation_pool, "_pool_link"); pool_err != nil {
|
||||
l.err = .Allocation_Failed
|
||||
@@ -65,7 +69,7 @@ _release_thread_event_loop :: proc() {
|
||||
if l.refs > 0 {
|
||||
l.refs -= 1
|
||||
if l.refs == 0 {
|
||||
queue.destroy(&l.queue)
|
||||
mpsc_destroy(&l.queue, l.allocator)
|
||||
pool.destroy(&l.operation_pool)
|
||||
_destroy(l)
|
||||
l^ = {}
|
||||
@@ -85,11 +89,10 @@ _current_thread_event_loop :: #force_inline proc(loc := #caller_location) -> (^E
|
||||
|
||||
_tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
||||
// Receive operations queued from other threads first.
|
||||
{
|
||||
sync.guard(&l.queue_mu)
|
||||
for op in queue.pop_front_safe(&l.queue) {
|
||||
_exec(op)
|
||||
}
|
||||
for {
|
||||
op := (^Operation)(mpsc_dequeue(&l.queue))
|
||||
if op == nil { break }
|
||||
_exec(op)
|
||||
}
|
||||
|
||||
return __tick(l, timeout)
|
||||
|
||||
@@ -23,7 +23,6 @@ _Event_Loop :: struct {
|
||||
unqueued: queue.Queue(^Operation),
|
||||
// Ready to run callbacks, mainly next tick, some other ops that error outside the kernel.
|
||||
completed: queue.Queue(^Operation),
|
||||
allocator: mem.Allocator,
|
||||
wake: ^Operation,
|
||||
}
|
||||
|
||||
@@ -128,8 +127,6 @@ _Stat :: struct {
|
||||
|
||||
@(private="package")
|
||||
_init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) {
|
||||
l.allocator = alloc
|
||||
|
||||
params := uring.DEFAULT_PARAMS
|
||||
params.flags += {.SUBMIT_ALL, .COOP_TASKRUN, .SINGLE_ISSUER}
|
||||
|
||||
@@ -472,6 +469,11 @@ _wake_up :: proc(l: ^Event_Loop) {
|
||||
assert(n == 8)
|
||||
}
|
||||
|
||||
@(private="package")
|
||||
_yield :: proc() {
|
||||
linux.sched_yield()
|
||||
}
|
||||
|
||||
// Start file private.
|
||||
|
||||
// The size of the IO Uring queues.
|
||||
|
||||
@@ -19,7 +19,6 @@ _FULLY_SUPPORTED :: false
|
||||
_Event_Loop :: struct {
|
||||
completed: queue.Queue(^Operation),
|
||||
timeouts: avl.Tree(^Operation),
|
||||
allocator: mem.Allocator,
|
||||
}
|
||||
|
||||
_Handle :: uintptr
|
||||
@@ -69,7 +68,6 @@ _Remove :: struct {}
|
||||
_Link_Timeout :: struct {}
|
||||
|
||||
_init :: proc(l: ^Event_Loop, allocator: mem.Allocator) -> (rerr: General_Error) {
|
||||
l.allocator = allocator
|
||||
l.completed.data.allocator = allocator
|
||||
|
||||
avl.init_cmp(&l.timeouts, timeouts_cmp, allocator)
|
||||
@@ -215,3 +213,6 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err
|
||||
|
||||
_wake_up :: proc(l: ^Event_Loop) {
|
||||
}
|
||||
|
||||
_yield :: proc() {
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ _Event_Loop :: struct {
|
||||
// We have to keep record of what we currently have in the kqueue, and if we get an operation
|
||||
// that would be the same (ident, filter) pair we need to bundle the operations under one kevent.
|
||||
submitted: map[Queue_Identifier]^Operation,
|
||||
allocator: mem.Allocator,
|
||||
// Holds all events we want to flush. Flushing is done each tick at which point this is emptied.
|
||||
pending: sa.Small_Array(QUEUE_SIZE, kq.KEvent),
|
||||
// Holds what should be in `pending` but didn't fit.
|
||||
@@ -107,7 +106,6 @@ _Link_Timeout :: struct {}
|
||||
|
||||
@(private="package")
|
||||
_init :: proc(l: ^Event_Loop, allocator: mem.Allocator) -> (rerr: General_Error) {
|
||||
l.allocator = allocator
|
||||
l.submitted.allocator = allocator
|
||||
l.overflow.data.allocator = allocator
|
||||
l.completed.data.allocator = allocator
|
||||
@@ -510,6 +508,7 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err
|
||||
|
||||
@(private="package")
|
||||
_wake_up :: proc(l: ^Event_Loop) {
|
||||
// TODO: only if we are sleeping (like Windows).
|
||||
ev := [1]kq.KEvent{
|
||||
{
|
||||
ident = IDENT_WAKE_UP,
|
||||
@@ -526,6 +525,11 @@ _wake_up :: proc(l: ^Event_Loop) {
|
||||
assert(n == 0)
|
||||
}
|
||||
|
||||
@(private="package")
|
||||
_yield :: proc() {
|
||||
posix.sched_yield()
|
||||
}
|
||||
|
||||
// Start file private.
|
||||
|
||||
// Max operations that can be enqueued per tick.
|
||||
|
||||
@@ -8,10 +8,11 @@ import "core:container/pool"
|
||||
import "core:container/queue"
|
||||
import "core:mem"
|
||||
import "core:net"
|
||||
import "core:path/filepath"
|
||||
import "core:slice"
|
||||
import "core:strings"
|
||||
import "core:sync"
|
||||
import "core:time"
|
||||
import "core:path/filepath"
|
||||
|
||||
import win "core:sys/windows"
|
||||
|
||||
@@ -20,10 +21,15 @@ _FULLY_SUPPORTED :: true
|
||||
|
||||
@(private="package")
|
||||
_Event_Loop :: struct {
|
||||
iocp: win.HANDLE,
|
||||
allocator: mem.Allocator,
|
||||
timeouts: avl.Tree(^Operation),
|
||||
completed: queue.Queue(^Operation),
|
||||
timeouts: avl.Tree(^Operation),
|
||||
thread: win.HANDLE,
|
||||
completed: queue.Queue(^Operation),
|
||||
completed_oob: Multi_Producer_Single_Consumer,
|
||||
state: enum {
|
||||
Working,
|
||||
Waking,
|
||||
Sleeping,
|
||||
},
|
||||
}
|
||||
|
||||
@(private="package")
|
||||
@@ -107,47 +113,32 @@ _Stat :: struct {}
|
||||
_init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) {
|
||||
l.allocator = alloc
|
||||
|
||||
mem_err: mem.Allocator_Error
|
||||
if mem_err = queue.init(&l.completed, allocator = alloc); mem_err != nil {
|
||||
err = .Allocation_Failed
|
||||
return
|
||||
}
|
||||
defer if err != nil { queue.destroy(&l.completed) }
|
||||
l.completed.data.allocator = l.allocator
|
||||
|
||||
avl.init(&l.timeouts, timeouts_cmp, alloc)
|
||||
|
||||
win.ensure_winsock_initialized()
|
||||
mpsc_init(&l.completed_oob, QUEUE_SIZE, l.allocator)
|
||||
defer if err != nil { mpsc_destroy(&l.completed_oob, l.allocator) }
|
||||
|
||||
l.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 1)
|
||||
if l.iocp == nil {
|
||||
err = General_Error(win.GetLastError())
|
||||
return
|
||||
}
|
||||
dup_ok := win.DuplicateHandle(
|
||||
win.GetCurrentProcess(), win.GetCurrentThread(),
|
||||
win.GetCurrentProcess(), &l.thread,
|
||||
0, false, win.DUPLICATE_SAME_ACCESS,
|
||||
)
|
||||
ensure(dup_ok == true)
|
||||
defer if err != nil { win.CloseHandle(l.thread) }
|
||||
|
||||
err = g_ref()
|
||||
return
|
||||
|
||||
timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering {
|
||||
switch {
|
||||
case a.timeout._impl.expires._nsec < b.timeout._impl.expires._nsec:
|
||||
return .Less
|
||||
case a.timeout._impl.expires._nsec > b.timeout._impl.expires._nsec:
|
||||
return .Greater
|
||||
case uintptr(a) < uintptr(b):
|
||||
return .Less
|
||||
case uintptr(a) > uintptr(b):
|
||||
return .Greater
|
||||
case:
|
||||
assert(a == b)
|
||||
return .Equal
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@(private="package")
|
||||
_destroy :: proc(l: ^Event_Loop) {
|
||||
queue.destroy(&l.completed)
|
||||
avl.destroy(&l.timeouts)
|
||||
win.CloseHandle(l.iocp)
|
||||
queue.destroy(&l.completed)
|
||||
mpsc_destroy(&l.completed_oob, l.allocator)
|
||||
win.CloseHandle(l.thread)
|
||||
g_unref()
|
||||
}
|
||||
|
||||
@(private="package")
|
||||
@@ -166,10 +157,16 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
op := (^Operation)(mpsc_dequeue(&l.completed_oob))
|
||||
if op == nil { break }
|
||||
handle_completed(op)
|
||||
}
|
||||
|
||||
if pool.num_outstanding(&l.operation_pool) == 0 { return nil }
|
||||
|
||||
actual_timeout := win.INFINITE
|
||||
if queue.len(l.completed) > 0 {
|
||||
if queue.len(l.completed) > 0 || mpsc_count(&l.completed_oob) > 0 {
|
||||
actual_timeout = 0
|
||||
} else if timeout >= 0 {
|
||||
actual_timeout = win.DWORD(timeout / time.Millisecond)
|
||||
@@ -178,14 +175,33 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
||||
actual_timeout = min(actual_timeout, win.DWORD(nt / time.Millisecond))
|
||||
}
|
||||
|
||||
if actual_timeout > 0 {
|
||||
sync.atomic_store_explicit(&l.state, .Sleeping, .Release)
|
||||
|
||||
// There could be a race condition where we go sleeping at the same time as things get queued
|
||||
// and a wakeup isn't done because the state is not .Sleeping yet.
|
||||
// So after sleeping we first check our queues.
|
||||
|
||||
for {
|
||||
op := (^Operation)(mpsc_dequeue(&l.queue))
|
||||
if op == nil { break }
|
||||
_exec(op)
|
||||
}
|
||||
|
||||
for {
|
||||
op := (^Operation)(mpsc_dequeue(&l.completed_oob))
|
||||
if op == nil { break }
|
||||
handle_completed(op)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
QUEUE_SIZE :: 256
|
||||
events: [QUEUE_SIZE]win.OVERLAPPED_ENTRY
|
||||
events: [256]win.OVERLAPPED_ENTRY
|
||||
entries_removed: win.ULONG
|
||||
if !win.GetQueuedCompletionStatusEx(l.iocp, &events[0], len(events), &entries_removed, actual_timeout, false) {
|
||||
if !win.GetQueuedCompletionStatusEx(g.iocp, &events[0], len(events), &entries_removed, actual_timeout, true) {
|
||||
winerr := win.GetLastError()
|
||||
switch winerr {
|
||||
case win.WAIT_TIMEOUT:
|
||||
case win.WAIT_TIMEOUT, win.WAIT_IO_COMPLETION:
|
||||
entries_removed = 0
|
||||
case:
|
||||
err = General_Error(winerr)
|
||||
@@ -193,6 +209,8 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
||||
}
|
||||
}
|
||||
|
||||
sync.atomic_store_explicit(&l.state, .Working, .Relaxed)
|
||||
|
||||
if actual_timeout > 0 {
|
||||
// We may have just waited some time, lets update the current time.
|
||||
l.now = time.now()
|
||||
@@ -203,13 +221,23 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
||||
}
|
||||
|
||||
for event in events[:entries_removed] {
|
||||
if event.lpCompletionKey == COMPLETION_KEY_WAKE_UP { continue }
|
||||
assert(event.lpOverlapped != nil)
|
||||
op := container_of(container_of(event.lpOverlapped, _Operation, "over"), Operation, "_impl")
|
||||
handle_completed(op)
|
||||
|
||||
if op.l == l {
|
||||
handle_completed(op)
|
||||
} else {
|
||||
op_l := op.l
|
||||
for !mpsc_enqueue(&op.l.completed_oob, op) {
|
||||
warn("oob queue filled up, QUEUE_SIZE may need increasing")
|
||||
_wake_up(op_l)
|
||||
win.SwitchToThread()
|
||||
}
|
||||
_wake_up(op_l)
|
||||
}
|
||||
}
|
||||
|
||||
if entries_removed < QUEUE_SIZE {
|
||||
if entries_removed < len(events) {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -249,11 +277,20 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
||||
continue
|
||||
}
|
||||
|
||||
expires = cexpires
|
||||
debug("first timeout in the future is at", op.timeout._impl.expires, "after", cexpires)
|
||||
return
|
||||
break
|
||||
}
|
||||
|
||||
// Don't merge this with the previous iteration because the `handle_completed` in that one might queue
|
||||
// more timeouts which we want to detect here.
|
||||
// For example: `timeout(time.Second, proc(_: ^Operation) { timeout(time.Second, ...) })`
|
||||
|
||||
first := avl.first(&l.timeouts)
|
||||
if first != nil {
|
||||
op := first.value
|
||||
cexpires := time.diff(curr, op.timeout._impl.expires)
|
||||
debug("first timeout in the future is at", op.timeout._impl.expires, "after", cexpires)
|
||||
expires = cexpires
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -649,7 +686,7 @@ _remove :: proc(target: ^Operation) {
|
||||
target.poll._impl.wait_handle = nil
|
||||
|
||||
ok := win.PostQueuedCompletionStatus(
|
||||
target.l.iocp,
|
||||
g.iocp,
|
||||
0,
|
||||
0,
|
||||
&target._impl.over,
|
||||
@@ -702,8 +739,8 @@ _remove :: proc(target: ^Operation) {
|
||||
|
||||
@(private="package")
|
||||
_associate_handle :: proc(handle: uintptr, l: ^Event_Loop) -> (Handle, Association_Error) {
|
||||
handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), l.iocp, 0, 0)
|
||||
if handle_iocp != l.iocp {
|
||||
handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), g.iocp, 0, 0)
|
||||
if handle_iocp != g.iocp {
|
||||
return INVALID_HANDLE, .Not_Possible_To_Associate
|
||||
}
|
||||
|
||||
@@ -737,27 +774,69 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err
|
||||
|
||||
@(private="package")
|
||||
_wake_up :: proc(l: ^Event_Loop) {
|
||||
win.PostQueuedCompletionStatus(
|
||||
l.iocp,
|
||||
0,
|
||||
COMPLETION_KEY_WAKE_UP,
|
||||
nil,
|
||||
)
|
||||
_, exchanged := sync.atomic_compare_exchange_strong(&l.state, .Sleeping, .Waking)
|
||||
if exchanged {
|
||||
win.QueueUserAPC(proc "system" (Parameter: win.ULONG_PTR) {}, l.thread, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@(private="package")
|
||||
_yield :: proc() {
|
||||
win.SwitchToThread()
|
||||
}
|
||||
|
||||
// Start file private.
|
||||
|
||||
QUEUE_SIZE :: 128
|
||||
|
||||
REMOVED :: rawptr(max(uintptr)-1)
|
||||
|
||||
INVALID_HANDLE :: Handle(win.INVALID_HANDLE)
|
||||
|
||||
COMPLETION_KEY_WAKE_UP :: 69
|
||||
|
||||
Op_Result :: enum {
|
||||
Done,
|
||||
Pending,
|
||||
}
|
||||
|
||||
/*
|
||||
IOCP is designed to be used from multiple threads.
|
||||
For best performance we need to adhere to that and have one single IOCP for the event loops to share.
|
||||
*/
|
||||
g: struct{
|
||||
mu: sync.Mutex,
|
||||
refs: int,
|
||||
iocp: win.HANDLE,
|
||||
err: General_Error,
|
||||
}
|
||||
|
||||
g_ref :: proc() -> General_Error {
|
||||
sync.guard(&g.mu)
|
||||
|
||||
if g.refs == 0 {
|
||||
win.ensure_winsock_initialized()
|
||||
|
||||
// NOTE: setting NumberOfConcurrentThreads to 0 which makes Windows use the amount of processors as a default.
|
||||
// We may want to make this configurable somehow?
|
||||
g.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 0)
|
||||
if g.iocp == nil {
|
||||
g.err = General_Error(win.GetLastError())
|
||||
}
|
||||
}
|
||||
|
||||
sync.atomic_add(&g.refs, 1)
|
||||
|
||||
return sync.atomic_load(&g.err)
|
||||
}
|
||||
|
||||
g_unref :: proc() {
|
||||
sync.guard(&g.mu)
|
||||
|
||||
if sync.atomic_sub(&g.refs, 1) == 1 {
|
||||
win.CloseHandle(g.iocp)
|
||||
g.err = nil
|
||||
}
|
||||
}
|
||||
|
||||
operation_handle :: proc(op: ^Operation) -> win.HANDLE {
|
||||
switch op.type {
|
||||
case .Accept: return win.HANDLE(uintptr(op.accept.socket))
|
||||
@@ -845,11 +924,11 @@ accept_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
&received,
|
||||
&op._impl.over,
|
||||
) {
|
||||
if op._impl.over.Internal == nil {
|
||||
op.accept.err = net._accept_error()
|
||||
} else if is_pending(op._impl.over) {
|
||||
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
||||
link_timeout(op, op.accept.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
op.accept.err = net._accept_error()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -947,14 +1026,12 @@ dial_exec :: proc(op: ^Operation) -> (result: Op_Result) {
|
||||
&transferred,
|
||||
&op._impl.over,
|
||||
) {
|
||||
if op._impl.over.Internal == nil {
|
||||
op.dial.err = net._dial_error()
|
||||
} else if is_pending(op._impl.over) {
|
||||
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
||||
link_timeout(op, op.dial.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
op.dial.err = net._dial_error()
|
||||
}
|
||||
|
||||
return .Done
|
||||
}
|
||||
|
||||
return .Done
|
||||
@@ -1004,11 +1081,16 @@ read_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
&op._impl.over,
|
||||
) {
|
||||
assert(read == 0)
|
||||
if op._impl.over.Internal == nil {
|
||||
op.read.err = FS_Error(win.GetLastError())
|
||||
} else if is_pending(op._impl.over) {
|
||||
if is_pending(op._impl.over) {
|
||||
link_timeout(op, op.read.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
err := win.GetLastError()
|
||||
if is_incomplete(win.System_Error(err)) {
|
||||
link_timeout(op, op.read.expires)
|
||||
return .Pending
|
||||
}
|
||||
op.read.err = FS_Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1073,11 +1155,16 @@ write_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
&op._impl.over,
|
||||
) {
|
||||
assert(written == 0)
|
||||
if op._impl.over.Internal == nil {
|
||||
op.write.err = FS_Error(win.GetLastError())
|
||||
} else if is_pending(op._impl.over) {
|
||||
if is_pending(op._impl.over) {
|
||||
link_timeout(op, op.write.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
err := win.GetLastError()
|
||||
if is_incomplete(win.System_Error(err)) {
|
||||
link_timeout(op, op.write.expires)
|
||||
return .Pending
|
||||
}
|
||||
op.write.err = FS_Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1165,14 +1252,14 @@ recv_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
}
|
||||
|
||||
if status == win.SOCKET_ERROR {
|
||||
if op._impl.over.Internal == nil {
|
||||
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
||||
link_timeout(op, op.recv.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
switch _ in op.recv.socket {
|
||||
case TCP_Socket: op.recv.err = net._tcp_recv_error()
|
||||
case UDP_Socket: op.recv.err = net._udp_recv_error()
|
||||
}
|
||||
} else if is_pending(op._impl.over) {
|
||||
link_timeout(op, op.recv.expires)
|
||||
return .Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1286,14 +1373,14 @@ send_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
}
|
||||
|
||||
if status == win.SOCKET_ERROR {
|
||||
if op._impl.over.Internal == nil {
|
||||
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
||||
link_timeout(op, op.send.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
switch _ in op.send.socket {
|
||||
case TCP_Socket: op.send.err = net._tcp_send_error()
|
||||
case UDP_Socket: op.send.err = net._udp_send_error()
|
||||
}
|
||||
} else if is_pending(op._impl.over) {
|
||||
link_timeout(op, op.send.expires)
|
||||
return .Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1379,11 +1466,11 @@ sendfile_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
nil,
|
||||
0,
|
||||
) {
|
||||
if op._impl.over.Internal == nil {
|
||||
op.sendfile.err = net._tcp_send_error()
|
||||
} else if is_pending(op._impl.over) {
|
||||
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
||||
link_timeout(op, op.sendfile.expires)
|
||||
return .Pending
|
||||
} else if op._impl.over.Internal == nil {
|
||||
op.sendfile.err = net._tcp_send_error()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1481,7 +1568,7 @@ poll_exec :: proc(op: ^Operation) -> Op_Result {
|
||||
}
|
||||
|
||||
ok := win.PostQueuedCompletionStatus(
|
||||
op.l.iocp,
|
||||
g.iocp,
|
||||
0,
|
||||
0,
|
||||
&op._impl.over,
|
||||
@@ -1735,3 +1822,19 @@ load_socket_fn :: proc(subject: win.SOCKET, guid: win.GUID, fn: ^$T) {
|
||||
check_timed_out :: proc(op: ^Operation, expires: time.Time) -> bool {
|
||||
return expires != {} && time.diff(op.l.now, expires) <= 0
|
||||
}
|
||||
|
||||
timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering {
|
||||
switch {
|
||||
case a.timeout._impl.expires._nsec < b.timeout._impl.expires._nsec:
|
||||
return .Less
|
||||
case a.timeout._impl.expires._nsec > b.timeout._impl.expires._nsec:
|
||||
return .Greater
|
||||
case uintptr(a) < uintptr(b):
|
||||
return .Less
|
||||
case uintptr(a) > uintptr(b):
|
||||
return .Greater
|
||||
case:
|
||||
assert(a == b)
|
||||
return .Equal
|
||||
}
|
||||
}
|
||||
63
core/nbio/mpsc.odin
Normal file
63
core/nbio/mpsc.odin
Normal file
@@ -0,0 +1,63 @@
|
||||
#+private
|
||||
package nbio
|
||||
|
||||
import "base:runtime"
|
||||
|
||||
import "core:sync"
|
||||
|
||||
Multi_Producer_Single_Consumer :: struct {
|
||||
count: int,
|
||||
head: int,
|
||||
tail: int,
|
||||
buffer: []rawptr,
|
||||
mask: int,
|
||||
}
|
||||
|
||||
mpsc_init :: proc(mpscq: ^Multi_Producer_Single_Consumer, cap: int, allocator: runtime.Allocator) -> runtime.Allocator_Error {
|
||||
assert(runtime.is_power_of_two_int(cap), "cap must be a power of 2")
|
||||
mpscq.buffer = make([]rawptr, cap, allocator) or_return
|
||||
mpscq.mask = cap-1
|
||||
sync.atomic_thread_fence(.Release)
|
||||
return nil
|
||||
}
|
||||
|
||||
mpsc_destroy :: proc(mpscq: ^Multi_Producer_Single_Consumer, allocator: runtime.Allocator) {
|
||||
delete(mpscq.buffer, allocator)
|
||||
}
|
||||
|
||||
mpsc_enqueue :: proc(mpscq: ^Multi_Producer_Single_Consumer, obj: rawptr) -> bool {
|
||||
count := sync.atomic_add_explicit(&mpscq.count, 1, .Acquire)
|
||||
if count >= len(mpscq.buffer) {
|
||||
sync.atomic_sub_explicit(&mpscq.count, 1, .Release)
|
||||
return false
|
||||
}
|
||||
|
||||
head := sync.atomic_add_explicit(&mpscq.head, 1, .Acquire)
|
||||
assert(mpscq.buffer[head & mpscq.mask] == nil)
|
||||
rv := sync.atomic_exchange_explicit(&mpscq.buffer[head & mpscq.mask], obj, .Release)
|
||||
assert(rv == nil)
|
||||
return true
|
||||
}
|
||||
|
||||
mpsc_dequeue :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> rawptr {
|
||||
ret := sync.atomic_exchange_explicit(&mpscq.buffer[mpscq.tail], nil, .Acquire)
|
||||
if ret == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
mpscq.tail += 1
|
||||
if mpscq.tail >= len(mpscq.buffer) {
|
||||
mpscq.tail = 0
|
||||
}
|
||||
r := sync.atomic_sub_explicit(&mpscq.count, 1, .Release)
|
||||
assert(r > 0)
|
||||
return ret
|
||||
}
|
||||
|
||||
mpsc_count :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int {
|
||||
return sync.atomic_load_explicit(&mpscq.count, .Relaxed)
|
||||
}
|
||||
|
||||
mpsc_cap :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int {
|
||||
return len(mpscq.buffer)
|
||||
}
|
||||
@@ -1,11 +1,10 @@
|
||||
package nbio
|
||||
|
||||
import "base:intrinsics"
|
||||
import "base:runtime"
|
||||
|
||||
import "core:container/pool"
|
||||
import "core:container/queue"
|
||||
import "core:net"
|
||||
import "core:sync"
|
||||
import "core:time"
|
||||
|
||||
/*
|
||||
@@ -23,14 +22,13 @@ Do not copy.
|
||||
*/
|
||||
Event_Loop :: struct /* #no_copy */ {
|
||||
using impl: _Event_Loop,
|
||||
allocator: runtime.Allocator,
|
||||
err: General_Error,
|
||||
refs: int,
|
||||
now: time.Time,
|
||||
|
||||
// Queue that is used to queue operations from another thread to be executed on this thread.
|
||||
// TODO: Better data-structure.
|
||||
queue: queue.Queue(^Operation),
|
||||
queue_mu: sync.Mutex,
|
||||
queue: Multi_Producer_Single_Consumer,
|
||||
|
||||
operation_pool: pool.Pool(Operation),
|
||||
}
|
||||
@@ -408,13 +406,10 @@ exec :: proc(op: ^Operation, trigger_wake_up := true) {
|
||||
if op.l == &_tls_event_loop {
|
||||
_exec(op)
|
||||
} else {
|
||||
{
|
||||
// TODO: Better data-structure.
|
||||
sync.guard(&op.l.queue_mu)
|
||||
_, err := queue.push_back(&op.l.queue, op)
|
||||
if err != nil {
|
||||
panic("exec: queueing operation failed due to memory allocation failure")
|
||||
}
|
||||
for !mpsc_enqueue(&op.l.queue, op) {
|
||||
warn("operation queue on event loop filled up")
|
||||
wake_up(op.l)
|
||||
_yield()
|
||||
}
|
||||
if trigger_wake_up {
|
||||
wake_up(op.l)
|
||||
|
||||
@@ -32,6 +32,8 @@ EV_TXEMPTY :: DWORD(0x0004)
|
||||
|
||||
WAITORTIMERCALLBACK :: #type proc "system" (lpParameter: PVOID, TimerOrWaitFired: BOOLEAN)
|
||||
|
||||
PAPCFUNC :: #type proc "system" (Parameter: ULONG_PTR)
|
||||
|
||||
WT_EXECUTEDEFAULT :: 0x00000000
|
||||
WT_EXECUTEINIOTHREAD :: 0x00000001
|
||||
WT_EXECUTEINPERSISTENTTHREAD :: 0x00000080
|
||||
@@ -596,6 +598,8 @@ foreign kernel32 {
|
||||
) -> BOOL ---
|
||||
|
||||
UnregisterWaitEx :: proc(WaitHandle: HANDLE, CompletionEvent: HANDLE) -> BOOL ---
|
||||
|
||||
QueueUserAPC :: proc(pfnAPC: PAPCFUNC, hThread: HANDLE, dwData: ULONG_PTR) -> DWORD ---
|
||||
}
|
||||
|
||||
DEBUG_PROCESS :: 0x00000001
|
||||
|
||||
@@ -2726,6 +2726,8 @@ WAIT_OBJECT_0 : DWORD : 0x00000000
|
||||
WAIT_TIMEOUT : DWORD : 258
|
||||
WAIT_FAILED : DWORD : 0xFFFFFFFF
|
||||
|
||||
WAIT_IO_COMPLETION: DWORD : 0x000000C0
|
||||
|
||||
FILE_FLAG_WRITE_THROUGH : DWORD : 0x80000000
|
||||
FILE_FLAG_OVERLAPPED : DWORD : 0x40000000
|
||||
FILE_FLAG_NO_BUFFERING : DWORD : 0x20000000
|
||||
|
||||
Reference in New Issue
Block a user