nbio(windows): use one IOCP for all event loops

This commit is contained in:
laytan
2026-01-17 21:06:13 +01:00
parent 3a3ed48fae
commit 2caee687ed
4 changed files with 169 additions and 74 deletions

View File

@@ -510,6 +510,7 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err
@(private="package")
_wake_up :: proc(l: ^Event_Loop) {
// TODO: only if we are sleeping (like Windows).
ev := [1]kq.KEvent{
{
ident = IDENT_WAKE_UP,

View File

@@ -8,10 +8,11 @@ import "core:container/pool"
import "core:container/queue"
import "core:mem"
import "core:net"
import "core:path/filepath"
import "core:slice"
import "core:strings"
import "core:sync"
import "core:time"
import "core:path/filepath"
import win "core:sys/windows"
@@ -20,10 +21,15 @@ _FULLY_SUPPORTED :: true
@(private="package")
_Event_Loop :: struct {
iocp: win.HANDLE,
allocator: mem.Allocator,
timeouts: avl.Tree(^Operation),
completed: queue.Queue(^Operation),
timeouts: avl.Tree(^Operation),
thread: win.HANDLE,
completed: queue.Queue(^Operation),
completed_oob: Multi_Producer_Single_Consumer,
state: enum {
Working,
Waking,
Sleeping,
},
}
@(private="package")
@@ -111,34 +117,19 @@ _init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) {
avl.init(&l.timeouts, timeouts_cmp, alloc)
win.ensure_winsock_initialized()
mpsc_init(&l.completed_oob, QUEUE_SIZE, l.allocator)
defer if err != nil { mpsc_destroy(&l.completed_oob, l.allocator) }
l.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 1)
if l.iocp == nil {
err = General_Error(win.GetLastError())
return
}
dup_ok := win.DuplicateHandle(
win.GetCurrentProcess(), win.GetCurrentThread(),
win.GetCurrentProcess(), &l.thread,
0, false, win.DUPLICATE_SAME_ACCESS,
)
ensure(dup_ok == true)
defer if err != nil { win.CloseHandle(l.thread) }
err = g_ref()
return
timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering {
switch {
case a.timeout._impl.expires._nsec < b.timeout._impl.expires._nsec:
return .Less
case a.timeout._impl.expires._nsec > b.timeout._impl.expires._nsec:
return .Greater
case uintptr(a) < uintptr(b):
return .Less
case uintptr(a) > uintptr(b):
return .Greater
case:
assert(a == b)
return .Equal
}
}
}
@(private="package")
@@ -146,7 +137,8 @@ _destroy :: proc(l: ^Event_Loop) {
avl.destroy(&l.timeouts)
queue.destroy(&l.completed)
mpsc_destroy(&l.completed_oob, l.allocator)
win.CloseHandle(l.iocp)
win.CloseHandle(l.thread)
g_unref()
}
@(private="package")
@@ -165,10 +157,16 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
}
}
for {
op := (^Operation)(mpsc_dequeue(&l.completed_oob))
if op == nil { break }
handle_completed(op)
}
if pool.num_outstanding(&l.operation_pool) == 0 { return nil }
actual_timeout := win.INFINITE
if queue.len(l.completed) > 0 {
if queue.len(l.completed) > 0 || mpsc_count(&l.completed_oob) > 0 {
actual_timeout = 0
} else if timeout >= 0 {
actual_timeout = win.DWORD(timeout / time.Millisecond)
@@ -177,14 +175,33 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
actual_timeout = min(actual_timeout, win.DWORD(nt / time.Millisecond))
}
if actual_timeout > 0 {
sync.atomic_store_explicit(&l.state, .Sleeping, .Release)
// There is a possible race where we go to sleep at the same time as things get queued,
// and no wakeup is sent because the state was not yet .Sleeping when the producer checked it.
// So after marking ourselves as .Sleeping, and before actually blocking, we check our queues once more.
for {
op := (^Operation)(mpsc_dequeue(&l.queue))
if op == nil { break }
_exec(op)
}
for {
op := (^Operation)(mpsc_dequeue(&l.completed_oob))
if op == nil { break }
handle_completed(op)
}
}
for {
QUEUE_SIZE :: 256
events: [QUEUE_SIZE]win.OVERLAPPED_ENTRY
events: [256]win.OVERLAPPED_ENTRY
entries_removed: win.ULONG
if !win.GetQueuedCompletionStatusEx(l.iocp, &events[0], len(events), &entries_removed, actual_timeout, false) {
if !win.GetQueuedCompletionStatusEx(g.iocp, &events[0], len(events), &entries_removed, actual_timeout, true) {
winerr := win.GetLastError()
switch winerr {
case win.WAIT_TIMEOUT:
case win.WAIT_TIMEOUT, win.WAIT_IO_COMPLETION:
entries_removed = 0
case:
err = General_Error(winerr)
@@ -192,6 +209,8 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
}
}
sync.atomic_store_explicit(&l.state, .Working, .Relaxed)
if actual_timeout > 0 {
// We may have just waited some time, let's update the current time.
l.now = time.now()
@@ -202,13 +221,23 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
}
for event in events[:entries_removed] {
if event.lpCompletionKey == COMPLETION_KEY_WAKE_UP { continue }
assert(event.lpOverlapped != nil)
op := container_of(container_of(event.lpOverlapped, _Operation, "over"), Operation, "_impl")
handle_completed(op)
if op.l == l {
handle_completed(op)
} else {
op_l := op.l
for !mpsc_enqueue(&op.l.completed_oob, op) {
warn("oob queue filled up, QUEUE_SIZE may need increasing")
_wake_up(op_l)
win.SwitchToThread()
}
_wake_up(op_l)
}
}
if entries_removed < QUEUE_SIZE {
if entries_removed < len(events) {
break
}
@@ -657,7 +686,7 @@ _remove :: proc(target: ^Operation) {
target.poll._impl.wait_handle = nil
ok := win.PostQueuedCompletionStatus(
target.l.iocp,
g.iocp,
0,
0,
&target._impl.over,
@@ -710,8 +739,8 @@ _remove :: proc(target: ^Operation) {
@(private="package")
_associate_handle :: proc(handle: uintptr, l: ^Event_Loop) -> (Handle, Association_Error) {
handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), l.iocp, 0, 0)
if handle_iocp != l.iocp {
handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), g.iocp, 0, 0)
if handle_iocp != g.iocp {
return INVALID_HANDLE, .Not_Possible_To_Associate
}
@@ -745,12 +774,10 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err
@(private="package")
_wake_up :: proc(l: ^Event_Loop) {
win.PostQueuedCompletionStatus(
l.iocp,
0,
COMPLETION_KEY_WAKE_UP,
nil,
)
_, exchanged := sync.atomic_compare_exchange_strong(&l.state, .Sleeping, .Waking)
if exchanged {
win.QueueUserAPC(proc "system" (Parameter: win.ULONG_PTR) {}, l.thread, 0)
}
}
@(private="package")
@@ -766,13 +793,50 @@ REMOVED :: rawptr(max(uintptr)-1)
INVALID_HANDLE :: Handle(win.INVALID_HANDLE)
COMPLETION_KEY_WAKE_UP :: 69
// Outcome of executing an operation, as returned by the `*_exec` procedures.
Op_Result :: enum {
// Execution finished right away (successfully or with the operation's error field set).
Done,
// The overlapped operation is still in flight; the `*_exec` procedures return this
// after linking the operation's timeout.
Pending,
}
/*
IOCP is designed to be used from multiple threads.
For best performance we need to adhere to that and have one single IOCP for the event loops to share.
*/
// Process-wide state shared by all event loops; managed through `g_ref` and `g_unref`.
g: struct{
// Held by `g_ref` and `g_unref` while they inspect or modify the fields below.
mu: sync.Mutex,
// Number of references taken with `g_ref` and not yet released with `g_unref`.
refs: int,
// The shared I/O completion port; created by `g_ref` when `refs` is 0 and closed by
// `g_unref` when the last reference is released.
iocp: win.HANDLE,
// Set by `g_ref` when creating the completion port fails; reset to nil by `g_unref`
// when the last reference is released.
err: General_Error,
}
/*
Takes a reference on the shared I/O completion port, creating it when this is the first reference.

Returns nil when a reference was taken, in which case the caller must pair it with `g_unref`.
Returns the creation error otherwise; no reference is taken in that case, so the caller
must NOT call `g_unref`, and a later call will retry creating the port.
*/
g_ref :: proc() -> General_Error {
	sync.guard(&g.mu)

	if g.refs == 0 {
		win.ensure_winsock_initialized()

		// NOTE: NumberOfConcurrentThreads is 0, which makes Windows default to the number of processors.
		// We may want to make this configurable somehow?
		iocp := win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 0)
		if iocp == nil {
			// Do not count a reference for a port that does not exist: callers that fail to
			// initialize never unref, so counting here would permanently wedge `refs` above 0
			// and every later call would keep returning this stale error without retrying.
			g.err = General_Error(win.GetLastError())
			return g.err
		}

		g.iocp = iocp
		g.err  = nil
	}

	sync.atomic_add(&g.refs, 1)
	return nil
}
/*
Releases a reference previously taken with `g_ref`.

When the last reference goes away the shared completion port handle is closed
and the stored error is cleared.
*/
g_unref :: proc() {
	sync.guard(&g.mu)

	// `atomic_sub` yields the value before the subtraction.
	previous := sync.atomic_sub(&g.refs, 1)
	if previous != 1 {
		return
	}

	win.CloseHandle(g.iocp)
	g.err = nil
}
operation_handle :: proc(op: ^Operation) -> win.HANDLE {
switch op.type {
case .Accept: return win.HANDLE(uintptr(op.accept.socket))
@@ -860,11 +924,11 @@ accept_exec :: proc(op: ^Operation) -> Op_Result {
&received,
&op._impl.over,
) {
if op._impl.over.Internal == nil {
op.accept.err = net._accept_error()
} else if is_pending(op._impl.over) {
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
link_timeout(op, op.accept.expires)
return .Pending
} else if op._impl.over.Internal == nil {
op.accept.err = net._accept_error()
}
}
@@ -962,14 +1026,12 @@ dial_exec :: proc(op: ^Operation) -> (result: Op_Result) {
&transferred,
&op._impl.over,
) {
if op._impl.over.Internal == nil {
op.dial.err = net._dial_error()
} else if is_pending(op._impl.over) {
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
link_timeout(op, op.dial.expires)
return .Pending
} else if op._impl.over.Internal == nil {
op.dial.err = net._dial_error()
}
return .Done
}
return .Done
@@ -1019,11 +1081,16 @@ read_exec :: proc(op: ^Operation) -> Op_Result {
&op._impl.over,
) {
assert(read == 0)
if op._impl.over.Internal == nil {
op.read.err = FS_Error(win.GetLastError())
} else if is_pending(op._impl.over) {
if is_pending(op._impl.over) {
link_timeout(op, op.read.expires)
return .Pending
} else if op._impl.over.Internal == nil {
err := win.GetLastError()
if is_incomplete(win.System_Error(err)) {
link_timeout(op, op.read.expires)
return .Pending
}
op.read.err = FS_Error(err)
}
}
@@ -1088,11 +1155,16 @@ write_exec :: proc(op: ^Operation) -> Op_Result {
&op._impl.over,
) {
assert(written == 0)
if op._impl.over.Internal == nil {
op.write.err = FS_Error(win.GetLastError())
} else if is_pending(op._impl.over) {
if is_pending(op._impl.over) {
link_timeout(op, op.write.expires)
return .Pending
} else if op._impl.over.Internal == nil {
err := win.GetLastError()
if is_incomplete(win.System_Error(err)) {
link_timeout(op, op.write.expires)
return .Pending
}
op.write.err = FS_Error(err)
}
}
@@ -1180,14 +1252,14 @@ recv_exec :: proc(op: ^Operation) -> Op_Result {
}
if status == win.SOCKET_ERROR {
if op._impl.over.Internal == nil {
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
link_timeout(op, op.recv.expires)
return .Pending
} else if op._impl.over.Internal == nil {
switch _ in op.recv.socket {
case TCP_Socket: op.recv.err = net._tcp_recv_error()
case UDP_Socket: op.recv.err = net._udp_recv_error()
}
} else if is_pending(op._impl.over) {
link_timeout(op, op.recv.expires)
return .Pending
}
}
@@ -1301,14 +1373,14 @@ send_exec :: proc(op: ^Operation) -> Op_Result {
}
if status == win.SOCKET_ERROR {
if op._impl.over.Internal == nil {
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
link_timeout(op, op.send.expires)
return .Pending
} else if op._impl.over.Internal == nil {
switch _ in op.send.socket {
case TCP_Socket: op.send.err = net._tcp_send_error()
case UDP_Socket: op.send.err = net._udp_send_error()
}
} else if is_pending(op._impl.over) {
link_timeout(op, op.send.expires)
return .Pending
}
}
@@ -1394,11 +1466,11 @@ sendfile_exec :: proc(op: ^Operation) -> Op_Result {
nil,
0,
) {
if op._impl.over.Internal == nil {
op.sendfile.err = net._tcp_send_error()
} else if is_pending(op._impl.over) {
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
link_timeout(op, op.sendfile.expires)
return .Pending
} else if op._impl.over.Internal == nil {
op.sendfile.err = net._tcp_send_error()
}
}
@@ -1496,7 +1568,7 @@ poll_exec :: proc(op: ^Operation) -> Op_Result {
}
ok := win.PostQueuedCompletionStatus(
op.l.iocp,
g.iocp,
0,
0,
&op._impl.over,
@@ -1750,3 +1822,19 @@ load_socket_fn :: proc(subject: win.SOCKET, guid: win.GUID, fn: ^$T) {
// Reports whether `op` has passed its deadline, judged against the owning loop's cached `now`.
// A zero-valued `expires` means there is no deadline, so it never counts as timed out.
check_timed_out :: proc(op: ^Operation, expires: time.Time) -> bool {
	if expires == {} {
		return false
	}

	remaining := time.diff(op.l.now, expires)
	return remaining <= 0
}
// Ordering used for the timeouts tree: primarily by expiry time, with the operation's
// address as a tie breaker so that only the very same operation compares as equal.
timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering {
	lhs_expiry := a.timeout._impl.expires._nsec
	rhs_expiry := b.timeout._impl.expires._nsec
	if lhs_expiry < rhs_expiry { return .Less    }
	if lhs_expiry > rhs_expiry { return .Greater }

	// Same expiry: fall back to comparing addresses.
	lhs_addr := uintptr(a)
	rhs_addr := uintptr(b)
	if lhs_addr < rhs_addr { return .Less    }
	if lhs_addr > rhs_addr { return .Greater }

	assert(a == b)
	return .Equal
}

View File

@@ -32,6 +32,8 @@ EV_TXEMPTY :: DWORD(0x0004)
WAITORTIMERCALLBACK :: #type proc "system" (lpParameter: PVOID, TimerOrWaitFired: BOOLEAN)
PAPCFUNC :: #type proc "system" (Parameter: ULONG_PTR)
WT_EXECUTEDEFAULT :: 0x00000000
WT_EXECUTEINIOTHREAD :: 0x00000001
WT_EXECUTEINPERSISTENTTHREAD :: 0x00000080
@@ -596,6 +598,8 @@ foreign kernel32 {
) -> BOOL ---
UnregisterWaitEx :: proc(WaitHandle: HANDLE, CompletionEvent: HANDLE) -> BOOL ---
QueueUserAPC :: proc(pfnAPC: PAPCFUNC, hThread: HANDLE, dwData: ULONG_PTR) -> DWORD ---
}
DEBUG_PROCESS :: 0x00000001

View File

@@ -2726,6 +2726,8 @@ WAIT_OBJECT_0 : DWORD : 0x00000000
WAIT_TIMEOUT : DWORD : 258
WAIT_FAILED : DWORD : 0xFFFFFFFF
WAIT_IO_COMPLETION: DWORD : 0x000000C0
FILE_FLAG_WRITE_THROUGH : DWORD : 0x80000000
FILE_FLAG_OVERLAPPED : DWORD : 0x40000000
FILE_FLAG_NO_BUFFERING : DWORD : 0x20000000