diff --git a/core/nbio/impl.odin b/core/nbio/impl.odin index 3f5191c5e..476b8ab43 100644 --- a/core/nbio/impl.odin +++ b/core/nbio/impl.odin @@ -5,10 +5,8 @@ import "base:runtime" import "base:intrinsics" import "core:container/pool" -import "core:container/queue" import "core:net" import "core:strings" -import "core:sync" import "core:time" import "core:reflect" @@ -35,7 +33,13 @@ _acquire_thread_event_loop :: proc() -> General_Error { allocator := runtime.heap_allocator() } - l.queue.data.allocator = allocator + l.allocator = allocator + + if alloc_err := mpsc_init(&l.queue, 128, l.allocator); alloc_err != nil { + l.err = .Allocation_Failed + return l.err + } + defer if l.err != nil { mpsc_destroy(&l.queue, l.allocator) } if pool_err := pool.init(&l.operation_pool, "_pool_link"); pool_err != nil { l.err = .Allocation_Failed @@ -65,7 +69,7 @@ _release_thread_event_loop :: proc() { if l.refs > 0 { l.refs -= 1 if l.refs == 0 { - queue.destroy(&l.queue) + mpsc_destroy(&l.queue, l.allocator) pool.destroy(&l.operation_pool) _destroy(l) l^ = {} @@ -85,11 +89,10 @@ _current_thread_event_loop :: #force_inline proc(loc := #caller_location) -> (^E _tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { // Receive operations queued from other threads first. - { - sync.guard(&l.queue_mu) - for op in queue.pop_front_safe(&l.queue) { - _exec(op) - } + for { + op := (^Operation)(mpsc_dequeue(&l.queue)) + if op == nil { break } + _exec(op) } return __tick(l, timeout) diff --git a/core/nbio/impl_linux.odin b/core/nbio/impl_linux.odin index cdcbeedc1..47d47d77c 100644 --- a/core/nbio/impl_linux.odin +++ b/core/nbio/impl_linux.odin @@ -23,7 +23,6 @@ _Event_Loop :: struct { unqueued: queue.Queue(^Operation), // Ready to run callbacks, mainly next tick, some other ops that error outside the kernel. completed: queue.Queue(^Operation), - allocator: mem.Allocator, wake: ^Operation, } @@ -128,8 +127,6 @@ _Stat :: struct { @(private="package") _init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) { - l.allocator = alloc - params := uring.DEFAULT_PARAMS params.flags += {.SUBMIT_ALL, .COOP_TASKRUN, .SINGLE_ISSUER} @@ -472,6 +469,11 @@ _wake_up :: proc(l: ^Event_Loop) { assert(n == 8) } +@(private="package") +_yield :: proc() { + linux.sched_yield() +} + // Start file private. // The size of the IO Uring queues. diff --git a/core/nbio/impl_others.odin b/core/nbio/impl_others.odin index cac1f0c63..0a4564454 100644 --- a/core/nbio/impl_others.odin +++ b/core/nbio/impl_others.odin @@ -19,7 +19,6 @@ _FULLY_SUPPORTED :: false _Event_Loop :: struct { completed: queue.Queue(^Operation), timeouts: avl.Tree(^Operation), - allocator: mem.Allocator, } _Handle :: uintptr @@ -69,7 +68,6 @@ _Remove :: struct {} _Link_Timeout :: struct {} _init :: proc(l: ^Event_Loop, allocator: mem.Allocator) -> (rerr: General_Error) { - l.allocator = allocator l.completed.data.allocator = allocator avl.init_cmp(&l.timeouts, timeouts_cmp, allocator) @@ -215,3 +213,6 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err _wake_up :: proc(l: ^Event_Loop) { } + +_yield :: proc() { +} diff --git a/core/nbio/impl_posix.odin b/core/nbio/impl_posix.odin index e003f6ea3..9b4863710 100644 --- a/core/nbio/impl_posix.odin +++ b/core/nbio/impl_posix.odin @@ -23,7 +23,6 @@ _Event_Loop :: struct { // We have to keep record of what we currently have in the kqueue, and if we get an operation // that would be the same (ident, filter) pair we need to bundle the operations under one kevent. submitted: map[Queue_Identifier]^Operation, - allocator: mem.Allocator, // Holds all events we want to flush. Flushing is done each tick at which point this is emptied. pending: sa.Small_Array(QUEUE_SIZE, kq.KEvent), // Holds what should be in `pending` but didn't fit. @@ -107,7 +106,6 @@ _Link_Timeout :: struct {} @(private="package") _init :: proc(l: ^Event_Loop, allocator: mem.Allocator) -> (rerr: General_Error) { - l.allocator = allocator l.submitted.allocator = allocator l.overflow.data.allocator = allocator l.completed.data.allocator = allocator @@ -510,6 +508,7 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err @(private="package") _wake_up :: proc(l: ^Event_Loop) { + // TODO: only if we are sleeping (like Windows). ev := [1]kq.KEvent{ { ident = IDENT_WAKE_UP, @@ -526,6 +525,11 @@ _wake_up :: proc(l: ^Event_Loop) { assert(n == 0) } +@(private="package") +_yield :: proc() { + posix.sched_yield() +} + // Start file private. // Max operations that can be enqueued per tick. diff --git a/core/nbio/impl_windows.odin b/core/nbio/impl_windows.odin index 2ff391a23..af4518cb7 100644 --- a/core/nbio/impl_windows.odin +++ b/core/nbio/impl_windows.odin @@ -8,10 +8,11 @@ import "core:container/pool" import "core:container/queue" import "core:mem" import "core:net" +import "core:path/filepath" import "core:slice" import "core:strings" +import "core:sync" import "core:time" -import "core:path/filepath" import win "core:sys/windows" @@ -20,10 +21,15 @@ _FULLY_SUPPORTED :: true @(private="package") _Event_Loop :: struct { - iocp: win.HANDLE, - allocator: mem.Allocator, - timeouts: avl.Tree(^Operation), - completed: queue.Queue(^Operation), + timeouts: avl.Tree(^Operation), + thread: win.HANDLE, + completed: queue.Queue(^Operation), + completed_oob: Multi_Producer_Single_Consumer, + state: enum { + Working, + Waking, + Sleeping, + }, } @(private="package") @@ -107,47 +113,32 @@ _Stat :: struct {} _init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) { l.allocator = alloc - mem_err: mem.Allocator_Error - if mem_err = queue.init(&l.completed, allocator = alloc); mem_err != nil { - err = .Allocation_Failed - return - } - defer if err != nil { queue.destroy(&l.completed) } + l.completed.data.allocator = l.allocator avl.init(&l.timeouts, timeouts_cmp, alloc) - win.ensure_winsock_initialized() + mpsc_init(&l.completed_oob, QUEUE_SIZE, l.allocator) + defer if err != nil { mpsc_destroy(&l.completed_oob, l.allocator) } - l.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 1) - if l.iocp == nil { - err = General_Error(win.GetLastError()) - return - } + dup_ok := win.DuplicateHandle( + win.GetCurrentProcess(), win.GetCurrentThread(), + win.GetCurrentProcess(), &l.thread, + 0, false, win.DUPLICATE_SAME_ACCESS, + ) + ensure(dup_ok == true) + defer if err != nil { win.CloseHandle(l.thread) } + err = g_ref() return - - timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering { - switch { - case a.timeout._impl.expires._nsec < b.timeout._impl.expires._nsec: - return .Less - case a.timeout._impl.expires._nsec > b.timeout._impl.expires._nsec: - return .Greater - case uintptr(a) < uintptr(b): - return .Less - case uintptr(a) > uintptr(b): - return .Greater - case: - assert(a == b) - return .Equal - } - } } @(private="package") _destroy :: proc(l: ^Event_Loop) { - queue.destroy(&l.completed) avl.destroy(&l.timeouts) - win.CloseHandle(l.iocp) + queue.destroy(&l.completed) + mpsc_destroy(&l.completed_oob, l.allocator) + win.CloseHandle(l.thread) + g_unref() } @(private="package") @@ -166,10 +157,16 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { } } + for { + op := (^Operation)(mpsc_dequeue(&l.completed_oob)) + if op == nil { break } + handle_completed(op) + } + if pool.num_outstanding(&l.operation_pool) == 0 { return nil } actual_timeout := win.INFINITE - if queue.len(l.completed) > 0 { + if queue.len(l.completed) > 0 || mpsc_count(&l.completed_oob) > 0 { actual_timeout = 0 } else if timeout >= 0 { actual_timeout = win.DWORD(timeout / time.Millisecond) @@ -178,14 +175,33 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { actual_timeout = min(actual_timeout, win.DWORD(nt / time.Millisecond)) } + if actual_timeout > 0 { + sync.atomic_store_explicit(&l.state, .Sleeping, .Release) + + // There could be a race condition where we go sleeping at the same time as things get queued + // and a wakeup isn't done because the state is not .Sleeping yet. + // So after sleeping we first check our queues. + + for { + op := (^Operation)(mpsc_dequeue(&l.queue)) + if op == nil { break } + _exec(op) + } + + for { + op := (^Operation)(mpsc_dequeue(&l.completed_oob)) + if op == nil { break } + handle_completed(op) + } + } + for { - QUEUE_SIZE :: 256 - events: [QUEUE_SIZE]win.OVERLAPPED_ENTRY + events: [256]win.OVERLAPPED_ENTRY entries_removed: win.ULONG - if !win.GetQueuedCompletionStatusEx(l.iocp, &events[0], len(events), &entries_removed, actual_timeout, false) { + if !win.GetQueuedCompletionStatusEx(g.iocp, &events[0], len(events), &entries_removed, actual_timeout, true) { winerr := win.GetLastError() switch winerr { - case win.WAIT_TIMEOUT: + case win.WAIT_TIMEOUT, win.WAIT_IO_COMPLETION: entries_removed = 0 case: err = General_Error(winerr) @@ -193,6 +209,8 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { } } + sync.atomic_store_explicit(&l.state, .Working, .Relaxed) + if actual_timeout > 0 { // We may have just waited some time, lets update the current time. l.now = time.now() @@ -203,13 +221,23 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { } for event in events[:entries_removed] { - if event.lpCompletionKey == COMPLETION_KEY_WAKE_UP { continue } assert(event.lpOverlapped != nil) op := container_of(container_of(event.lpOverlapped, _Operation, "over"), Operation, "_impl") - handle_completed(op) + + if op.l == l { + handle_completed(op) + } else { + op_l := op.l + for !mpsc_enqueue(&op.l.completed_oob, op) { + warn("oob queue filled up, QUEUE_SIZE may need increasing") + _wake_up(op_l) + win.SwitchToThread() + } + _wake_up(op_l) + } } - if entries_removed < QUEUE_SIZE { + if entries_removed < len(events) { break } @@ -249,11 +277,20 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) { continue } - expires = cexpires - debug("first timeout in the future is at", op.timeout._impl.expires, "after", cexpires) - return + break } + // Don't merge this with the previous iteration because the `handle_completed` in that one might queue + // more timeouts which we want to detect here. + // For example: `timeout(time.Second, proc(_: ^Operation) { timeout(time.Second, ...) })` + + first := avl.first(&l.timeouts) + if first != nil { + op := first.value + cexpires := time.diff(curr, op.timeout._impl.expires) + debug("first timeout in the future is at", op.timeout._impl.expires, "after", cexpires) + expires = cexpires + } return } @@ -649,7 +686,7 @@ _remove :: proc(target: ^Operation) { target.poll._impl.wait_handle = nil ok := win.PostQueuedCompletionStatus( - target.l.iocp, + g.iocp, 0, 0, &target._impl.over, @@ -702,8 +739,8 @@ _remove :: proc(target: ^Operation) { @(private="package") _associate_handle :: proc(handle: uintptr, l: ^Event_Loop) -> (Handle, Association_Error) { - handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), l.iocp, 0, 0) - if handle_iocp != l.iocp { + handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), g.iocp, 0, 0) + if handle_iocp != g.iocp { return INVALID_HANDLE, .Not_Possible_To_Associate } @@ -737,27 +774,69 @@ _associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Err @(private="package") _wake_up :: proc(l: ^Event_Loop) { - win.PostQueuedCompletionStatus( - l.iocp, - 0, - COMPLETION_KEY_WAKE_UP, - nil, - ) + _, exchanged := sync.atomic_compare_exchange_strong(&l.state, .Sleeping, .Waking) + if exchanged { + win.QueueUserAPC(proc "system" (Parameter: win.ULONG_PTR) {}, l.thread, 0) + } +} + +@(private="package") +_yield :: proc() { + win.SwitchToThread() } // Start file private. +QUEUE_SIZE :: 128 + REMOVED :: rawptr(max(uintptr)-1) INVALID_HANDLE :: Handle(win.INVALID_HANDLE) -COMPLETION_KEY_WAKE_UP :: 69 - Op_Result :: enum { Done, Pending, } +/* +IOCP is designed to be used from multiple threads. +For best performance we need to adhere to that and have one single IOCP for the event loops to share. +*/ +g: struct{ + mu: sync.Mutex, + refs: int, + iocp: win.HANDLE, + err: General_Error, +} + +g_ref :: proc() -> General_Error { + sync.guard(&g.mu) + + if g.refs == 0 { + win.ensure_winsock_initialized() + + // NOTE: setting NumberOfConcurrentThreads to 0 which makes Windows use the amount of processors as a default. + // We may want to make this configurable somehow? + g.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 0) + if g.iocp == nil { + g.err = General_Error(win.GetLastError()) + } + } + + sync.atomic_add(&g.refs, 1) + + return sync.atomic_load(&g.err) +} + +g_unref :: proc() { + sync.guard(&g.mu) + + if sync.atomic_sub(&g.refs, 1) == 1 { + win.CloseHandle(g.iocp) + g.err = nil + } +} + operation_handle :: proc(op: ^Operation) -> win.HANDLE { switch op.type { case .Accept: return win.HANDLE(uintptr(op.accept.socket)) @@ -845,11 +924,11 @@ accept_exec :: proc(op: ^Operation) -> Op_Result { &received, &op._impl.over, ) { - if op._impl.over.Internal == nil { - op.accept.err = net._accept_error() - } else if is_pending(op._impl.over) { + if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) { link_timeout(op, op.accept.expires) return .Pending + } else if op._impl.over.Internal == nil { + op.accept.err = net._accept_error() } } @@ -947,14 +1026,12 @@ dial_exec :: proc(op: ^Operation) -> (result: Op_Result) { &transferred, &op._impl.over, ) { - if op._impl.over.Internal == nil { - op.dial.err = net._dial_error() - } else if is_pending(op._impl.over) { + if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) { link_timeout(op, op.dial.expires) return .Pending + } else if op._impl.over.Internal == nil { + op.dial.err = net._dial_error() } - - return .Done } return .Done @@ -1004,11 +1081,16 @@ read_exec :: proc(op: ^Operation) -> Op_Result { &op._impl.over, ) { assert(read == 0) - if op._impl.over.Internal == nil { - op.read.err = FS_Error(win.GetLastError()) - } else if is_pending(op._impl.over) { + if is_pending(op._impl.over) { link_timeout(op, op.read.expires) return .Pending + } else if op._impl.over.Internal == nil { + err := win.GetLastError() + if is_incomplete(win.System_Error(err)) { + link_timeout(op, op.read.expires) + return .Pending + } + op.read.err = FS_Error(err) } } @@ -1073,11 +1155,16 @@ write_exec :: proc(op: ^Operation) -> Op_Result { &op._impl.over, ) { assert(written == 0) - if op._impl.over.Internal == nil { - op.write.err = FS_Error(win.GetLastError()) - } else if is_pending(op._impl.over) { + if is_pending(op._impl.over) { link_timeout(op, op.write.expires) return .Pending + } else if op._impl.over.Internal == nil { + err := win.GetLastError() + if is_incomplete(win.System_Error(err)) { + link_timeout(op, op.write.expires) + return .Pending + } + op.write.err = FS_Error(err) } } @@ -1165,14 +1252,14 @@ recv_exec :: proc(op: ^Operation) -> Op_Result { } if status == win.SOCKET_ERROR { - if op._impl.over.Internal == nil { + if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) { + link_timeout(op, op.recv.expires) + return .Pending + } else if op._impl.over.Internal == nil { switch _ in op.recv.socket { case TCP_Socket: op.recv.err = net._tcp_recv_error() case UDP_Socket: op.recv.err = net._udp_recv_error() } - } else if is_pending(op._impl.over) { - link_timeout(op, op.recv.expires) - return .Pending } } @@ -1286,14 +1373,14 @@ send_exec :: proc(op: ^Operation) -> Op_Result { } if status == win.SOCKET_ERROR { - if op._impl.over.Internal == nil { + if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) { + link_timeout(op, op.send.expires) + return .Pending + } else if op._impl.over.Internal == nil { switch _ in op.send.socket { case TCP_Socket: op.send.err = net._tcp_send_error() case UDP_Socket: op.send.err = net._udp_send_error() } - } else if is_pending(op._impl.over) { - link_timeout(op, op.send.expires) - return .Pending } } @@ -1379,11 +1466,11 @@ sendfile_exec :: proc(op: ^Operation) -> Op_Result { nil, 0, ) { - if op._impl.over.Internal == nil { - op.sendfile.err = net._tcp_send_error() - } else if is_pending(op._impl.over) { + if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) { link_timeout(op, op.sendfile.expires) return .Pending + } else if op._impl.over.Internal == nil { + op.sendfile.err = net._tcp_send_error() } } @@ -1481,7 +1568,7 @@ poll_exec :: proc(op: ^Operation) -> Op_Result { } ok := win.PostQueuedCompletionStatus( - op.l.iocp, + g.iocp, 0, 0, &op._impl.over, @@ -1735,3 +1822,19 @@ load_socket_fn :: proc(subject: win.SOCKET, guid: win.GUID, fn: ^$T) { check_timed_out :: proc(op: ^Operation, expires: time.Time) -> bool { return expires != {} && time.diff(op.l.now, expires) <= 0 } + +timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering { + switch { + case a.timeout._impl.expires._nsec < b.timeout._impl.expires._nsec: + return .Less + case a.timeout._impl.expires._nsec > b.timeout._impl.expires._nsec: + return .Greater + case uintptr(a) < uintptr(b): + return .Less + case uintptr(a) > uintptr(b): + return .Greater + case: + assert(a == b) + return .Equal + } +} \ No newline at end of file diff --git a/core/nbio/mpsc.odin b/core/nbio/mpsc.odin new file mode 100644 index 000000000..7f88829b4 --- /dev/null +++ b/core/nbio/mpsc.odin @@ -0,0 +1,63 @@ +#+private +package nbio + +import "base:runtime" + +import "core:sync" + +Multi_Producer_Single_Consumer :: struct { + count: int, + head: int, + tail: int, + buffer: []rawptr, + mask: int, +} + +mpsc_init :: proc(mpscq: ^Multi_Producer_Single_Consumer, cap: int, allocator: runtime.Allocator) -> runtime.Allocator_Error { + assert(runtime.is_power_of_two_int(cap), "cap must be a power of 2") + mpscq.buffer = make([]rawptr, cap, allocator) or_return + mpscq.mask = cap-1 + sync.atomic_thread_fence(.Release) + return nil +} + +mpsc_destroy :: proc(mpscq: ^Multi_Producer_Single_Consumer, allocator: runtime.Allocator) { + delete(mpscq.buffer, allocator) +} + +mpsc_enqueue :: proc(mpscq: ^Multi_Producer_Single_Consumer, obj: rawptr) -> bool { + count := sync.atomic_add_explicit(&mpscq.count, 1, .Acquire) + if count >= len(mpscq.buffer) { + sync.atomic_sub_explicit(&mpscq.count, 1, .Release) + return false + } + + head := sync.atomic_add_explicit(&mpscq.head, 1, .Acquire) + assert(mpscq.buffer[head & mpscq.mask] == nil) + rv := sync.atomic_exchange_explicit(&mpscq.buffer[head & mpscq.mask], obj, .Release) + assert(rv == nil) + return true +} + +mpsc_dequeue :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> rawptr { + ret := sync.atomic_exchange_explicit(&mpscq.buffer[mpscq.tail], nil, .Acquire) + if ret == nil { + return nil + } + + mpscq.tail += 1 + if mpscq.tail >= len(mpscq.buffer) { + mpscq.tail = 0 + } + r := sync.atomic_sub_explicit(&mpscq.count, 1, .Release) + assert(r > 0) + return ret +} + +mpsc_count :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int { + return sync.atomic_load_explicit(&mpscq.count, .Relaxed) +} + +mpsc_cap :: proc(mpscq: ^Multi_Producer_Single_Consumer) -> int { + return len(mpscq.buffer) +} \ No newline at end of file diff --git a/core/nbio/nbio.odin b/core/nbio/nbio.odin index 274cc5291..703a2b4d7 100644 --- a/core/nbio/nbio.odin +++ b/core/nbio/nbio.odin @@ -1,11 +1,10 @@ package nbio import "base:intrinsics" +import "base:runtime" import "core:container/pool" -import "core:container/queue" import "core:net" -import "core:sync" import "core:time" /* @@ -23,14 +22,13 @@ Do not copy. */ Event_Loop :: struct /* #no_copy */ { using impl: _Event_Loop, + allocator: runtime.Allocator, err: General_Error, refs: int, now: time.Time, // Queue that is used to queue operations from another thread to be executed on this thread. - // TODO: Better data-structure. - queue: queue.Queue(^Operation), - queue_mu: sync.Mutex, + queue: Multi_Producer_Single_Consumer, operation_pool: pool.Pool(Operation), } @@ -408,13 +406,10 @@ exec :: proc(op: ^Operation, trigger_wake_up := true) { if op.l == &_tls_event_loop { _exec(op) } else { - { - // TODO: Better data-structure. - sync.guard(&op.l.queue_mu) - _, err := queue.push_back(&op.l.queue, op) - if err != nil { - panic("exec: queueing operation failed due to memory allocation failure") - } + for !mpsc_enqueue(&op.l.queue, op) { + warn("operation queue on event loop filled up") + wake_up(op.l) + _yield() } if trigger_wake_up { wake_up(op.l) diff --git a/core/sys/windows/kernel32.odin b/core/sys/windows/kernel32.odin index dc76cb037..07f34bed2 100644 --- a/core/sys/windows/kernel32.odin +++ b/core/sys/windows/kernel32.odin @@ -32,6 +32,8 @@ EV_TXEMPTY :: DWORD(0x0004) WAITORTIMERCALLBACK :: #type proc "system" (lpParameter: PVOID, TimerOrWaitFired: BOOLEAN) +PAPCFUNC :: #type proc "system" (Parameter: ULONG_PTR) + WT_EXECUTEDEFAULT :: 0x00000000 WT_EXECUTEINIOTHREAD :: 0x00000001 WT_EXECUTEINPERSISTENTTHREAD :: 0x00000080 @@ -596,6 +598,8 @@ foreign kernel32 { ) -> BOOL --- UnregisterWaitEx :: proc(WaitHandle: HANDLE, CompletionEvent: HANDLE) -> BOOL --- + + QueueUserAPC :: proc(pfnAPC: PAPCFUNC, hThread: HANDLE, dwData: ULONG_PTR) -> DWORD --- } DEBUG_PROCESS :: 0x00000001 diff --git a/core/sys/windows/types.odin b/core/sys/windows/types.odin index 0d8854724..f3f581844 100644 --- a/core/sys/windows/types.odin +++ b/core/sys/windows/types.odin @@ -2726,6 +2726,8 @@ WAIT_OBJECT_0 : DWORD : 0x00000000 WAIT_TIMEOUT : DWORD : 258 WAIT_FAILED : DWORD : 0xFFFFFFFF +WAIT_IO_COMPLETION: DWORD : 0x000000C0 + FILE_FLAG_WRITE_THROUGH : DWORD : 0x80000000 FILE_FLAG_OVERLAPPED : DWORD : 0x40000000 FILE_FLAG_NO_BUFFERING : DWORD : 0x20000000