mirror of
https://github.com/odin-lang/Odin.git
synced 2026-02-14 15:23:15 +00:00
1849 lines
44 KiB
Odin
1849 lines
44 KiB
Odin
#+private file
|
|
package nbio
|
|
|
|
import "base:intrinsics"
|
|
|
|
import "core:container/avl"
|
|
import "core:container/pool"
|
|
import "core:container/queue"
|
|
import "core:mem"
|
|
import "core:net"
|
|
import "core:path/filepath"
|
|
import "core:slice"
|
|
import "core:strings"
|
|
import "core:sync"
|
|
import "core:time"
|
|
|
|
import win "core:sys/windows"
|
|
|
|
@(private="package")
|
|
_FULLY_SUPPORTED :: true
|
|
|
|
@(private="package")
|
|
_Event_Loop :: struct {
|
|
timeouts: avl.Tree(^Operation),
|
|
thread: win.HANDLE,
|
|
completed: queue.Queue(^Operation),
|
|
completed_oob: Multi_Producer_Single_Consumer,
|
|
state: enum {
|
|
Working,
|
|
Waking,
|
|
Sleeping,
|
|
},
|
|
}
|
|
|
|
@(private="package")
|
|
_Handle :: distinct uintptr
|
|
|
|
@(private="package")
|
|
_CWD :: ~_Handle(99)
|
|
|
|
@(private="package")
|
|
MAX_RW :: mem.Gigabyte
|
|
|
|
@(private="package")
|
|
_Operation :: struct {
|
|
over: win.OVERLAPPED,
|
|
timeout: ^Operation,
|
|
}
|
|
|
|
@(private="package")
|
|
_Accept :: struct {
|
|
// Space that gets the local and remote address written into it.
|
|
addrs: [(size_of(win.sockaddr_in6)+16)*2]byte,
|
|
}
|
|
|
|
@(private="package")
|
|
_Close :: struct {}
|
|
|
|
@(private="package")
|
|
_Dial :: struct {
|
|
addr: win.SOCKADDR_STORAGE_LH,
|
|
}
|
|
|
|
@(private="package")
|
|
_Read :: struct {}
|
|
|
|
@(private="package")
|
|
_Write :: struct {}
|
|
|
|
@(private="package")
|
|
_Send :: struct {
|
|
bufs: Bufs,
|
|
}
|
|
|
|
@(private="package")
|
|
_Recv :: struct {
|
|
source: win.SOCKADDR_STORAGE_LH,
|
|
source_len: win.INT,
|
|
flags: win.DWORD,
|
|
bufs: Bufs,
|
|
}
|
|
|
|
@(private="package")
|
|
_Timeout :: struct {
|
|
expires: time.Time,
|
|
target: ^Operation,
|
|
}
|
|
|
|
@(private="package")
|
|
_Poll :: struct {
|
|
wait_handle: win.HANDLE,
|
|
}
|
|
|
|
@(private="package")
|
|
_Send_File :: struct {}
|
|
|
|
@(private="package")
|
|
_Remove :: struct {}
|
|
|
|
@(private="package")
|
|
_Link_Timeout :: struct {}
|
|
|
|
@(private="package")
|
|
_Splice :: struct {}
|
|
|
|
@(private="package")
|
|
_Open :: struct {}
|
|
|
|
@(private="package")
|
|
_Stat :: struct {}
|
|
|
|
@(private="package")
|
|
_init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) {
|
|
l.allocator = alloc
|
|
|
|
l.completed.data.allocator = l.allocator
|
|
|
|
avl.init(&l.timeouts, timeouts_cmp, alloc)
|
|
|
|
mpsc_init(&l.completed_oob, QUEUE_SIZE, l.allocator)
|
|
defer if err != nil { mpsc_destroy(&l.completed_oob, l.allocator) }
|
|
|
|
dup_ok := win.DuplicateHandle(
|
|
win.GetCurrentProcess(), win.GetCurrentThread(),
|
|
win.GetCurrentProcess(), &l.thread,
|
|
0, false, win.DUPLICATE_SAME_ACCESS,
|
|
)
|
|
ensure(dup_ok == true)
|
|
defer if err != nil { win.CloseHandle(l.thread) }
|
|
|
|
err = g_ref()
|
|
return
|
|
}
|
|
|
|
@(private="package")
|
|
_destroy :: proc(l: ^Event_Loop) {
|
|
avl.destroy(&l.timeouts)
|
|
queue.destroy(&l.completed)
|
|
mpsc_destroy(&l.completed_oob, l.allocator)
|
|
win.CloseHandle(l.thread)
|
|
g_unref()
|
|
}
|
|
|
|
@(private="package")
|
|
__tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> (err: General_Error) {
|
|
debug("tick")
|
|
|
|
l.now = time.now()
|
|
next_timeout := check_timeouts(l)
|
|
|
|
// Prevent infinite loop when callback adds to completed by storing length.
|
|
n := queue.len(l.completed)
|
|
if n > 0 {
|
|
for _ in 0 ..< n {
|
|
op := queue.pop_front(&l.completed)
|
|
handle_completed(op)
|
|
}
|
|
}
|
|
|
|
for {
|
|
op := (^Operation)(mpsc_dequeue(&l.completed_oob))
|
|
if op == nil { break }
|
|
handle_completed(op)
|
|
}
|
|
|
|
if pool.num_outstanding(&l.operation_pool) == 0 { return nil }
|
|
|
|
actual_timeout := win.INFINITE
|
|
if queue.len(l.completed) > 0 || mpsc_count(&l.completed_oob) > 0 {
|
|
actual_timeout = 0
|
|
} else if timeout >= 0 {
|
|
actual_timeout = win.DWORD(timeout / time.Millisecond)
|
|
}
|
|
if nt, ok := next_timeout.?; ok {
|
|
actual_timeout = min(actual_timeout, win.DWORD(nt / time.Millisecond))
|
|
}
|
|
|
|
if actual_timeout > 0 {
|
|
sync.atomic_store_explicit(&l.state, .Sleeping, .Release)
|
|
|
|
// There could be a race condition where we go sleeping at the same time as things get queued
|
|
// and a wakeup isn't done because the state is not .Sleeping yet.
|
|
// So after sleeping we first check our queues.
|
|
|
|
for {
|
|
op := (^Operation)(mpsc_dequeue(&l.queue))
|
|
if op == nil { break }
|
|
_exec(op)
|
|
}
|
|
|
|
for {
|
|
op := (^Operation)(mpsc_dequeue(&l.completed_oob))
|
|
if op == nil { break }
|
|
handle_completed(op)
|
|
}
|
|
}
|
|
|
|
for {
|
|
events: [256]win.OVERLAPPED_ENTRY
|
|
entries_removed: win.ULONG
|
|
if !win.GetQueuedCompletionStatusEx(g.iocp, &events[0], len(events), &entries_removed, actual_timeout, true) {
|
|
winerr := win.GetLastError()
|
|
switch winerr {
|
|
case win.WAIT_TIMEOUT, win.WAIT_IO_COMPLETION:
|
|
entries_removed = 0
|
|
case:
|
|
err = General_Error(winerr)
|
|
return
|
|
}
|
|
}
|
|
|
|
sync.atomic_store_explicit(&l.state, .Working, .Relaxed)
|
|
|
|
if actual_timeout > 0 {
|
|
// We may have just waited some time, lets update the current time.
|
|
l.now = time.now()
|
|
}
|
|
|
|
if entries_removed > 0 {
|
|
debug(int(entries_removed), "operations were completed")
|
|
}
|
|
|
|
for event in events[:entries_removed] {
|
|
assert(event.lpOverlapped != nil)
|
|
op := container_of(container_of(event.lpOverlapped, _Operation, "over"), Operation, "_impl")
|
|
|
|
if op.l == l {
|
|
handle_completed(op)
|
|
} else {
|
|
op_l := op.l
|
|
for !mpsc_enqueue(&op.l.completed_oob, op) {
|
|
warn("oob queue filled up, QUEUE_SIZE may need increasing")
|
|
_wake_up(op_l)
|
|
win.SwitchToThread()
|
|
}
|
|
_wake_up(op_l)
|
|
}
|
|
}
|
|
|
|
if entries_removed < len(events) {
|
|
break
|
|
}
|
|
|
|
// `events` was filled up, get more.
|
|
debug("GetQueuedCompletionStatusEx filled entire events buffer, getting more")
|
|
actual_timeout = 0
|
|
}
|
|
|
|
return nil
|
|
|
|
check_timeouts :: proc(l: ^Event_Loop) -> (expires: Maybe(time.Duration)) {
|
|
curr := l.now
|
|
|
|
if avl.len(&l.timeouts) == 0 {
|
|
return
|
|
}
|
|
|
|
debug(avl.len(&l.timeouts), "timeouts", "threshold", curr)
|
|
|
|
iter := avl.iterator(&l.timeouts, .Forward)
|
|
for node in avl.iterator_next(&iter) {
|
|
op := node.value
|
|
cexpires := time.diff(curr, op.timeout._impl.expires)
|
|
|
|
debug("expires after", cexpires)
|
|
|
|
removed := op._impl.timeout == (^Operation)(REMOVED)
|
|
done := cexpires <= 0
|
|
if removed {
|
|
debug("timeout removed!")
|
|
} else if done {
|
|
debug("timeout done!")
|
|
handle_completed(op)
|
|
}
|
|
if removed || done {
|
|
avl.remove_node(&l.timeouts, node)
|
|
continue
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
// Don't merge this with the previous iteration because the `handle_completed` in that one might queue
|
|
// more timeouts which we want to detect here.
|
|
// For example: `timeout(time.Second, proc(_: ^Operation) { timeout(time.Second, ...) })`
|
|
|
|
first := avl.first(&l.timeouts)
|
|
if first != nil {
|
|
op := first.value
|
|
cexpires := time.diff(curr, op.timeout._impl.expires)
|
|
debug("first timeout in the future is at", op.timeout._impl.expires, "after", cexpires)
|
|
expires = cexpires
|
|
}
|
|
return
|
|
}
|
|
|
|
handle_completed :: proc(op: ^Operation) {
|
|
debug("handling", op.type)
|
|
|
|
if op._impl.timeout == (^Operation)(REMOVED) {
|
|
debug(op.type, "was removed")
|
|
|
|
// Set an error, and call the internal callback.
|
|
// This way resources are cleaned up properly, for example the result socket for dial.
|
|
// If we just do nothing it will be leaked.
|
|
|
|
if op._impl.over.Internal == nil {
|
|
// There is no error from the kernel, set one ourselves.
|
|
// This needs to be an NTSTATUS code, not a win32 error number.
|
|
STATUS_REQUEST_ABORTED :: 0xC023000C
|
|
op._impl.over.Internal = (^win.c_ulong)(uintptr(STATUS_REQUEST_ABORTED))
|
|
}
|
|
}
|
|
|
|
result := Op_Result.Done
|
|
switch op.type {
|
|
case .Read:
|
|
result = read_callback(op)
|
|
case .Recv:
|
|
result = recv_callback(op)
|
|
if result == .Done {
|
|
maybe_callback(op)
|
|
bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator)
|
|
cleanup(op)
|
|
return
|
|
}
|
|
case .Write:
|
|
result = write_callback(op)
|
|
case .Send:
|
|
result = send_callback(op)
|
|
if result == .Done {
|
|
maybe_callback(op)
|
|
bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator)
|
|
cleanup(op)
|
|
return
|
|
}
|
|
case .Send_File:
|
|
result = sendfile_callback(op)
|
|
case .Accept:
|
|
accept_callback(op)
|
|
case .Dial:
|
|
dial_callback(op)
|
|
case .Poll:
|
|
poll_callback(op)
|
|
case .Timeout, .Open, .Stat, .Close:
|
|
// no-op.
|
|
case .None, ._Link_Timeout, ._Remove, ._Splice:
|
|
fallthrough
|
|
case:
|
|
unreachable()
|
|
}
|
|
|
|
if result == .Pending {
|
|
assert(op._impl.timeout != (^Operation)(REMOVED))
|
|
debug(op.type, "pending")
|
|
return
|
|
}
|
|
|
|
maybe_callback(op)
|
|
cleanup(op)
|
|
|
|
maybe_callback :: proc(op: ^Operation) {
|
|
if op._impl.timeout == (^Operation)(REMOVED) {
|
|
debug(op.type, "done but removed")
|
|
} else {
|
|
debug(op.type, "done")
|
|
op.cb(op)
|
|
|
|
if op._impl.timeout != nil {
|
|
debug("cancelling timeout of", op.type)
|
|
op._impl.timeout.timeout._impl.target = nil
|
|
_remove(op._impl.timeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
cleanup :: proc(op: ^Operation) {
|
|
if !op.detached {
|
|
pool.put(&op.l.operation_pool, op)
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
@(private="package")
|
|
_exec :: proc(op: ^Operation) {
|
|
assert(op.l == &_tls_event_loop)
|
|
|
|
result: Op_Result
|
|
switch op.type {
|
|
case .Accept: result = accept_exec(op)
|
|
case .Close: close_exec(op); result = .Done
|
|
case .Dial: result = dial_exec(op)
|
|
case .Recv: result = recv_exec(op)
|
|
case .Send: result = send_exec(op)
|
|
case .Send_File: result = sendfile_exec(op)
|
|
case .Read: result = read_exec(op)
|
|
case .Write: result = write_exec(op)
|
|
case .Timeout: result = timeout_exec(op)
|
|
case .Poll: result = poll_exec(op)
|
|
case .Open: open_exec(op); result = .Done
|
|
case .Stat: stat_exec(op); result = .Done
|
|
case .None, ._Link_Timeout, ._Remove, ._Splice:
|
|
unreachable()
|
|
}
|
|
|
|
switch result {
|
|
case .Pending:
|
|
// no-op, in kernel.
|
|
debug("exec", op.type, "pending")
|
|
case .Done:
|
|
debug("exec", op.type, "done immediately")
|
|
_, err := queue.append(&op.l.completed, op) // Got result, handle it next tick.
|
|
ensure(err == nil, "allocation failure")
|
|
}
|
|
}
|
|
|
|
@(private="package")
|
|
_open_sync :: proc(l: ^Event_Loop, name: string, dir: Handle, mode: File_Flags, perm: Permissions) -> (handle: Handle, err: FS_Error) {
|
|
if len(name) == 0 {
|
|
err = .Invalid_Argument
|
|
return
|
|
}
|
|
|
|
dir := dir
|
|
|
|
is_abs := filepath.is_abs(name)
|
|
is_cwd: bool
|
|
cwd_path: win.wstring
|
|
if !is_abs && dir == CWD {
|
|
is_cwd = true
|
|
|
|
cwd_len := win.GetCurrentDirectoryW(0, nil)
|
|
assert(cwd_len > 0)
|
|
cwd_buf, cwd_err := make([]u16, cwd_len, l.allocator)
|
|
if cwd_err != nil { return INVALID_HANDLE, .Allocation_Failed }
|
|
cwd_len = win.GetCurrentDirectoryW(cwd_len, raw_data(cwd_buf))
|
|
assert(int(cwd_len) == len(cwd_buf)-1)
|
|
cwd_path = win.wstring(raw_data(cwd_buf))
|
|
|
|
dir = Handle(win.CreateFileW(
|
|
cwd_path,
|
|
win.GENERIC_READ,
|
|
win.FILE_SHARE_READ|win.FILE_SHARE_WRITE,
|
|
nil,
|
|
win.OPEN_EXISTING,
|
|
win.FILE_FLAG_BACKUP_SEMANTICS,
|
|
nil,
|
|
))
|
|
if dir == INVALID_HANDLE {
|
|
err = FS_Error(win.GetLastError())
|
|
return
|
|
}
|
|
}
|
|
defer if is_cwd {
|
|
delete(cwd_path, l.allocator)
|
|
win.CloseHandle(win.HANDLE(dir))
|
|
}
|
|
|
|
path, was_alloc := _normalize_path(name, l.allocator)
|
|
defer if was_alloc { delete(path, l.allocator) }
|
|
|
|
wpath := win.utf8_to_utf16(path, l.allocator)
|
|
defer delete(wpath, l.allocator)
|
|
|
|
if path == "" || wpath == nil {
|
|
return INVALID_HANDLE, .Allocation_Failed
|
|
}
|
|
|
|
path_len := len(wpath) * 2
|
|
if path_len > int(max(u16)) {
|
|
err = .Invalid_Argument
|
|
return
|
|
}
|
|
|
|
access: u32
|
|
switch mode & {.Read, .Write} {
|
|
case {.Read}: access = win.FILE_GENERIC_READ
|
|
case {.Write}: access = win.FILE_GENERIC_WRITE
|
|
case {.Read, .Write}: access = win.FILE_GENERIC_READ | win.FILE_GENERIC_WRITE
|
|
}
|
|
|
|
if .Create in mode {
|
|
access |= win.FILE_GENERIC_WRITE
|
|
}
|
|
if .Append in mode {
|
|
access &~= win.FILE_GENERIC_WRITE
|
|
access |= win.FILE_APPEND_DATA
|
|
}
|
|
share_mode := u32(win.FILE_SHARE_READ | win.FILE_SHARE_WRITE)
|
|
|
|
create_mode: u32 = win.FILE_OPEN
|
|
switch {
|
|
case mode & {.Create, .Excl} == {.Create, .Excl}:
|
|
create_mode = win.FILE_CREATE
|
|
case mode & {.Create, .Trunc} == {.Create, .Trunc}:
|
|
create_mode = win.FILE_OVERWRITE_IF
|
|
case mode & {.Create} == {.Create}:
|
|
create_mode = win.FILE_OPEN_IF
|
|
case mode & {.Trunc} == {.Trunc}:
|
|
create_mode = win.FILE_OVERWRITE
|
|
}
|
|
|
|
attrs: u32 = win.FILE_ATTRIBUTE_NORMAL
|
|
|
|
if .Write_User not_in perm {
|
|
attrs = win.FILE_ATTRIBUTE_READONLY
|
|
if create_mode == win.FILE_OVERWRITE_IF {
|
|
// NOTE(bill): Open has just asked to create a file in read-only mode.
|
|
// If the file already exists, to make it akin to a *nix open call,
|
|
// the call preserves the existing permissions.
|
|
|
|
h: win.HANDLE
|
|
io_status: win.IO_STATUS_BLOCK
|
|
status := win.NtCreateFile(
|
|
&h,
|
|
access,
|
|
&{
|
|
Length = size_of(win.OBJECT_ATTRIBUTES),
|
|
RootDirectory = is_abs ? nil : win.HANDLE(dir),
|
|
ObjectName = &{
|
|
Length = u16(path_len),
|
|
MaximumLength = u16(path_len),
|
|
Buffer = raw_data(wpath),
|
|
},
|
|
},
|
|
&io_status,
|
|
nil,
|
|
win.FILE_ATTRIBUTE_NORMAL,
|
|
share_mode,
|
|
win.FILE_OVERWRITE,
|
|
0,
|
|
nil,
|
|
0,
|
|
)
|
|
syserr := win.System_Error(win.RtlNtStatusToDosError(status))
|
|
#partial switch syserr {
|
|
case .FILE_NOT_FOUND, .BAD_NETPATH, .PATH_NOT_FOUND:
|
|
// File does not exists, create the file
|
|
case .SUCCESS:
|
|
association_err: Association_Error
|
|
handle, association_err = _associate_handle(uintptr(h), l)
|
|
// This shouldn't fail, we just created this file, with correct flags.
|
|
assert(association_err != nil)
|
|
return
|
|
case:
|
|
err = FS_Error(syserr)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
h: win.HANDLE
|
|
io_status: win.IO_STATUS_BLOCK
|
|
status := win.NtCreateFile(
|
|
&h,
|
|
access,
|
|
&{
|
|
Length = size_of(win.OBJECT_ATTRIBUTES),
|
|
RootDirectory = is_abs ? nil : win.HANDLE(dir),
|
|
ObjectName = &{
|
|
Length = u16(path_len),
|
|
MaximumLength = u16(path_len),
|
|
Buffer = raw_data(wpath),
|
|
},
|
|
},
|
|
&io_status,
|
|
nil,
|
|
attrs,
|
|
share_mode,
|
|
create_mode,
|
|
0,
|
|
nil,
|
|
0,
|
|
)
|
|
syserr := win.System_Error(win.RtlNtStatusToDosError(status))
|
|
#partial switch syserr {
|
|
case .SUCCESS:
|
|
association_err: Association_Error
|
|
handle, association_err = _associate_handle(uintptr(h), l)
|
|
// This shouldn't fail, we just created this file, with correct flags.
|
|
assert(association_err == nil)
|
|
return
|
|
case:
|
|
err = FS_Error(syserr)
|
|
return
|
|
}
|
|
|
|
@(require_results)
|
|
_normalize_path :: proc(path: string, allocator := context.allocator) -> (fixed: string, allocated: bool) {
|
|
// An UNC path or relative, just replace slashes.
|
|
if strings.has_prefix(path, `\\`) || !filepath.is_abs(path) {
|
|
return strings.replace_all(path, `/`, `\`)
|
|
}
|
|
|
|
path_buf, err := make([]byte, len(PREFIX)+len(path)+1, allocator)
|
|
if err != nil { return }
|
|
defer if !allocated { delete(path_buf, allocator) }
|
|
|
|
PREFIX :: `\??`
|
|
copy(path_buf, PREFIX)
|
|
n := len(path)
|
|
r, w := 0, len(PREFIX)
|
|
for r < n {
|
|
switch {
|
|
case filepath.is_separator(path[r]):
|
|
r += 1
|
|
case path[r] == '.' && (r+1 == n || filepath.is_separator(path[r+1])):
|
|
// \.\
|
|
r += 1
|
|
case r+1 < n && path[r] == '.' && path[r+1] == '.' && (r+2 == n || filepath.is_separator(path[r+2])):
|
|
// Skip \..\ paths
|
|
return path, false
|
|
case:
|
|
path_buf[w] = '\\'
|
|
w += 1
|
|
for r < n && !filepath.is_separator(path[r]) {
|
|
path_buf[w] = path[r]
|
|
r += 1
|
|
w += 1
|
|
}
|
|
}
|
|
}
|
|
|
|
// Root directories require a trailing \
|
|
if w == len(`\\?\c:`) {
|
|
path_buf[w] = '\\'
|
|
w += 1
|
|
}
|
|
|
|
allocated = true
|
|
fixed = string(path_buf[:w])
|
|
return
|
|
}
|
|
}
|
|
|
|
@(private="package")
|
|
_listen :: proc(socket: TCP_Socket, backlog := 1000) -> (err: Listen_Error) {
|
|
if res := win.listen(win.SOCKET(socket), i32(backlog)); res == win.SOCKET_ERROR {
|
|
err = net._listen_error()
|
|
}
|
|
return
|
|
}
|
|
|
|
@(private="package")
|
|
_create_socket :: proc(
|
|
l: ^Event_Loop,
|
|
family: Address_Family,
|
|
protocol: Socket_Protocol,
|
|
) -> (
|
|
socket: Any_Socket,
|
|
err: Create_Socket_Error,
|
|
) {
|
|
socket = net.create_socket(family, protocol) or_return
|
|
|
|
association_err := _associate_socket(socket, l)
|
|
// Network unreachable would've happened on creation too.
|
|
// Not possible to associate or invalid handle can't happen because we controlled creation.
|
|
assert(association_err == nil)
|
|
|
|
return
|
|
}
|
|
|
|
@(private="package")
|
|
_remove :: proc(target: ^Operation) {
|
|
debug("remove", target.type)
|
|
|
|
if target._impl.timeout == (^Operation)(REMOVED) {
|
|
return
|
|
}
|
|
|
|
if target._impl.timeout != nil {
|
|
_remove(target._impl.timeout)
|
|
}
|
|
|
|
target._impl.timeout = (^Operation)(REMOVED)
|
|
|
|
switch target.type {
|
|
case .Poll:
|
|
win.UnregisterWaitEx(target.poll._impl.wait_handle, nil)
|
|
target.poll._impl.wait_handle = nil
|
|
|
|
ok := win.PostQueuedCompletionStatus(
|
|
g.iocp,
|
|
0,
|
|
0,
|
|
&target._impl.over,
|
|
)
|
|
ensure(ok == true, "unexpected PostQueuedCompletionStatus error")
|
|
return
|
|
|
|
case .Timeout:
|
|
if avl.remove_value(&target.l.timeouts, target) {
|
|
debug("removed timeout directly")
|
|
if !target.detached {
|
|
pool.put(&target.l.operation_pool, target)
|
|
}
|
|
} else {
|
|
debug("timeout is in completed queue, will be picked up there")
|
|
}
|
|
return
|
|
|
|
case .Close, .Open, .Stat:
|
|
// Synchronous ops, picked up in handler.
|
|
return
|
|
|
|
case .Accept, .Dial, .Read, .Recv, .Send, .Write, .Send_File:
|
|
if is_pending(target._impl.over) {
|
|
handle := operation_handle(target)
|
|
assert(handle != win.INVALID_HANDLE)
|
|
ok := win.CancelIoEx(handle, &target._impl.over)
|
|
if !ok {
|
|
err := win.System_Error(win.GetLastError())
|
|
#partial switch err {
|
|
case .NOT_FOUND:
|
|
debug("Remove: Cancel", target.type, "NOT_FOUND")
|
|
case .INVALID_HANDLE:
|
|
debug("Remove: Cancel", target.type, "INVALID_HANDLE") // Likely closed already.
|
|
case:
|
|
assert(false, "unexpected CancelIoEx error")
|
|
}
|
|
}
|
|
}
|
|
|
|
case ._Remove:
|
|
panic("can't remove a removal")
|
|
|
|
case .None, ._Splice, ._Link_Timeout:
|
|
fallthrough
|
|
case:
|
|
unreachable()
|
|
}
|
|
}
|
|
|
|
@(private="package")
|
|
_associate_handle :: proc(handle: uintptr, l: ^Event_Loop) -> (Handle, Association_Error) {
|
|
handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), g.iocp, 0, 0)
|
|
if handle_iocp != g.iocp {
|
|
return INVALID_HANDLE, .Not_Possible_To_Associate
|
|
}
|
|
|
|
cmode: byte
|
|
cmode |= win.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
|
|
cmode |= win.FILE_SKIP_SET_EVENT_ON_HANDLE
|
|
ok := win.SetFileCompletionNotificationModes(win.HANDLE(handle), cmode)
|
|
|
|
// This is an assertion because I don't believe this can happen when we just successfully
|
|
// called `CreateIoCompletionPort`.
|
|
assert(ok == true, "unexpected SetFileCompletionNotificationModes error")
|
|
|
|
return Handle(handle), nil
|
|
}
|
|
|
|
@(private="package")
|
|
_associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Error {
|
|
if err := net.set_blocking(socket, false); err != nil {
|
|
switch err {
|
|
case .None: unreachable()
|
|
case .Network_Unreachable: return .Network_Unreachable
|
|
case .Invalid_Argument: return .Invalid_Handle
|
|
case .Unknown: fallthrough
|
|
case: return Association_Error(net.last_platform_error())
|
|
}
|
|
}
|
|
|
|
_, err := _associate_handle(uintptr(net.any_socket_to_socket(socket)), l)
|
|
return err
|
|
}
|
|
|
|
@(private="package")
|
|
_wake_up :: proc(l: ^Event_Loop) {
|
|
_, exchanged := sync.atomic_compare_exchange_strong(&l.state, .Sleeping, .Waking)
|
|
if exchanged {
|
|
win.QueueUserAPC(proc "system" (Parameter: win.ULONG_PTR) {}, l.thread, 0)
|
|
}
|
|
}
|
|
|
|
@(private="package")
|
|
_yield :: proc() {
|
|
win.SwitchToThread()
|
|
}
|
|
|
|
// Start file private.
|
|
|
|
QUEUE_SIZE :: 128
|
|
|
|
REMOVED :: rawptr(max(uintptr)-1)
|
|
|
|
INVALID_HANDLE :: Handle(win.INVALID_HANDLE)
|
|
|
|
Op_Result :: enum {
|
|
Done,
|
|
Pending,
|
|
}
|
|
|
|
/*
|
|
IOCP is designed to be used from multiple threads.
|
|
For best performance we need to adhere to that and have one single IOCP for the event loops to share.
|
|
*/
|
|
g: struct{
|
|
mu: sync.Mutex,
|
|
refs: int,
|
|
iocp: win.HANDLE,
|
|
err: General_Error,
|
|
}
|
|
|
|
g_ref :: proc() -> General_Error {
|
|
sync.guard(&g.mu)
|
|
|
|
if g.refs == 0 {
|
|
win.ensure_winsock_initialized()
|
|
|
|
// NOTE: setting NumberOfConcurrentThreads to 0 which makes Windows use the amount of processors as a default.
|
|
// We may want to make this configurable somehow?
|
|
g.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 0)
|
|
if g.iocp == nil {
|
|
g.err = General_Error(win.GetLastError())
|
|
}
|
|
}
|
|
|
|
sync.atomic_add(&g.refs, 1)
|
|
|
|
return sync.atomic_load(&g.err)
|
|
}
|
|
|
|
g_unref :: proc() {
|
|
sync.guard(&g.mu)
|
|
|
|
if sync.atomic_sub(&g.refs, 1) == 1 {
|
|
win.CloseHandle(g.iocp)
|
|
g.err = nil
|
|
}
|
|
}
|
|
|
|
operation_handle :: proc(op: ^Operation) -> win.HANDLE {
|
|
switch op.type {
|
|
case .Accept: return win.HANDLE(uintptr(op.accept.socket))
|
|
case .Close:
|
|
switch fd in op.close.subject {
|
|
case TCP_Socket: return win.HANDLE(uintptr(fd))
|
|
case UDP_Socket: return win.HANDLE(uintptr(fd))
|
|
case Handle: return win.HANDLE(uintptr(fd))
|
|
case:
|
|
unreachable()
|
|
}
|
|
case .Dial: return win.HANDLE(uintptr(op.dial.socket))
|
|
case .Read: return win.HANDLE(op.read.handle)
|
|
case .Write: return win.HANDLE(op.write.handle)
|
|
case .Recv: return win.HANDLE(uintptr(net.any_socket_to_socket(op.recv.socket)))
|
|
case .Send: return win.HANDLE(uintptr(net.any_socket_to_socket(op.send.socket)))
|
|
case .Send_File: return win.HANDLE(uintptr(net.any_socket_to_socket(op.sendfile.socket)))
|
|
case .Poll: return win.HANDLE(uintptr(net.any_socket_to_socket(op.poll.socket)))
|
|
case .Stat: return win.HANDLE(uintptr(op.stat.handle))
|
|
|
|
case .Timeout, .Open, ._Splice, ._Link_Timeout, ._Remove, .None:
|
|
return win.INVALID_HANDLE
|
|
case:
|
|
unreachable()
|
|
}
|
|
}
|
|
|
|
close_exec :: proc(op: ^Operation) {
|
|
assert(op.type == .Close)
|
|
|
|
switch h in op.close.subject {
|
|
case Handle:
|
|
if !win.CloseHandle(win.HANDLE(h)) {
|
|
op.close.err = FS_Error(win.GetLastError())
|
|
}
|
|
case TCP_Socket:
|
|
if win.closesocket(win.SOCKET(h)) != win.NO_ERROR {
|
|
op.close.err = FS_Error(win.WSAGetLastError())
|
|
}
|
|
case UDP_Socket:
|
|
if win.closesocket(win.SOCKET(h)) != win.NO_ERROR {
|
|
op.close.err = FS_Error(win.WSAGetLastError())
|
|
}
|
|
case:
|
|
op.close.err = .Invalid_Argument
|
|
return
|
|
}
|
|
}
|
|
|
|
@(require_results)
|
|
accept_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Accept)
|
|
assert(is_fresh(op._impl.over))
|
|
|
|
family := Address_Family.IP4
|
|
{
|
|
ep, err := bound_endpoint(op.accept.socket)
|
|
if err != nil {
|
|
op.accept.err = net._accept_error()
|
|
return .Done
|
|
}
|
|
|
|
if _, is_ip6 := ep.address.(IP6_Address); is_ip6 {
|
|
family = .IP6
|
|
}
|
|
}
|
|
|
|
client, err := _create_socket(op.l, family, .TCP)
|
|
if err != nil {
|
|
op.accept.err = net._accept_error()
|
|
return .Done
|
|
}
|
|
|
|
|
|
op.accept.client = client.(TCP_Socket)
|
|
|
|
received: win.DWORD
|
|
if !win.AcceptEx(
|
|
win.SOCKET(op.accept.socket),
|
|
win.SOCKET(op.accept.client),
|
|
&op.accept._impl.addrs,
|
|
0,
|
|
size_of(win.sockaddr_in6) + 16,
|
|
size_of(win.sockaddr_in6) + 16,
|
|
&received,
|
|
&op._impl.over,
|
|
) {
|
|
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
|
link_timeout(op, op.accept.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
op.accept.err = net._accept_error()
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
accept_callback :: proc(op: ^Operation) {
|
|
assert(op.type == .Accept)
|
|
|
|
defer if op.accept.err != nil {
|
|
win.closesocket(win.SOCKET(op.accept.client))
|
|
}
|
|
|
|
if op.accept.err != nil {
|
|
return
|
|
}
|
|
|
|
_, err := wsa_get_result(win.SOCKET(op.accept.socket), op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
local_addr: ^win.sockaddr
|
|
local_addr_len: win.INT
|
|
remote_addr: ^win.sockaddr
|
|
remote_addr_len: win.INT
|
|
win.GetAcceptExSockaddrs(
|
|
&op.accept._impl.addrs,
|
|
0,
|
|
size_of(win.sockaddr_in6) + 16,
|
|
size_of(win.sockaddr_in6) + 16,
|
|
&local_addr,
|
|
&local_addr_len,
|
|
&remote_addr,
|
|
&remote_addr_len,
|
|
)
|
|
|
|
assert(remote_addr_len <= size_of(win.SOCKADDR_STORAGE_LH))
|
|
op.accept.client_endpoint = sockaddr_to_endpoint((^win.SOCKADDR_STORAGE_LH)(remote_addr))
|
|
|
|
// enables getsockopt, setsockopt, getsockname, getpeername, etc.
|
|
win.setsockopt(win.SOCKET(op.accept.client), win.SOL_SOCKET, win.SO_UPDATE_ACCEPT_CONTEXT, nil, 0)
|
|
|
|
case .OPERATION_ABORTED:
|
|
// This error could also happen when the user calls close on the socket.
|
|
if check_timed_out(op, op.accept.expires) {
|
|
op.accept.err = Accept_Error.Timeout
|
|
return
|
|
}
|
|
fallthrough
|
|
|
|
case:
|
|
op.accept.err = net._accept_error()
|
|
}
|
|
}
|
|
|
|
@(require_results)
|
|
dial_exec :: proc(op: ^Operation) -> (result: Op_Result) {
|
|
assert(op.type == .Dial)
|
|
assert(is_fresh(op._impl.over))
|
|
|
|
if op.dial.endpoint.port == 0 {
|
|
op.dial.err = .Port_Required
|
|
return .Done
|
|
}
|
|
|
|
family := family_from_endpoint(op.dial.endpoint)
|
|
osocket, socket_err := _create_socket(op.l, family, .TCP)
|
|
if socket_err != nil {
|
|
op.dial.err = socket_err
|
|
return .Done
|
|
}
|
|
|
|
op.dial.socket = osocket.(TCP_Socket)
|
|
|
|
sockaddr := endpoint_to_sockaddr({IP6_Any if family == .IP6 else net.IP4_Any, 0})
|
|
res := win.bind(win.SOCKET(op.dial.socket), &sockaddr, size_of(sockaddr))
|
|
if res < 0 {
|
|
op.dial.err = net._bind_error()
|
|
win.closesocket(win.SOCKET(op.dial.socket))
|
|
return .Done
|
|
}
|
|
|
|
op.dial._impl.addr = endpoint_to_sockaddr(op.dial.endpoint)
|
|
|
|
connect_ex: win.LPFN_CONNECTEX
|
|
load_socket_fn(win.SOCKET(op.dial.socket), win.WSAID_CONNECTEX, &connect_ex)
|
|
|
|
transferred: win.DWORD
|
|
if !connect_ex(
|
|
win.SOCKET(op.dial.socket),
|
|
&op.dial._impl.addr,
|
|
size_of(op.dial._impl.addr),
|
|
nil,
|
|
0,
|
|
&transferred,
|
|
&op._impl.over,
|
|
) {
|
|
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
|
link_timeout(op, op.dial.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
op.dial.err = net._dial_error()
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
dial_callback :: proc(op: ^Operation) {
|
|
assert(op.type == .Dial)
|
|
|
|
defer if op.dial.err != nil {
|
|
win.closesocket(win.SOCKET(op.dial.socket))
|
|
}
|
|
|
|
if op.dial.err != nil {
|
|
return
|
|
}
|
|
|
|
_, err := wsa_get_result(win.SOCKET(op.dial.socket), op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
// enables getsockopt, setsockopt, getsockname, getpeername, etc.
|
|
win.setsockopt(win.SOCKET(op.dial.socket), win.SOL_SOCKET, win.SO_UPDATE_CONNECT_CONTEXT, nil, 0)
|
|
|
|
case .OPERATION_ABORTED:
|
|
op.dial.err = Dial_Error.Timeout
|
|
|
|
case:
|
|
op.dial.err = net._dial_error()
|
|
}
|
|
}
|
|
|
|
@(require_results)
|
|
read_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Read)
|
|
op._impl.over = {} // Can be called multiple times.
|
|
|
|
op._impl.over.OffsetFull = u64(op.read.offset) + u64(op.read.read)
|
|
|
|
to_read := op.read.buf[op.read.read:]
|
|
|
|
read: win.DWORD
|
|
if !win.ReadFile(
|
|
win.HANDLE(op.read.handle),
|
|
raw_data(to_read),
|
|
win.DWORD(min(len(to_read), MAX_RW)),
|
|
&read,
|
|
&op._impl.over,
|
|
) {
|
|
assert(read == 0)
|
|
if is_pending(op._impl.over) {
|
|
link_timeout(op, op.read.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
err := win.GetLastError()
|
|
if is_incomplete(win.System_Error(err)) {
|
|
link_timeout(op, op.read.expires)
|
|
return .Pending
|
|
}
|
|
op.read.err = FS_Error(err)
|
|
}
|
|
}
|
|
|
|
assert(uintptr(read) == uintptr(op._impl.over.InternalHigh))
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
read_callback :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Read)
|
|
|
|
if op.read.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
n, err := get_result(op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
case .OPERATION_ABORTED:
|
|
// This error could also happen when the user calls close on the handle.
|
|
if check_timed_out(op, op.read.expires) {
|
|
op.read.err = .Timeout
|
|
return .Done
|
|
}
|
|
fallthrough
|
|
case .HANDLE_EOF:
|
|
if op.read.read == 0 {
|
|
op.read.err = .EOF
|
|
return .Done
|
|
}
|
|
case:
|
|
op.read.err = FS_Error(err)
|
|
return .Done
|
|
}
|
|
|
|
op.read.read += n
|
|
if op.read.all && op.read.read < len(op.read.buf) {
|
|
switch read_exec(op) {
|
|
case .Done: return read_callback(op)
|
|
case .Pending: return .Pending
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
write_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Write)
|
|
op._impl.over = {} // Can be called multiple times.
|
|
|
|
op._impl.over.OffsetFull = u64(op.write.offset) + u64(op.write.written)
|
|
|
|
to_write := op.write.buf[op.write.written:]
|
|
|
|
written: win.DWORD
|
|
if !win.WriteFile(
|
|
win.HANDLE(op.write.handle),
|
|
raw_data(to_write),
|
|
win.DWORD(min(len(to_write), MAX_RW)),
|
|
&written,
|
|
&op._impl.over,
|
|
) {
|
|
assert(written == 0)
|
|
if is_pending(op._impl.over) {
|
|
link_timeout(op, op.write.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
err := win.GetLastError()
|
|
if is_incomplete(win.System_Error(err)) {
|
|
link_timeout(op, op.write.expires)
|
|
return .Pending
|
|
}
|
|
op.write.err = FS_Error(err)
|
|
}
|
|
}
|
|
|
|
assert(uintptr(written) == uintptr(op._impl.over.InternalHigh))
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
write_callback :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Write)
|
|
|
|
if op.write.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
n, err := get_result(op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
case .OPERATION_ABORTED:
|
|
// This error could also happen when the user calls close on the handle.
|
|
if check_timed_out(op, op.write.expires) {
|
|
op.write.err = .Timeout
|
|
return .Done
|
|
}
|
|
fallthrough
|
|
case:
|
|
op.write.err = FS_Error(err)
|
|
return .Done
|
|
}
|
|
|
|
op.write.written += n
|
|
if op.write.all && op.write.written < len(op.write.buf) {
|
|
switch write_exec(op) {
|
|
case .Done: return write_callback(op)
|
|
case .Pending: return .Pending
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
recv_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Recv)
|
|
op._impl.over = {} // Can be called multiple times.
|
|
|
|
if op.recv.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
bufs, _ := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received)
|
|
win_bufs := ([^]win.WSABUF)(intrinsics.alloca(size_of(win.WSABUF) * len(bufs), align_of(win.WSABUF)))
|
|
for buf, i in bufs {
|
|
assert(i64(len(buf)) < i64(max(u32)))
|
|
win_bufs[i] = {len=u32(len(buf)), buf=raw_data(buf)}
|
|
}
|
|
|
|
status: win.c_int
|
|
switch sock in op.recv.socket {
|
|
case TCP_Socket:
|
|
status = win.WSARecv(
|
|
win.SOCKET(sock),
|
|
win_bufs,
|
|
u32(len(bufs)),
|
|
nil,
|
|
&op.recv._impl.flags,
|
|
win.LPWSAOVERLAPPED(&op._impl.over),
|
|
nil,
|
|
)
|
|
case UDP_Socket:
|
|
op.recv._impl.source_len = size_of(op.recv._impl.source)
|
|
status = win.WSARecvFrom(
|
|
win.SOCKET(sock),
|
|
win_bufs,
|
|
u32(len(bufs)),
|
|
nil,
|
|
&op.recv._impl.flags,
|
|
(^win.sockaddr)(&op.recv._impl.source),
|
|
&op.recv._impl.source_len,
|
|
win.LPWSAOVERLAPPED(&op._impl.over),
|
|
nil,
|
|
)
|
|
}
|
|
|
|
if status == win.SOCKET_ERROR {
|
|
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
|
link_timeout(op, op.recv.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
switch _ in op.recv.socket {
|
|
case TCP_Socket: op.recv.err = net._tcp_recv_error()
|
|
case UDP_Socket: op.recv.err = net._udp_recv_error()
|
|
}
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
recv_callback :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Recv)
|
|
|
|
if op.recv.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
n, err := wsa_get_result(win.SOCKET((^net.Socket)(&op.recv.socket)^), op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
case .OPERATION_ABORTED:
|
|
// This error could also happen when the user calls close on the socket.
|
|
if check_timed_out(op, op.recv.expires) {
|
|
switch _ in op.recv.socket {
|
|
case TCP_Socket: op.recv.err = net.TCP_Recv_Error.Timeout
|
|
case UDP_Socket: op.recv.err = net.UDP_Recv_Error.Timeout
|
|
}
|
|
return .Done
|
|
}
|
|
fallthrough
|
|
case:
|
|
switch _ in op.recv.socket {
|
|
case TCP_Socket: op.recv.err = net._tcp_recv_error()
|
|
case UDP_Socket: op.recv.err = net._udp_recv_error()
|
|
}
|
|
return .Done
|
|
}
|
|
|
|
op.recv.received += n
|
|
|
|
switch sock in op.recv.socket {
|
|
case TCP_Socket:
|
|
if n == 0 {
|
|
// Connection closed.
|
|
return .Done
|
|
}
|
|
|
|
if op.recv.all {
|
|
total: int
|
|
for buf in op.recv.bufs {
|
|
total += len(buf)
|
|
}
|
|
|
|
if op.recv.received < total {
|
|
switch recv_exec(op) {
|
|
case .Done: return recv_callback(op)
|
|
case .Pending: return .Pending
|
|
}
|
|
}
|
|
}
|
|
|
|
case UDP_Socket:
|
|
assert(op.recv._impl.source_len > 0)
|
|
op.recv.source = sockaddr_to_endpoint(&op.recv._impl.source)
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
send_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Send)
|
|
op._impl.over = {} // Can be called multiple times.
|
|
|
|
if op.send.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
bufs, _ := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent)
|
|
win_bufs := ([^]win.WSABUF)(intrinsics.alloca(size_of(win.WSABUF) * len(bufs), align_of(win.WSABUF)))
|
|
for buf, i in bufs {
|
|
assert(i64(len(buf)) < i64(max(u32)))
|
|
win_bufs[i] = {len=u32(len(buf)), buf=raw_data(buf)}
|
|
}
|
|
|
|
status: win.c_int
|
|
switch sock in op.send.socket {
|
|
case TCP_Socket:
|
|
status = win.WSASend(
|
|
win.SOCKET(sock),
|
|
win_bufs,
|
|
u32(len(bufs)),
|
|
nil,
|
|
0,
|
|
win.LPWSAOVERLAPPED(&op._impl.over),
|
|
nil,
|
|
)
|
|
case UDP_Socket:
|
|
addr := endpoint_to_sockaddr(op.send.endpoint)
|
|
status = win.WSASendTo(
|
|
win.SOCKET(sock),
|
|
win_bufs,
|
|
u32(len(bufs)),
|
|
nil,
|
|
0,
|
|
(^win.sockaddr)(&addr),
|
|
size_of(addr),
|
|
win.LPWSAOVERLAPPED(&op._impl.over),
|
|
nil,
|
|
)
|
|
}
|
|
|
|
if status == win.SOCKET_ERROR {
|
|
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
|
link_timeout(op, op.send.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
switch _ in op.send.socket {
|
|
case TCP_Socket: op.send.err = net._tcp_send_error()
|
|
case UDP_Socket: op.send.err = net._udp_send_error()
|
|
}
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
send_callback :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Send)
|
|
|
|
if op.send.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
n, err := wsa_get_result(win.SOCKET((^net.Socket)(&op.send.socket)^), op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
case .OPERATION_ABORTED:
|
|
// This error could also happen when the user calls close on the socket.
|
|
if check_timed_out(op, op.send.expires) {
|
|
switch _ in op.send.socket {
|
|
case TCP_Socket: op.send.err = net.TCP_Send_Error.Timeout
|
|
case UDP_Socket: op.send.err = net.UDP_Send_Error.Timeout
|
|
}
|
|
return .Done
|
|
}
|
|
fallthrough
|
|
case:
|
|
switch _ in op.send.socket {
|
|
case TCP_Socket: op.send.err = net._tcp_send_error()
|
|
case UDP_Socket: op.send.err = net._udp_send_error()
|
|
}
|
|
return .Done
|
|
}
|
|
|
|
op.send.sent += n
|
|
|
|
if op.send.all {
|
|
total: int
|
|
for buf in op.send.bufs {
|
|
total += len(buf)
|
|
}
|
|
|
|
if op.send.sent < total {
|
|
switch send_exec(op) {
|
|
case .Done: return send_callback(op)
|
|
case .Pending: return .Pending
|
|
}
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
sendfile_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Send_File)
|
|
op._impl.over = {} // Can be called multiple times.
|
|
|
|
if op.sendfile.nbytes == SEND_ENTIRE_FILE {
|
|
type, size, stat_err := stat(op.sendfile.file)
|
|
if stat_err != nil {
|
|
op.sendfile.err = stat_err
|
|
return .Done
|
|
}
|
|
|
|
op.sendfile.nbytes = int(size - i64(op.sendfile.offset))
|
|
if type != .Regular || op.sendfile.nbytes <= 0 {
|
|
op.sendfile.err = FS_Error.Invalid_Argument
|
|
return .Done
|
|
}
|
|
}
|
|
|
|
op._impl.over.OffsetFull = u64(op.sendfile.offset) + u64(op.sendfile.sent)
|
|
|
|
if !win.TransmitFile(
|
|
win.SOCKET(op.sendfile.socket),
|
|
win.HANDLE(op.sendfile.file),
|
|
u32(min(op.sendfile.nbytes - op.sendfile.sent, MAX_RW)),
|
|
0,
|
|
&op._impl.over,
|
|
nil,
|
|
0,
|
|
) {
|
|
if is_pending(op._impl.over) || (op._impl.over.Internal == nil && is_incomplete(win.System_Error(win.GetLastError()))) {
|
|
link_timeout(op, op.sendfile.expires)
|
|
return .Pending
|
|
} else if op._impl.over.Internal == nil {
|
|
op.sendfile.err = net._tcp_send_error()
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
sendfile_callback :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Send_File)
|
|
|
|
if op.sendfile.err != nil {
|
|
return .Done
|
|
}
|
|
|
|
n, err := wsa_get_result(win.SOCKET(op.sendfile.socket), op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
case .OPERATION_ABORTED:
|
|
// This error could also happen when the user calls close on the socket.
|
|
if check_timed_out(op, op.sendfile.expires) {
|
|
op.sendfile.err = TCP_Send_Error.Timeout
|
|
return .Done
|
|
}
|
|
fallthrough
|
|
case:
|
|
op.sendfile.err = net._tcp_send_error()
|
|
return .Done
|
|
}
|
|
|
|
op.sendfile.sent += n
|
|
if op.sendfile.sent < op.sendfile.nbytes {
|
|
switch sendfile_exec(op) {
|
|
case .Done:
|
|
return sendfile_callback(op)
|
|
case .Pending:
|
|
if op.sendfile.progress_updates { op.cb(op) }
|
|
return .Pending
|
|
}
|
|
}
|
|
|
|
return .Done
|
|
}
|
|
|
|
@(require_results)
|
|
poll_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Poll)
|
|
|
|
events: i32 = win.FD_CLOSE
|
|
switch op.poll.event {
|
|
case .Send: events |= win.FD_WRITE|win.FD_CONNECT
|
|
case .Receive: events |= win.FD_READ|win.FD_ACCEPT
|
|
case:
|
|
op.poll.result = .Invalid_Argument
|
|
return .Done
|
|
}
|
|
|
|
op._impl.over.hEvent = win.WSACreateEvent()
|
|
if win.WSAEventSelect(
|
|
win.SOCKET(net.any_socket_to_socket(op.poll.socket)),
|
|
op._impl.over.hEvent,
|
|
events,
|
|
) != 0 {
|
|
#partial switch win.System_Error(win.GetLastError()) {
|
|
case .WSAEINVAL, .WSAENOTSOCK: op.poll.result = .Invalid_Argument
|
|
case: op.poll.result = .Error
|
|
}
|
|
return .Done
|
|
}
|
|
|
|
timeout := win.INFINITE
|
|
if op.poll.expires != {} {
|
|
diff := max(0, time.diff(op.l.now, op.poll.expires))
|
|
timeout = win.DWORD(diff / time.Millisecond)
|
|
}
|
|
|
|
ok := win.RegisterWaitForSingleObject(
|
|
&op.poll._impl.wait_handle,
|
|
op._impl.over.hEvent,
|
|
wait_callback,
|
|
op,
|
|
timeout,
|
|
win.WT_EXECUTEINWAITTHREAD|win.WT_EXECUTEONLYONCE,
|
|
)
|
|
ensure(ok == true, "unexpected RegisterWaitForSingleObject error")
|
|
|
|
return .Pending
|
|
|
|
wait_callback :: proc "system" (lpParameter: win.PVOID, TimerOrWaitFired: win.BOOLEAN) {
|
|
op := (^Operation)(lpParameter)
|
|
assert_contextless(op.type == .Poll)
|
|
|
|
if TimerOrWaitFired {
|
|
op.poll.result = .Timeout
|
|
}
|
|
|
|
ok := win.PostQueuedCompletionStatus(
|
|
g.iocp,
|
|
0,
|
|
0,
|
|
&op._impl.over,
|
|
)
|
|
ensure_contextless(ok == true, "unexpected PostQueuedCompletionStatus error")
|
|
}
|
|
}
|
|
|
|
poll_callback :: proc(op: ^Operation) {
|
|
assert(op.type == .Poll)
|
|
|
|
if op._impl.over.hEvent != nil {
|
|
win.WSACloseEvent(op._impl.over.hEvent)
|
|
}
|
|
|
|
if op.poll._impl.wait_handle != nil {
|
|
win.UnregisterWaitEx(op.poll._impl.wait_handle, nil)
|
|
}
|
|
|
|
if op.poll.result != nil {
|
|
return
|
|
}
|
|
|
|
_, err := get_result(op._impl.over)
|
|
#partial switch err {
|
|
case .SUCCESS:
|
|
case:
|
|
op.poll.result = .Error
|
|
}
|
|
}
|
|
|
|
open_exec :: proc(op: ^Operation) {
|
|
assert(op.type == .Open)
|
|
// No async way of doing this.
|
|
op.open.handle, op.open.err = _open_sync(op.l, op.open.path, op.open.dir, op.open.mode, op.open.perm)
|
|
}
|
|
|
|
stat_exec :: proc(op: ^Operation) {
|
|
assert(op.type == .Stat)
|
|
// No async way of doing this.
|
|
op.stat.type, op.stat.size, op.stat.err = stat(op.stat.handle)
|
|
}
|
|
|
|
@(require_results)
|
|
timeout_exec :: proc(op: ^Operation) -> Op_Result {
|
|
assert(op.type == .Timeout)
|
|
|
|
if op.timeout.duration <= 0 {
|
|
return .Done
|
|
} else {
|
|
op.timeout._impl.expires = time.time_add(now(), op.timeout.duration)
|
|
node, inserted, alloc_err := avl.find_or_insert(&op.l.timeouts, op)
|
|
assert(alloc_err == nil)
|
|
assert(inserted)
|
|
assert(node != nil)
|
|
return .Pending
|
|
}
|
|
}
|
|
|
|
link_timeout :: proc(op: ^Operation, expires: time.Time) {
|
|
if expires == {} {
|
|
return
|
|
}
|
|
|
|
timeout_op := _prep(op.l, internal_timeout_callback, .Timeout)
|
|
timeout_op.timeout._impl.expires = expires
|
|
timeout_op.timeout._impl.target = op
|
|
op._impl.timeout = timeout_op
|
|
|
|
node, inserted, alloc_err := avl.find_or_insert(&op.l.timeouts, timeout_op)
|
|
assert(alloc_err == nil)
|
|
assert(inserted)
|
|
assert(node != nil)
|
|
}
|
|
|
|
internal_timeout_callback :: proc(op: ^Operation) {
|
|
assert(op.type == .Timeout)
|
|
|
|
target := op.timeout._impl.target
|
|
assert(target != nil)
|
|
assert(target._impl.timeout == op)
|
|
target._impl.timeout = nil
|
|
|
|
#partial switch target.type {
|
|
case .Poll:
|
|
target.poll.result = .Timeout
|
|
target.cb(target)
|
|
_remove(target)
|
|
return
|
|
}
|
|
|
|
if is_pending(target._impl.over) {
|
|
handle := operation_handle(target)
|
|
assert(handle != win.INVALID_HANDLE)
|
|
ok := win.CancelIoEx(handle, &target._impl.over)
|
|
if !ok {
|
|
err := win.System_Error(win.GetLastError())
|
|
#partial switch err {
|
|
case .NOT_FOUND:
|
|
debug("Timeout: Cancel", target.type, "NOT_FOUND")
|
|
case .INVALID_HANDLE:
|
|
debug("Timeout: Cancel", target.type, "INVALID_HANDLE")
|
|
case:
|
|
assert(false, "unexpected CancelIoEx error")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
stat :: proc(handle: Handle) -> (type: File_Type, size: i64, err: FS_Error) {
|
|
info: win.FILE_STANDARD_INFO
|
|
if !win.GetFileInformationByHandleEx(win.HANDLE(handle), .FileStandardInfo, &info, size_of(info)) {
|
|
err = FS_Error(win.GetLastError())
|
|
return
|
|
}
|
|
|
|
size = i64(info.EndOfFile)
|
|
|
|
if info.Directory {
|
|
type = .Directory
|
|
return
|
|
}
|
|
|
|
switch win.GetFileType(win.HANDLE(handle)) {
|
|
case win.FILE_TYPE_PIPE:
|
|
type = .Pipe_Or_Socket
|
|
return
|
|
case win.FILE_TYPE_CHAR:
|
|
type = .Device
|
|
return
|
|
case win.FILE_TYPE_DISK:
|
|
type = .Regular
|
|
// Don't return, might be a symlink.
|
|
case:
|
|
type = .Undetermined
|
|
return
|
|
}
|
|
|
|
|
|
tag_info: win.FILE_ATTRIBUTE_TAG_INFO
|
|
if !win.GetFileInformationByHandleEx(win.HANDLE(handle), .FileAttributeTagInfo, &tag_info, size_of(tag_info)) {
|
|
return
|
|
}
|
|
|
|
if (
|
|
(tag_info.FileAttributes & win.FILE_ATTRIBUTE_REPARSE_POINT != 0) &&
|
|
(
|
|
(tag_info.ReparseTag == win.IO_REPARSE_TAG_SYMLINK) ||
|
|
(tag_info.ReparseTag == win.IO_REPARSE_TAG_MOUNT_POINT)
|
|
)
|
|
) {
|
|
type = .Symlink
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
STATUS_PENDING :: rawptr(uintptr(0x103))
|
|
|
|
is_pending :: proc(over: win.OVERLAPPED) -> bool {
|
|
return over.Internal == STATUS_PENDING
|
|
}
|
|
|
|
is_fresh :: proc(over: win.OVERLAPPED) -> bool {
|
|
return over.Internal == nil && over.InternalHigh == nil
|
|
}
|
|
|
|
get_result :: proc(over: win.OVERLAPPED) -> (n: int, err: win.System_Error) {
|
|
assert(!is_pending(over))
|
|
|
|
n = int(uintptr(over.InternalHigh))
|
|
|
|
if over.Internal != nil {
|
|
err = win.System_Error(win.RtlNtStatusToDosError(win.NTSTATUS(uintptr(over.Internal))))
|
|
assert(!is_incomplete(err))
|
|
}
|
|
return
|
|
}
|
|
|
|
// `get_result` above translates NT status codes to errors through RtlNtStatsToDosError,
|
|
// this is context free and can cause weird mappings, thus for sockets we want to call `WSAGetOverlappedResult`
|
|
// which does context based mapping of error codes.
|
|
// See https://stackoverflow.com/questions/28925003/calling-wsagetlasterror-from-an-iocp-thread-return-incorrect-result
|
|
wsa_get_result :: proc(socket: win.SOCKET, over: win.OVERLAPPED) -> (n: int, err: win.System_Error) {
|
|
over := over
|
|
assert(!is_pending(over))
|
|
|
|
if over.Internal != nil {
|
|
flags: win.DWORD
|
|
_n: win.DWORD
|
|
res := win.WSAGetOverlappedResult(socket, &over, &_n, false, &flags)
|
|
assert(!res)
|
|
n = int(_n) // NOTE: It is possible that an amount of bytes is present when the operation was cancelled.
|
|
err = win.System_Error(win.WSAGetLastError())
|
|
}
|
|
|
|
n = int(uintptr(over.InternalHigh))
|
|
return
|
|
}
|
|
|
|
is_incomplete :: proc(err: win.System_Error) -> bool {
|
|
#partial switch err {
|
|
case .WSAEWOULDBLOCK, .IO_PENDING, .IO_INCOMPLETE, .WSAEALREADY: return true
|
|
case: return false
|
|
}
|
|
}
|
|
|
|
endpoint_to_sockaddr :: proc(ep: Endpoint) -> (sockaddr: win.SOCKADDR_STORAGE_LH) {
|
|
switch a in ep.address {
|
|
case IP4_Address:
|
|
(^win.sockaddr_in)(&sockaddr)^ = win.sockaddr_in {
|
|
sin_port = u16be(win.USHORT(ep.port)),
|
|
sin_addr = transmute(win.in_addr)a,
|
|
sin_family = u16(win.AF_INET),
|
|
}
|
|
return
|
|
case IP6_Address:
|
|
(^win.sockaddr_in6)(&sockaddr)^ = win.sockaddr_in6 {
|
|
sin6_port = u16be(win.USHORT(ep.port)),
|
|
sin6_addr = transmute(win.in6_addr)a,
|
|
sin6_family = u16(win.AF_INET6),
|
|
}
|
|
return
|
|
}
|
|
unreachable()
|
|
}
|
|
|
|
sockaddr_to_endpoint :: proc(native_addr: ^win.SOCKADDR_STORAGE_LH) -> (ep: Endpoint) {
|
|
switch native_addr.ss_family {
|
|
case u16(win.AF_INET):
|
|
addr := cast(^win.sockaddr_in)native_addr
|
|
port := int(addr.sin_port)
|
|
ep = Endpoint {
|
|
address = IP4_Address(transmute([4]byte)addr.sin_addr),
|
|
port = port,
|
|
}
|
|
case u16(win.AF_INET6):
|
|
addr := cast(^win.sockaddr_in6)native_addr
|
|
port := int(addr.sin6_port)
|
|
ep = Endpoint {
|
|
address = IP6_Address(transmute([8]u16be)addr.sin6_addr),
|
|
port = port,
|
|
}
|
|
case:
|
|
panic("native_addr is neither IP4 or IP6 address")
|
|
}
|
|
return
|
|
}
|
|
|
|
load_socket_fn :: proc(subject: win.SOCKET, guid: win.GUID, fn: ^$T) {
|
|
over: win.OVERLAPPED
|
|
|
|
guid := guid
|
|
bytes: u32
|
|
rc := win.WSAIoctl(
|
|
subject,
|
|
win.SIO_GET_EXTENSION_FUNCTION_POINTER,
|
|
&guid,
|
|
size_of(guid),
|
|
fn,
|
|
size_of(fn),
|
|
&bytes,
|
|
// NOTE: I don't think loading a socket fn ever blocks,
|
|
// but I would like to hit an assert if it does, so we do pass it.
|
|
&over,
|
|
nil,
|
|
)
|
|
assert(rc != win.SOCKET_ERROR)
|
|
assert(bytes == size_of(fn^))
|
|
}
|
|
|
|
check_timed_out :: proc(op: ^Operation, expires: time.Time) -> bool {
|
|
return expires != {} && time.diff(op.l.now, expires) <= 0
|
|
}
|
|
|
|
timeouts_cmp :: #force_inline proc(a, b: ^Operation) -> slice.Ordering {
|
|
switch {
|
|
case a.timeout._impl.expires._nsec < b.timeout._impl.expires._nsec:
|
|
return .Less
|
|
case a.timeout._impl.expires._nsec > b.timeout._impl.expires._nsec:
|
|
return .Greater
|
|
case uintptr(a) < uintptr(b):
|
|
return .Less
|
|
case uintptr(a) > uintptr(b):
|
|
return .Greater
|
|
case:
|
|
assert(a == b)
|
|
return .Equal
|
|
}
|
|
}
|