From 334a554de314ff1926dea35e9045ef7796f1414b Mon Sep 17 00:00:00 2001 From: andzdroid Date: Thu, 26 Feb 2026 00:30:27 +0000 Subject: [PATCH 1/2] nbio: fix op re-use --- core/nbio/impl_posix.odin | 2 ++ tests/core/nbio/nbio.odin | 75 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/core/nbio/impl_posix.odin b/core/nbio/impl_posix.odin index f76cf2849..5a16efbf4 100644 --- a/core/nbio/impl_posix.odin +++ b/core/nbio/impl_posix.odin @@ -1154,6 +1154,8 @@ stat_exec :: proc(op: ^Operation) { add_pending :: proc(op: ^Operation, filter: kq.Filter, ident: uintptr) { debug("adding pending", op.type) + op._impl.next = nil + op._impl.prev = nil op._impl.flags += {.For_Kernel} _, val, just_inserted, err := map_entry(&op.l.submitted, Queue_Identifier{ ident = ident, filter = filter }) diff --git a/tests/core/nbio/nbio.odin b/tests/core/nbio/nbio.odin index 6c3fd0e8c..2fbfc8894 100644 --- a/tests/core/nbio/nbio.odin +++ b/tests/core/nbio/nbio.odin @@ -256,3 +256,78 @@ wake_up :: proc(t: ^testing.T) { } } } + +@(test) +reused_ops :: proc(t: ^testing.T) { + if event_loop_guard(t) { + testing.set_fail_timeout(t, time.Minute) + + @static payload := [4]byte{'P', 'I', 'N', 'G'} + sock, ep := open_next_available_local_port(t) + + State :: struct { + listener: nbio.TCP_Socket, + accepts: [4]^nbio.Operation, + client: nbio.TCP_Socket, + server_client: nbio.TCP_Socket, + recv_buf: [4]byte, + recv_done: bool, + } + + state := State{ + listener = sock, + } + + on_accept :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State, slot: int) { + if state.recv_done { + nbio.close(op.accept.client) + return + } + + ev(t, op.accept.err, nil) + if state.server_client == 0 { + state.server_client = op.accept.client + nbio.recv_poly2(state.server_client, {state.recv_buf[:]}, t, state, on_recv, timeout=0) + } + + state.accepts[slot] = nbio.accept_poly3(op.accept.socket, t, state, slot, on_accept, timeout=nbio.NO_TIMEOUT) + } + + on_recv :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { + ev(t, op.recv.err, nil) + ev(t, op.recv.received, 4) + ev(t, state.recv_buf, payload) + state.recv_done = true + + for accept in state.accepts { + if accept != nil { + nbio.remove(accept) + } + } + + nbio.close(state.server_client) + nbio.close(state.client) + nbio.close(state.listener) + } + + on_dial :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { + ev(t, op.dial.err, nil) + state.client = op.dial.socket + nbio.send_poly2(state.client, {payload[:]}, t, state, on_send) + } + + on_send :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { + ev(t, op.send.err, nil) + ev(t, op.send.sent, (4)) + } + + for slot in 0 ..< 4 { + state.accepts[slot] = nbio.accept_poly3(sock, t, &state, slot, on_accept, timeout=nbio.NO_TIMEOUT) + } + ev(t, nbio.tick(0), nil) + + nbio.dial_poly2(ep, t, &state, on_dial) + ev(t, nbio.run(), nil) + e(t, state.recv_done) + } + } From f61a216c9ff47a15bfb0538c40996c76fe8c7dd9 Mon Sep 17 00:00:00 2001 From: Laytan Laats Date: Fri, 27 Feb 2026 21:04:43 +0100 Subject: [PATCH 2/2] nbio: put clearing of list nodes in proper place and simplify test --- core/nbio/impl_posix.odin | 10 ++++- tests/core/nbio/nbio.odin | 78 +++++++++++++-------------------------- 2 files changed, 33 insertions(+), 55 deletions(-) diff --git a/core/nbio/impl_posix.odin b/core/nbio/impl_posix.odin index 5a16efbf4..8469c9ade 100644 --- a/core/nbio/impl_posix.odin +++ b/core/nbio/impl_posix.odin @@ -336,6 +336,12 @@ __tick :: proc(l: ^Event_Loop, timeout: time.Duration) -> General_Error { if .Error in event.flags { curr._impl.flags += {.Error} } if .EOF in event.flags { curr._impl.flags += {.EOF} } curr._impl.result = event.data + + // Remove refs to the list in case the operation would still block and `add_pending` + // is executed on it again. + curr._impl.prev = nil + curr._impl.next = nil + handle_completed(curr) } } @@ -1154,8 +1160,8 @@ stat_exec :: proc(op: ^Operation) { add_pending :: proc(op: ^Operation, filter: kq.Filter, ident: uintptr) { debug("adding pending", op.type) - op._impl.next = nil - op._impl.prev = nil + assert(op._impl.next == nil) + assert(op._impl.prev == nil) op._impl.flags += {.For_Kernel} _, val, just_inserted, err := map_entry(&op.l.submitted, Queue_Identifier{ ident = ident, filter = filter }) diff --git a/tests/core/nbio/nbio.odin b/tests/core/nbio/nbio.odin index 2fbfc8894..6121d1ac7 100644 --- a/tests/core/nbio/nbio.odin +++ b/tests/core/nbio/nbio.odin @@ -257,77 +257,49 @@ wake_up :: proc(t: ^testing.T) { } } +// Tests that if multiple accepts are queued, and a dial comes in which completes one of them, +// the rest are queued again properly. @(test) -reused_ops :: proc(t: ^testing.T) { +still_pending :: proc(t: ^testing.T) { if event_loop_guard(t) { testing.set_fail_timeout(t, time.Minute) - @static payload := [4]byte{'P', 'I', 'N', 'G'} sock, ep := open_next_available_local_port(t) + defer nbio.close(sock) + + N :: 3 State :: struct { - listener: nbio.TCP_Socket, - accepts: [4]^nbio.Operation, - client: nbio.TCP_Socket, - server_client: nbio.TCP_Socket, - recv_buf: [4]byte, - recv_done: bool, + accepted: int, } + state: State - state := State{ - listener = sock, - } - - on_accept :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State, slot: int) { - if state.recv_done { - nbio.close(op.accept.client) - return - } - + on_accept :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { ev(t, op.accept.err, nil) - if state.server_client == 0 { - state.server_client = op.accept.client - nbio.recv_poly2(state.server_client, {state.recv_buf[:]}, t, state, on_recv, timeout=0) - } - - state.accepts[slot] = nbio.accept_poly3(op.accept.socket, t, state, slot, on_accept, timeout=nbio.NO_TIMEOUT) + state.accepted += 1 + nbio.close(op.accept.client) } - on_recv :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { - ev(t, op.recv.err, nil) - ev(t, op.recv.received, 4) - ev(t, state.recv_buf, payload) - state.recv_done = true - - for accept in state.accepts { - if accept != nil { - nbio.remove(accept) - } - } - - nbio.close(state.server_client) - nbio.close(state.client) - nbio.close(state.listener) - } - - on_dial :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { + on_dial :: proc(op: ^nbio.Operation, t: ^testing.T) { ev(t, op.dial.err, nil) - state.client = op.dial.socket - nbio.send_poly2(state.client, {payload[:]}, t, state, on_send) + nbio.close(op.dial.socket) } - on_send :: proc(op: ^nbio.Operation, t: ^testing.T, state: ^State) { - ev(t, op.send.err, nil) - ev(t, op.send.sent, (4)) + for _ in 0..