#+private file
package nbio

// io_uring based Linux implementation of the nbio event loop.

import "base:intrinsics"
import "core:container/pool"
import "core:container/queue"
import "core:mem"
import "core:net"
import "core:strings"
import "core:sys/linux"
import "core:sys/linux/uring"
import "core:time"

@(private="package")
_FULLY_SUPPORTED :: true

// Linux-specific event loop state (its fields are accessed directly as `l.ring`, `l.unqueued`, ...).
@(private="package")
_Event_Loop :: struct {
	ring: uring.Ring,
	// Ready to be submitted to kernel, if kernel is full.
	// (`enqueue` parks operations here when the ring has no free submission entry.)
	unqueued: queue.Queue(^Operation),
	// Ready to run callbacks, mainly next tick, some other ops that error outside the kernel.
	completed: queue.Queue(^Operation),
	// Detached Read operation on an eventfd; `_wake_up` writes to that eventfd from other threads.
	wake: ^Operation,
}

// Handles are plain Linux file descriptors.
@(private="package")
_Handle :: linux.Fd

// Directory handle meaning "relative to the current working directory" for the *at syscalls.
@(private="package")
_CWD :: linux.AT_FDCWD

// Upper bound on the bytes requested by a single read/write/splice submission;
// larger requests are clamped to this value.
@(private="package")
MAX_RW :: mem.Gigabyte

@(private="package")
_Operation :: struct {
	// nil when no removal was requested, the pending `_Remove` operation while a cancel is in flight,
	// or the `REMOVED` sentinel once the cancel was accepted.
	removal: ^Operation,
	// Submission queue entry handed out by the most recent successful `enqueue`, used to attach
	// linked timeouts / hard links to it.
	sqe: ^linux.IO_Uring_SQE,
	// Absolute deadline of a linked timeout; `link_timeout` passes a pointer to it to the ring.
	expires: linux.Time_Spec,
}

@(private="package")
_Accept :: struct {
	// Out-parameters of the accept submission: peer address and its length.
	sockaddr: linux.Sock_Addr_Any,
	sockaddr_len: i32,
}

@(private="package")
_Close :: struct {}

@(private="package")
_Dial :: struct {
	// Destination address handed to the connect submission.
	sockaddr: linux.Sock_Addr_Any,
}

@(private="package")
_Read :: struct {}

@(private="package")
_Write :: struct {}

// IPv4 or IPv6 socket address. The anonymous struct overlays the leading family/port fields so
// `family` can be inspected before deciding which variant is active.
Sock_Addr_Ip :: struct #raw_union {
	using _: struct {
		family: linux.Address_Family,
		port:   u16be,
	},
	using ipv4: linux.Sock_Addr_In,
	using ipv6: linux.Sock_Addr_In6,
}

@(private="package")
_Send :: struct {
	// Destination address, only filled in for UDP sends.
	endpoint: Sock_Addr_Ip,
	msghdr:   linux.Msg_Hdr,
	// Scratch storage used by `bufs_to_process` for the remaining buffers.
	bufs:     Bufs,
}

@(private="package")
_Recv :: struct {
	// Source address written by the kernel, only used for UDP receives.
	addr_out: Sock_Addr_Ip,
	msghdr:   linux.Msg_Hdr,
	// Scratch storage used by `bufs_to_process` for the remaining buffers.
	bufs:     Bufs,
}

@(private="package")
_Timeout :: struct {
	// Absolute (realtime clock) deadline handed to the timeout submission.
	expires: linux.Time_Spec,
}

@(private="package")
_Poll :: struct {}

@(private="package")
_Remove :: struct {
	// Operation being cancelled; reset to nil when the target completed on its own first.
	target: ^Operation,
}

// NOTE(review): link timeouts in this file are submitted by `link_timeout` with a tagged pointer;
// no visible code constructs this type and `handle_completed` treats `._Link_Timeout` as unreachable.
@(private="package")
_Link_Timeout :: struct {
	target: ^Operation,
	expires: linux.Time_Spec,
}

@(private="package")
_Send_File :: struct {
	// Total number of bytes to send.
	len:    int,
	// Read end of the pipe (source of splice B, pipe to socket).
	pipe:   Handle,
	// Helper splice A (file to pipe) operation; nil once it finished or failed.
	splice: ^Operation,
}

@(private="package")
_Splice :: struct {
	// File offset to start from and the number of bytes to move into the pipe.
	off:      int,
	len:      int,
	file:     Handle,
	// Write end of the pipe.
	pipe:     Handle,
	// Bytes moved into the pipe so far.
	written:  int,
	// Owning sendfile operation; nil once the sendfile detached from this helper.
	sendfile: ^Operation,
}

@(private="package")
_Open :: struct {
	// NUL-terminated copy of the path, allocated by `open_exec` and freed by `open_callback`.
	cpath: cstring,
}

@(private="package")
_Stat :: struct {
	// Output buffer of the statx submission.
	buf: linux.Statx,
}

// Sets up the ring, both operation queues and the eventfd-based wake-up operation.
// On any failure, the deferred cleanups tear down whatever was already initialized.
@(private="package")
_init :: proc(l: ^Event_Loop, alloc: mem.Allocator) -> (err: General_Error) {
	params := uring.DEFAULT_PARAMS
	params.flags += {.SUBMIT_ALL, .COOP_TASKRUN, .SINGLE_ISSUER}

	uerr := uring.init(&l.ring, &params, QUEUE_SIZE)
	if uerr != nil {
		err = General_Error(uerr)
		return
	}
	defer if err != nil { uring.destroy(&l.ring) }

	if perr := queue.init(&l.unqueued, allocator = alloc); perr != nil {
		err = .Allocation_Failed
		return
	}
	defer if err != nil { queue.destroy(&l.unqueued) }

	if perr := queue.init(&l.completed, allocator = alloc); perr != nil {
		err = .Allocation_Failed
		return
	}
	defer if err != nil { queue.destroy(&l.completed) }

	set_up_wake_up(l) or_return

	return

	// Creates the eventfd and a detached Read operation on it that keeps re-arming itself, so a
	// write from `_wake_up` (another thread) completes a ring entry and unblocks the loop.
	set_up_wake_up :: proc(l: ^Event_Loop) -> General_Error {
		wakefd, wakefd_err := linux.eventfd(0, {.SEMAPHORE, .CLOEXEC, .NONBLOCK})
		if wakefd_err != nil {
			return General_Error(wakefd_err)
		}

		op, alloc_err := new(Operation, l.allocator)
		if alloc_err != nil {
			linux.close(wakefd)
			return .Allocation_Failed
		}

		l.wake = op
		l.wake.detached = true
		l.wake.l = l
		l.wake.type = .Read
		l.wake.cb = wake_up_callback
		l.wake.read.handle = wakefd
		// The 8-byte eventfd counter is read straight into the operation's user data.
		l.wake.read.buf = ([^]byte)(&l.wake.user_data)[:8]

		_exec(l.wake)

		return nil
	}

	// Consumes one eventfd read and immediately re-submits the wake-up read.
	wake_up_callback :: proc(op: ^Operation) {
		assert(op.type == .Read)
		assert(op == op.l.wake)
		assert(op.read.err == nil)
		assert(op.read.read == 8)

		value := intrinsics.unaligned_load((^u64)(&op.user_data))
		assert(value > 0)
		debug(int(value), "wake_up calls handled")

		op.read.read = 0
		op.user_data = {}
		_exec(op)
	}
}

// Releases the wake-up eventfd/operation, both queues and the ring.
@(private="package")
_destroy :: proc(l: ^Event_Loop) {
	linux.close(l.wake.read.handle)
	free(l.wake, l.allocator)

	queue.destroy(&l.unqueued)
	queue.destroy(&l.completed)

	uring.destroy(&l.ring)
}

// One loop iteration: runs callbacks of operations completed outside the kernel, submits pending
// entries (waiting up to `timeout`; 0 means don't wait, negative means no time limit) and
// dispatches every completion the kernel has posted.
@(private="package")
__tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> General_Error {
	debug("tick")

	// Execute completed operations, mostly next tick ops, also some other ops that may error before
	// adding it to the Uring.
	n := queue.len(l.completed)
	if n > 0 {
		l.now = time.now()
		for _ in 0 ..< n {
			completed := queue.pop_front(&l.completed)
			if completed._impl.removal == nil {
				completed.cb(completed)
			} else if completed._impl.removal != (^Operation)(REMOVED) {
				// A removal is still pending; tell it the target already finished.
				completed._impl.removal._remove.target = nil
			}
			if !completed.detached {
				pool.put(&l.operation_pool, completed)
			}
		}
	}

	err := _flush_submissions(l, timeout)
	if err != nil {
		return General_Error(err)
	}

	l.now = time.now()

	err = _flush_completions(l, false)
	if err != nil {
		return General_Error(err)
	}

	return nil

	// Drains the completion queue in batches of up to 128 entries and dispatches each one;
	// when `wait` is set, the first copy blocks until at least one completion is available.
	_flush_completions :: proc(l: ^Event_Loop, wait: bool) -> linux.Errno {
		wait := wait
		cqes: [128]linux.IO_Uring_CQE = ---
		for {
			completed, err := uring.copy_cqes(&l.ring, cqes[:], 1 if wait else 0)
			if err == .EINTR {
				continue
			} else if err != nil {
				return err
			}

			_flush_unqueued(l)

			if completed > 0 {
				debug(int(completed), "operations returned from uring")
			}

			for cqe in cqes[:completed] {
				assert(cqe.user_data != 0)
				op, is_timeout := unpack_operation(cqe.user_data)
				if is_timeout {
					link_timeout_callback(op, cqe.res)
				} else {
					handle_completed(op, cqe.res)
				}
			}

			if completed < len(cqes) {
				break
			}

			debug("more events ready than our results buffer handles, getting more")
			wait = false
		}

		return nil
	}

	// Submits all prepared entries. A positive `timeout` waits for one completion at most that long,
	// zero does not wait, and a negative value waits for one completion without a time limit.
	_flush_submissions :: proc(l: ^Event_Loop, timeout: time.Duration) -> linux.Errno {
		// Split the relative timeout into whole seconds + nanoseconds; a timespec with
		// `time_nsec >= 1e9` is not normalized. Only read when `timeout >= 0`.
		ts: linux.Time_Spec
		if timeout > 0 {
			ts = ns_to_time_spec(i64(timeout))
		}

		for {
			_, err := uring.submit(&l.ring, 0 if timeout == 0 else 1, nil if timeout < 0 else &ts)
			#partial switch err {
			case .NONE, .ETIME:
			case .EINTR:
				warn("uring interrupted")
				continue
			case .ENOMEM:
				// It's full, wait for at least one operation to complete and try again.
				warn("could not flush submissions, ENOMEM, waiting for operations to complete before continuing")
				ferr := _flush_completions(l, true)
				if ferr != nil {
					return ferr
				}
				continue
			case:
				return err
			}

			break
		}

		return nil
	}

	_flush_unqueued :: proc(l: ^Event_Loop) {
		n := queue.len(l.unqueued)
		for _ in 0..
(handle: Handle, err: FS_Error) {
	if path == "" {
		err = .Invalid_Argument
		return
	}

	cpath, cerr := strings.clone_to_cstring(path, l.allocator)
	if cerr != nil {
		err = .Allocation_Failed
		return
	}
	defer delete(cpath, l.allocator)

	sys_flags := linux.Open_Flags{.NOCTTY, .CLOEXEC, .NONBLOCK}

	if .Write in mode {
		if .Read in mode {
			sys_flags += {.RDWR}
		} else {
			sys_flags += {.WRONLY}
		}
	}

	if .Append in mode { sys_flags += {.APPEND} }
	if .Create in mode { sys_flags += {.CREAT} }
	if .Excl   in mode { sys_flags += {.EXCL} }
	if .Sync   in mode { sys_flags += {.DSYNC} }
	if .Trunc  in mode { sys_flags += {.TRUNC} }
	// if .Inheritable in mode { sys_flags -= {.CLOEXEC} }

	errno: linux.Errno
	handle, errno = linux.openat(dir, cpath, sys_flags, transmute(linux.Mode)perm)
	if errno != nil {
		err = FS_Error(errno)
	}

	return
}

// Creates a socket through core:net; io_uring does not need it to be non-blocking.
@(private="package")
_create_socket :: proc(
	_: ^Event_Loop,
	family: Address_Family,
	protocol: Socket_Protocol,
) -> (
	socket: Any_Socket,
	err: Create_Socket_Error,
) {
	socket = net.create_socket(family, protocol) or_return

	// NOTE: this doesn't seem needed with io uring.
	// defer if err != nil { net.close(socket) }
	// net.set_blocking(socket, false) or_return

	return
}

// Marks a TCP socket as listening with the given backlog (synchronous syscall).
@(private="package")
_listen :: proc(socket: TCP_Socket, backlog := 1000) -> Listen_Error {
	err := linux.listen(linux.Fd(socket), i32(backlog))
	if err != nil {
		return net._listen_error(err)
	}
	return nil
}

// Requests cancellation of `target` by submitting an async cancel keyed on its pointer.
// A second call while a removal is already pending (or done) is a no-op.
@(private="package")
_remove :: proc(target: ^Operation) {
	assert(target != nil)

	if target._impl.removal != nil {
		return
	}

	op := _prep(target.l, proc(_: ^Operation) {}, ._Remove)
	op._remove.target = target
	target._impl.removal = op

	enqueue(op, uring.async_cancel(
		&op.l.ring,
		u64(uintptr(target)),
		u64(uintptr(op)),
	))
}

@(private="package")
_associate_handle :: proc(handle: uintptr, l: ^Event_Loop) -> (Handle, Association_Error) {
	// Works by default.
	return Handle(handle), nil
}

@(private="package")
_associate_socket :: proc(socket: Any_Socket, l: ^Event_Loop) -> Association_Error {
	// Works by default.
	return nil
}

// Wakes up a loop owned by another thread by bumping its wake-up eventfd.
@(private="package")
_wake_up :: proc(l: ^Event_Loop) {
	assert(l != &_tls_event_loop)

	one: u64 = 1
	// Called from another thread, in which we can't use the uring.
	n, err := linux.write(l.wake.read.handle, ([^]byte)(&one)[:size_of(one)])
	// Shouldn't fail.
	assert(err == nil)
	assert(n == 8)
}

@(private="package")
_yield :: proc() {
	linux.sched_yield()
}

// Start file private.

// The size of the IO Uring queues.
QUEUE_SIZE :: #config(ODIN_NBIO_QUEUE_SIZE, 2048)
#assert(QUEUE_SIZE <= uring.MAX_ENTRIES)

#assert(size_of(Operation) <= 384) // Just so we see when we make it bigger.
#assert(size_of(Specifics) <= 288) // Just so we see when we make it bigger.

// Sentinel stored in `_impl.removal` once a cancel was accepted by the kernel.
REMOVED :: rawptr(max(uintptr)-1)

// Dispatches a kernel completion to the per-type callback. A callback returning false means the
// operation was re-submitted (partial progress) and must not be finished yet; otherwise the user
// callback runs (unless removed) and the operation is released back to the pool.
handle_completed :: proc(op: ^Operation, res: i32) {
	debug("handling", op.type, "result", int(res))

	switch op.type {
	case .Accept:
		accept_callback(op, res)
	case .Dial:
		dial_callback(op, res)
	case .Timeout:
		timeout_callback(op, res)
	case .Write:
		if !write_callback(op, res) { return }
	case .Read:
		if !read_callback(op, res) { return }
	case .Close:
		close_callback(op, res)
	case .Poll:
		poll_callback(op, res)
	case .Send:
		if !send_callback(op, res) { return }
		maybe_callback(op)
		bufs_delete(&op.send._impl.bufs, op.send.bufs, op.l.allocator)
		cleanup(op)
		return
	case .Recv:
		if !recv_callback(op, res) { return }
		maybe_callback(op)
		bufs_delete(&op.recv._impl.bufs, op.recv.bufs, op.l.allocator)
		cleanup(op)
		return
	case .Open:
		open_callback(op, res)
	case .Stat:
		stat_callback(op, res)
	case .Send_File:
		if !sendfile_callback(op, res) { return }
	case ._Splice:
		if !splice_callback(op, res) { return }
	case ._Remove:
		if !remove_callback(op, res) { return }
	case ._Link_Timeout:
		unreachable()
	case .None:
		fallthrough
	case:
		panic("corrupted operation")
	}

	maybe_callback(op)
	cleanup(op)

	// Runs the user callback unless the operation was cancelled by a remove.
	maybe_callback :: proc(op: ^Operation) {
		if op._impl.removal == nil {
			debug("done, calling back", op.type)
			op.cb(op)
		} else if op._impl.removal == (^Operation)(REMOVED) {
			debug("done but was cancelled by remove", op.type)
		} else {
			debug("done but has removal pending", op.type)
			// If the remove callback sees their target is nil, they know it is done already.
			op._impl.removal._remove.target = nil
		}
	}

	// Returns non-detached operations to the loop's pool.
	cleanup :: proc(op: ^Operation) {
		if !op.detached {
			pool.put(&op.l.operation_pool, op)
		}
	}
}

// Records the SQE obtained for `op`, or parks `op` in `unqueued` when the ring had no free entry
// (`ok == false`) so it is retried on a later tick.
enqueue :: proc(op: ^Operation, sqe: ^linux.IO_Uring_SQE, ok: bool) {
	assert(uintptr(op) & LINK_TIMEOUT_MASK == 0)

	debug("enqueue", op.type)

	if !ok {
		warn("queueing for next tick because the ring is full, queue size may need increasing")
		// A re-executed operation (partial read/write/send/recv/sendfile) still carries the SQE of its
		// previous submission; clear it so `link_timeout` and `sendfile_exec` see that nothing was
		// queued instead of mutating a ring slot that no longer belongs to this operation.
		op._impl.sqe = nil
		pok, _ := queue.push_back(&op.l.unqueued, op)
		ensure(pok, "unqueued queue allocation failure")
		return
	}

	op._impl.sqe = sqe
}

// Low pointer bit used to tag a completion's user data as coming from a linked timeout.
LINK_TIMEOUT_MASK :: 1

// Links an absolute (realtime) timeout to the SQE `target` just got; a zero `expires` means none.
link_timeout :: proc(target: ^Operation, expires: time.Time) {
	if expires == {} {
		return
	}

	// If the last op was queued because kernel is full, return.
	if target._impl.sqe == nil {
		assert(queue.len(target.l.unqueued) > 0 && queue.back_ptr(&target.l.unqueued)^ == target)
		return
	}

	target._impl.sqe.flags += {.IO_LINK}
	target._impl.expires = ns_to_time_spec(expires._nsec)

	// Tag the pointer as a timeout.
	p := uintptr(target)
	assert(p & LINK_TIMEOUT_MASK == 0)
	p |= LINK_TIMEOUT_MASK

	_, ok := uring.link_timeout(
		&target.l.ring,
		u64(p),
		&target._impl.expires,
		{.ABS, .REALTIME},
	)
	// If the target wasn't queued, the link timeout should not need to be queued, because uring
	// leaves one spot specifically for a link.
	assert(ok)
}

// Completion of a linked timeout itself; only fired / cancelled / success results are expected.
link_timeout_callback :: proc(op: ^Operation, res: i32) {
	err := linux.Errno(-res)
	if err != nil && err != .ETIME && err != .ECANCELED {
		panic("unexpected nbio.link_timeout() error")
	}
}

// Splits completion user data into the operation pointer and the linked-timeout tag bit.
unpack_operation :: #force_inline proc(user_data: u64) -> (op: ^Operation, timed_out: bool) {
	p := uintptr(user_data)
	return (^Operation)(p &~ LINK_TIMEOUT_MASK), bool(p & LINK_TIMEOUT_MASK)
}

// Handles the result of an async cancel. Returns false when the cancel was re-submitted.
@(require_results)
remove_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == ._Remove)
	err := linux.Errno(-res)

	target := op._remove.target
	if target == nil {
		debug("remove target nil, already handled")
		return true
	}

	assert(target.type != .None)
	assert(target._impl.removal == op)

	if err == .ENOENT {
		// NOTE(review): the target is only cleared (ending this retry loop) when the target itself
		// completes; if the target is not in flight at all, this appears to re-submit forever.
		debug("remove ENOENT, trying again")
		enqueue(op, uring.async_cancel(
			&op.l.ring,
			u64(uintptr(target)),
			u64(uintptr(op)),
		))
		return false
	} else if err == .EALREADY {
		debug("remove is accepted and will be tried")
	} else if err != nil {
		assert(false, "unexpected nbio.remove() error")
	}

	// Set to sentinel so nothing references the operation that will be reused.
	target._impl.removal = (^Operation)(REMOVED)
	return true
}

accept_exec :: proc(op: ^Operation) {
	assert(op.type == .Accept)

	op.accept._impl.sockaddr_len = size_of(op.accept._impl.sockaddr)

	enqueue(op, uring.accept(
		&op.l.ring,
		u64(uintptr(op)),
		linux.Fd(op.accept.socket),
		&op.accept._impl.sockaddr,
		&op.accept._impl.sockaddr_len,
		{},
	))

	link_timeout(op, op.accept.expires)
}

// A cancelled accept (by its linked timeout) is reported as `.Timeout`.
accept_callback :: proc(op: ^Operation, res: i32) {
	assert(op.type == .Accept)

	if res < 0 {
		errno := linux.Errno(-res)
		#partial switch errno {
		case .ECANCELED:
			op.accept.err = .Timeout
		case:
			op.accept.err = net._accept_error(errno)
		}

		return
	}

	op.accept.client = TCP_Socket(res)
	// net.set_blocking(net.TCP_Socket(op.accept.client), false)
	op.accept.client_endpoint = sockaddr_storage_to_endpoint_any(&op.accept._impl.sockaddr)
}

// Creates the socket on first execution, then submits the connect.
dial_exec :: proc(op: ^Operation) {
	assert(op.type == .Dial)

	if op.dial.socket == {} {
		if op.dial.endpoint.port == 0 {
			op.dial.err = .Port_Required
			queue.push_back(&op.l.completed, op)
			return
		}

		sock, err := create_socket(net.family_from_endpoint(op.dial.endpoint), .TCP)
		if err != nil {
			op.dial.err = err
			queue.push_back(&op.l.completed, op)
			return
		}

		op.dial.socket = sock.(TCP_Socket)
		// NOTE(review): sockaddr is only computed when this proc created the socket; presumably a
		// pre-set socket only happens on re-execution after the op was parked in `unqueued`, where
		// sockaddr is already filled in — TODO confirm no caller passes its own socket.
		op.dial._impl.sockaddr = endpoint_to_sockaddr_any(op.dial.endpoint)
	}

	enqueue(op, uring.connect(
		&op.l.ring,
		u64(uintptr(op)),
		linux.Fd(op.dial.socket),
		&op.dial._impl.sockaddr,
	))

	link_timeout(op, op.dial.expires)
}

// On failure, maps the errno and closes the dial socket.
dial_callback :: proc(op: ^Operation, res: i32) {
	assert(op.type == .Dial)

	errno := linux.Errno(-res)
	if errno != nil {
		#partial switch errno {
		case .ECANCELED:
			op.dial.err = Dial_Error.Timeout
		case:
			op.dial.err = net._dial_error(errno)
		}
		close(op.dial.socket)
	}
}

// Non-positive durations complete next tick; others arm an absolute realtime timeout from `l.now`.
timeout_exec :: proc(op: ^Operation) {
	assert(op.type == .Timeout)

	if op.timeout.duration <= 0 {
		queue.push_back(&op.l.completed, op)
		return
	}

	expires := time.time_add(op.l.now, op.timeout.duration)
	op.timeout._impl.expires = ns_to_time_spec(expires._nsec)

	enqueue(op, uring.timeout(
		&op.l.ring,
		u64(uintptr(op)),
		&op.timeout._impl.expires,
		0,
		{.ABS, .REALTIME},
	))
}

timeout_callback :: proc(op: ^Operation, res: i32) {
	if res < 0 {
		errno := linux.Errno(-res)
		#partial switch errno {
		case .ETIME, .ECANCELED:
			// OK.
		case:
			debug("unexpected timeout error:", int(errno))
			panic("unexpected timeout error")
		}
	}
}

// Submits a close for the file descriptor behind the subject.
close_exec :: proc(op: ^Operation) {
	assert(op.type == .Close)

	fd: linux.Fd
	switch closable in op.close.subject {
	case Handle:     fd = linux.Fd(closable)
	case TCP_Socket: fd = linux.Fd(closable)
	case UDP_Socket: fd = linux.Fd(closable)
	case:
		// Nothing to close. Complete next tick with the error, like the other `_exec` procs do for
		// errors detected before reaching the kernel; just returning would mean the callback never
		// runs and the operation is never released.
		op.close.err = .Invalid_Argument
		queue.push_back(&op.l.completed, op)
		return
	}

	enqueue(op, uring.close(
		&op.l.ring,
		u64(uintptr(op)),
		fd,
	))
}

close_callback :: proc(op: ^Operation, res: i32) {
	assert(op.type == .Close)

	op.close.err = FS_Error(linux.Errno(-res))
}

// Submits a recvmsg for the remaining buffers; UDP receives also capture the source address.
recv_exec :: proc(op: ^Operation) {
	assert(op.type == .Recv)

	if op.recv.err != nil {
		queue.push_back(&op.l.completed, op)
		return
	}

	bufs, _ := bufs_to_process(&op.recv._impl.bufs, op.recv.bufs, op.recv.received)
	op.recv._impl.msghdr.iov = transmute([]linux.IO_Vec)bufs

	sock: linux.Fd
	switch socket in op.recv.socket {
	case TCP_Socket:
		sock = linux.Fd(socket)
	case UDP_Socket:
		sock = linux.Fd(socket)
		op.recv._impl.msghdr.name = &op.recv._impl.addr_out
		op.recv._impl.msghdr.namelen = size_of(op.recv._impl.addr_out)
	}

	enqueue(op, uring.recvmsg(
		&op.l.ring,
		u64(uintptr(op)),
		linux.Fd(sock),
		&op.recv._impl.msghdr,
		{.NOSIGNAL},
	))

	link_timeout(op, op.recv.expires)
}

// Returns false when a TCP `all` receive was re-submitted for the remaining bytes.
@(require_results)
recv_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == .Recv)

	if res < 0 {
		errno := linux.Errno(-res)
		switch sock in op.recv.socket {
		case TCP_Socket:
			#partial switch errno {
			case .ECANCELED:
				op.recv.err = TCP_Recv_Error.Timeout
			case:
				op.recv.err = net._tcp_recv_error(errno)
			}
		case UDP_Socket:
			#partial switch errno {
			case .ECANCELED:
				op.recv.err = UDP_Recv_Error.Timeout
			case:
				op.recv.err = net._udp_recv_error(errno)
			}
		}

		return true
	}

	op.recv.received += int(res)

	switch sock in op.recv.socket {
	case TCP_Socket:
		if res == 0 {
			// Connection closed.
			return true
		}

		if op.recv.all {
			total: int
			for buf in op.recv.bufs {
				total += len(buf)
			}

			if op.recv.received < total {
				recv_exec(op)
				return false
			}
		}

	case UDP_Socket:
		op.recv.source = sockaddr_storage_to_endpoint_ip(&op.recv._impl.addr_out)
	}

	return true
}

// Submits a sendmsg for the remaining buffers; UDP sends also set the destination address.
send_exec :: proc(op: ^Operation) {
	assert(op.type == .Send)

	if op.send.err != nil {
		queue.push_back(&op.l.completed, op)
		return
	}

	bufs, _ := bufs_to_process(&op.send._impl.bufs, op.send.bufs, op.send.sent)
	op.send._impl.msghdr.iov = transmute([]linux.IO_Vec)bufs

	sock: linux.Fd
	switch socket in op.send.socket {
	case TCP_Socket:
		sock = linux.Fd(socket)
	case UDP_Socket:
		sock = linux.Fd(socket)

		op.send._impl.endpoint = endpoint_to_sockaddr_ip(op.send.endpoint)

		op.send._impl.msghdr.name = &op.send._impl.endpoint
		op.send._impl.msghdr.namelen = size_of(op.send._impl.endpoint)
	}

	enqueue(op, uring.sendmsg(
		&op.l.ring,
		u64(uintptr(op)),
		sock,
		&op.send._impl.msghdr,
		{.NOSIGNAL},
	))

	link_timeout(op, op.send.expires)
}

// Returns false when an `all` send was re-submitted for the remaining bytes.
@(require_results)
send_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == .Send)

	if res < 0 {
		errno := linux.Errno(-res)
		switch sock in op.send.socket {
		case TCP_Socket:
			#partial switch errno {
			case .ECANCELED:
				op.send.err = TCP_Send_Error.Timeout
			case:
				op.send.err = net._tcp_send_error(errno)
			}
		case UDP_Socket:
			#partial switch errno {
			case .ECANCELED:
				op.send.err = UDP_Send_Error.Timeout
			case:
				op.send.err = net._udp_send_error(errno)
			}
		case:
			panic("corrupted socket")
		}

		return true
	}

	op.send.sent += int(res)

	if op.send.all {
		total: int
		for buf in op.send.bufs {
			total += len(buf)
		}

		if op.send.sent < total {
			assert(res > 0)
			send_exec(op)
			return false
		}
	}

	return true
}

// Submits a write of the not-yet-written part of the buffer, clamped to MAX_RW bytes.
write_exec :: proc(op: ^Operation) {
	assert(op.type == .Write)

	buf := op.write.buf[op.write.written:]
	buf = buf[:min(MAX_RW, len(buf))]

	enqueue(op, uring.write(
		&op.l.ring,
		u64(uintptr(op)),
		op.write.handle,
		buf,
		u64(op.write.offset) + u64(op.write.written),
	))

	link_timeout(op, op.write.expires)
}
// Accumulates written bytes; returns false when an `all` write was re-submitted for the rest.
@(require_results)
write_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == .Write)

	if res < 0 {
		errno := linux.Errno(-res)
		op.write.err = FS_Error(errno)
		return true
	}

	op.write.written += int(res)

	if op.write.all && op.write.written < len(op.write.buf) {
		write_exec(op)
		return false
	}

	return true
}

// Submits a read into the not-yet-filled part of the buffer, clamped to MAX_RW bytes.
read_exec :: proc(op: ^Operation) {
	assert(op.type == .Read)

	buf := op.read.buf[op.read.read:]
	buf = buf[:min(MAX_RW, len(buf))]

	enqueue(op, uring.read(
		&op.l.ring,
		u64(uintptr(op)),
		op.read.handle,
		buf,
		u64(op.read.offset) + u64(op.read.read),
	))

	link_timeout(op, op.read.expires)
}

// Accumulates read bytes; a zero-byte result before anything was read is reported as `.EOF`.
// Returns false when an `all` read was re-submitted for the rest.
@(require_results)
read_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == .Read)

	if res < 0 {
		errno := linux.Errno(-res)
		op.read.err = FS_Error(errno)
		return true
	} else if res == 0 {
		if op.read.read == 0 {
			op.read.err = .EOF
		}
		return true
	}

	op.read.read += int(res)

	if op.read.all && op.read.read < len(op.read.buf) {
		read_exec(op)
		return false
	}

	return true
}

// Submits a one-shot poll for readability (`.Receive`) or writability (`.Send`) of the socket.
poll_exec :: proc(op: ^Operation) {
	assert(op.type == .Poll)

	events: linux.Fd_Poll_Events
	switch op.poll.event {
	case .Receive: events = { .IN }
	case .Send:    events = { .OUT }
	}

	fd: linux.Fd
	switch sock in op.poll.socket {
	case TCP_Socket: fd = linux.Fd(sock)
	case UDP_Socket: fd = linux.Fd(sock)
	}

	enqueue(op, uring.poll_add(
		&op.l.ring,
		u64(uintptr(op)),
		fd,
		events,
		{},
	))

	link_timeout(op, op.poll.expires)
}

// Maps the poll result: any non-negative result is `.Ready`; a cancel (linked timeout) is `.Timeout`.
poll_callback :: proc(op: ^Operation, res: i32) {
	assert(op.type == .Poll)

	if res < 0 {
		// `-res` is strictly positive here, so `.NONE` cannot occur.
		errno := linux.Errno(-res)
		#partial switch errno {
		case .ECANCELED:
			op.poll.result = .Timeout
		case .EINVAL, .EFAULT, .EBADF:
			op.poll.result = .Invalid_Argument
		case:
			op.poll.result = .Error
		}
		return
	}

	op.poll.result = .Ready
}

/*
`sendfile` is implemented with 2 splices over a pipe.

Splice A: from file to pipe
Splice B: from pipe to socket (optionally linked to a timeout)

The splices are hard-linked which means A completes before B.
B could get an `EWOULDBLOCK`, which is when the remote end has not read enough of the socket data yet.
In that case we enqueue a poll on the socket and continue when that completes.

A shouldn't get `EWOULDBLOCK`, but as a cautionary measure we handle it.

The timeout is either linked to the splice B op, or the poll op; whichever of these is linked is
also the one in progress in the kernel.
*/

// Drives a sendfile. On the first call it stats the file (for SEND_ENTIRE_FILE) or creates the pipe
// and the helper splice A operation. It then submits splice A (file to pipe), unless `splice` is
// false or A already moved everything, hard-linked before splice B (pipe to socket).
sendfile_exec :: proc(op: ^Operation, splice := true) {
	assert(op.type == .Send_File)

	splice_done := !splice
	if splice_op := op.sendfile._impl.splice; splice && splice_op != nil {
		splice_done = splice_op._splice.written == splice_op._splice.len
	}

	debug("sendfile_exec")

	if op.sendfile._impl.splice == nil {
		// First stat for the file size.
		if op.sendfile.nbytes == SEND_ENTIRE_FILE {
			debug("sendfile SEND_ENTIRE_FILE, doing stat")

			stat_poly(op.sendfile.file, op, proc(stat_op: ^Operation, sendfile_op: ^Operation) {
				if stat_op.stat.err != nil {
					sendfile_op.sendfile.err = stat_op.stat.err
				} else if stat_op.stat.type != .Regular {
					sendfile_op.sendfile.err = FS_Error.Invalid_Argument
				} else {
					sendfile_op.sendfile.nbytes = int(i64(stat_op.stat.size) - i64(sendfile_op.sendfile.offset))
					if sendfile_op.sendfile.nbytes <= 0 {
						sendfile_op.sendfile.err = FS_Error.Invalid_Argument
					}
				}

				if sendfile_op.sendfile.err != nil {
					handle_completed(sendfile_op, 0)
					return
				}

				sendfile_exec(sendfile_op)
			})
			return
		}

		debug("sendfile setting up")

		rw: [2]linux.Fd
		err := linux.pipe2(&rw, {.NONBLOCK, .CLOEXEC})
		if err != nil {
			op.sendfile.err = FS_Error(err)
			queue.push_back(&op.l.completed, op)
			return
		}

		splice_op := _prep(op.l, proc(_: ^Operation) { debug("sendfile splice helper callback") }, ._Splice)
		splice_op._splice.sendfile = op
		splice_op._splice.file = op.sendfile.file
		splice_op._splice.pipe = rw[1]
		splice_op._splice.off = op.sendfile.offset
		splice_op._splice.len = op.sendfile.nbytes

		op.sendfile._impl.splice = splice_op
		op.sendfile._impl.pipe = rw[0]
		op.sendfile._impl.len = op.sendfile.nbytes
	}

	splice_op: ^Operation
	if !splice_done {
		splice_op = op.sendfile._impl.splice
		enqueue(splice_op, uring.splice(
			&splice_op.l.ring,
			u64(uintptr(splice_op)),
			splice_op._splice.file,
			i64(splice_op._splice.off) + i64(splice_op._splice.written),
			splice_op._splice.pipe,
			-1,
			u32(min(splice_op._splice.len - splice_op._splice.written, MAX_RW)),
			{.NONBLOCK},
		))
	}

	b, b_added := uring.splice(
		&op.l.ring,
		u64(uintptr(op)),
		op.sendfile._impl.pipe,
		-1,
		linux.Fd(op.sendfile.socket),
		-1,
		u32(min(op.sendfile._impl.len - op.sendfile.sent, MAX_RW)),
		{.NONBLOCK},
	)
	if !splice_done && b_added {
		assert(splice_op._impl.sqe != nil) // if b was added successfully, a should've been too.
		// Makes sure splice A (file to pipe) completes before splice B (pipe to socket).
		splice_op._impl.sqe.flags += {.IO_HARDLINK}
	}
	enqueue(op, b, b_added)

	link_timeout(op, op.sendfile.expires)
}

// Completion of helper splice A (file to pipe). Returns false while the operation must stay alive
// because its owning sendfile will re-submit it; returns true once it is finished (fully written,
// failed, or detached from its sendfile), after closing the pipe's write end.
@(require_results)
splice_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == ._Splice)

	if res < 0 {
		errno := linux.Errno(-res)
		#partial switch errno {
		case .EAGAIN:
			if op._splice.sendfile != nil {
				// Splice A (from file to pipe) would block, this means the buffer is full and it first needs
				// to be sent over the socket by splice B (from pipe to socket).
				// Once a splice B completes, `sendfile_exec` re-submits this same operation, so keep it
				// alive: returning true would release it to the pool while the sendfile still
				// references it through `_impl.splice`.
				return false
			}
			// The sendfile already detached from us (it failed), so nobody will re-submit this
			// operation: clean up like any other error.
			fallthrough

		case:
			// Splice A (from file to pipe) error, we need to close the pipes, cancel the pending splice B,
			// and call the callback with the error.

			debug("sendfile helper splice error, closing pipe")

			close(op._splice.pipe)

			// This is nil if this is a cancel originating from the sendfile.
			// This is not nil if it is an actual error that happened on this splice.
			sendfile_op := op._splice.sendfile
			if sendfile_op != nil {
				debug("sendfile helper splice error, cancelling main sendfile")
				assert(sendfile_op.type == .Send_File)

				sendfile_op.sendfile._impl.splice = nil
				sendfile_op.sendfile.err = FS_Error(errno)
			}
		}

		return true
	}

	op._splice.written += int(res)

	sendfile_op := op._splice.sendfile
	if sendfile_op != nil {
		if op._splice.written < sendfile_op.sendfile.nbytes {
			return false
		}

		sendfile_op.sendfile._impl.splice = nil
	}

	assert(op._splice.pipe > 0)
	close(op._splice.pipe)

	debug("sendfile helper splice completely done")
	return true
}

// Completion of splice B (pipe to socket). It polls for writability on EAGAIN, and on error closes
// the pipe and cancels the helper splice. While bytes remain, it re-executes the sendfile (calling
// back early when `progress_updates` is set). Returns false while the sendfile is still in progress.
@(require_results)
sendfile_callback :: proc(op: ^Operation, res: i32) -> bool {
	assert(op.type == .Send_File)

	if op.sendfile.err == nil && res < 0 {
		errno := linux.Errno(-res)
		#partial switch errno {
		case .EAGAIN:
			// Splice B (from pipe to socket) would block. We are waiting on the remote to read more
			// of our buffer before we can send more to it.
			// We use a poll to find out when this is.

			debug("sendfile needs to poll")

			poll_op := poll_poly(op.sendfile.socket, .Send, op, proc(poll_op: ^Operation, sendfile_op: ^Operation) {
				#partial switch poll_op.poll.result {
				case .Ready:
					// Do not enqueue a splice right away, we know there is at least one splice call worth of data in the kernel buffer.
					sendfile_exec(sendfile_op, splice=false)
					return

				case .Timeout:
					sendfile_op.sendfile.err = TCP_Send_Error.Timeout
				case:
					sendfile_op.sendfile.err = TCP_Send_Error.Unknown
				}

				debug("sendfile poll error")
				handle_completed(sendfile_op, 0)
			})

			link_timeout(poll_op, op.sendfile.expires)
			return false

		case .ECANCELED:
			op.sendfile.err = TCP_Send_Error.Timeout
		case:
			op.sendfile.err = net._tcp_send_error(errno)
		}
	}

	if op.sendfile.err != nil {
		debug("sendfile error")

		if op.sendfile._impl.pipe > 0 {
			close(op.sendfile._impl.pipe)
		}

		splice_op := op.sendfile._impl.splice
		if splice_op != nil {
			assert(splice_op.type == ._Splice)
			splice_op._splice.sendfile = nil
			// NOTE(review): splice A is hard-linked before B, so here it has usually already completed
			// and is idle (waiting to be re-submitted). An async cancel of an idle operation gets ENOENT,
			// which `remove_callback` retries until the target completes. That may never happen, which
			// would leak the pipe's write end — TODO confirm and release an idle splice directly.
			_remove(splice_op)
		}

		return true
	}

	op.sendfile.sent += int(res)
	if op.sendfile.sent < op.sendfile._impl.len {
		debug("sendfile not completely done yet")
		sendfile_exec(op)
		if op.sendfile.progress_updates { op.cb(op) }
		return false
	}

	debug("sendfile completely done")
	return true
}

// Translates the open mode into open(2) flags and submits an openat; the cloned path is freed in
// `open_callback`.
open_exec :: proc(op: ^Operation) {
	assert(op.type == .Open)

	sys_flags := linux.Open_Flags{.NOCTTY, .CLOEXEC, .NONBLOCK}

	if .Write in op.open.mode {
		if .Read in op.open.mode {
			sys_flags += {.RDWR}
		} else {
			sys_flags += {.WRONLY}
		}
	}

	if .Append in op.open.mode { sys_flags += {.APPEND} }
	if .Create in op.open.mode { sys_flags += {.CREAT} }
	if .Excl   in op.open.mode { sys_flags += {.EXCL} }
	if .Sync   in op.open.mode { sys_flags += {.DSYNC} }
	if .Trunc  in op.open.mode { sys_flags += {.TRUNC} }
	// if .Inheritable in op.open.mode { sys_flags -= {.CLOEXEC} }

	cpath, err := strings.clone_to_cstring(op.open.path, op.l.allocator)
	if err != nil {
		op.open.err = .Allocation_Failed
		queue.push_back(&op.l.completed, op)
		return
	}
	op.open._impl.cpath = cpath

	enqueue(op, uring.openat(
		&op.l.ring,
		u64(uintptr(op)),
		linux.Fd(op.open.dir),
		op.open._impl.cpath,
		transmute(linux.Mode)op.open.perm,
		sys_flags,
	))
}

open_callback :: proc(op: ^Operation, res: i32) {
	assert(op.type == .Open)

	delete(op.open._impl.cpath, op.l.allocator)

	if res < 0 {
		errno := linux.Errno(-res)
		op.open.err = FS_Error(errno)
		return
	}

	op.open.handle = Handle(res)
}

// statx on the handle itself (empty path + EMPTY_PATH), requesting only type and size.
stat_exec :: proc(op: ^Operation) {
	assert(op.type == .Stat)

	enqueue(op, uring.statx(
		&op.l.ring,
		u64(uintptr(op)),
		op.stat.handle,
		"",
		{.EMPTY_PATH},
		{.TYPE, .SIZE},
		&op.stat._impl.buf,
	))
}

// Maps the statx mode bits to a File_Type (unrecognized modes fall back to `.Regular`).
stat_callback :: proc(op: ^Operation, res: i32) {
	assert(op.type == .Stat)

	if res < 0 {
		errno := linux.Errno(-res)
		op.stat.err = FS_Error(errno)
		return
	}

	type := File_Type.Regular
	switch op.stat._impl.buf.mode & linux.S_IFMT {
	case linux.S_IFBLK, linux.S_IFCHR: type = .Device
	case linux.S_IFDIR:                type = .Directory
	case linux.S_IFIFO:                type = .Pipe_Or_Socket
	case linux.S_IFLNK:                type = .Symlink
	case linux.S_IFREG:                type = .Regular
	case linux.S_IFSOCK:               type = .Pipe_Or_Socket
	}

	op.stat.type = type
	op.stat.size = i64(op.stat._impl.buf.size)
}

// Converts an IPv4/IPv6 socket address to an Endpoint; other families yield the zero Endpoint.
@(require_results)
sockaddr_storage_to_endpoint_ip :: proc(addr: ^Sock_Addr_Ip) -> (ep: Endpoint) {
	#partial switch addr.family {
	case .INET:
		return Endpoint {
			address = IP4_Address(addr.sin_addr),
			port    = int(addr.sin_port),
		}
	case .INET6:
		return Endpoint {
			address = IP6_Address(transmute([8]u16be)addr.sin6_addr),
			port    = int(addr.sin6_port),
		}
	case:
		return {}
	}
}

// Same as `sockaddr_storage_to_endpoint_ip`, for a generic Sock_Addr_Any.
@(require_results)
sockaddr_storage_to_endpoint_any :: proc(addr: ^linux.Sock_Addr_Any) -> (ep: Endpoint) {
	#partial switch addr.family {
	case .INET:
		return Endpoint {
			address = IP4_Address(addr.sin_addr),
			port    = int(addr.sin_port),
		}
	case .INET6:
		return Endpoint {
			address = IP6_Address(transmute([8]u16be)addr.sin6_addr),
			port    = int(addr.sin6_port),
		}
	case:
		return {}
	}
}

// Converts an Endpoint to an IPv4/IPv6 socket address; a nil address is unreachable.
@(require_results)
endpoint_to_sockaddr_ip :: proc(ep: Endpoint) -> (sockaddr: Sock_Addr_Ip) {
	switch a in ep.address {
	case IP4_Address:
		sockaddr.sin_family = .INET
		sockaddr.sin_port = u16be(ep.port)
		sockaddr.sin_addr = cast([4]u8)a
		return
	case IP6_Address:
		sockaddr.sin6_family = .INET6
		sockaddr.sin6_port = u16be(ep.port)
		sockaddr.sin6_addr = transmute([16]u8)a
		return
	}
	unreachable()
}

// Same as `endpoint_to_sockaddr_ip`, producing a generic Sock_Addr_Any.
@(require_results)
endpoint_to_sockaddr_any :: proc(ep: Endpoint) -> (sockaddr: linux.Sock_Addr_Any) {
	switch a in ep.address {
	case IP4_Address:
		sockaddr.sin_family = .INET
		sockaddr.sin_port = u16be(ep.port)
		sockaddr.sin_addr = cast([4]u8)a
		return
	case IP6_Address:
		sockaddr.sin6_family = .INET6
		sockaddr.sin6_port = u16be(ep.port)
		sockaddr.sin6_addr = transmute([16]u8)a
		return
	}
	unreachable()
}

// Splits nanoseconds into a normalized timespec (assumes `nsec >= 0`; negatives wrap on the casts).
@(require_results)
ns_to_time_spec :: proc(nsec: i64) -> linux.Time_Spec {
	NANOSECONDS_PER_SECOND :: 1e9
	return {
		time_sec  = uint(nsec / NANOSECONDS_PER_SECOND),
		time_nsec = uint(nsec % NANOSECONDS_PER_SECOND),
	}
}