From 7f9589922db4f2006287c1e3fc11e5ab308b9de3 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 5 Jun 2025 15:27:24 -0300 Subject: [PATCH 01/10] core/sync.select_raw: return a useful index This fixes a flaw in the original implementation: the returned index is actually useless to the caller. This is because the index returned refers to the internal "candidate" list. This list is dynamic, and may not have all of the input channels (if they weren't ready according to chan.can_{recv,send}). That means the index is not guaranteed to mean anything to the caller. The fix introduced here is to return the index into the input slice (recvs,sends) and an enum to specify which input slice that is. If no selection was made, then (-1, .None) is returned to communicate as much. Signed-off-by: Jack Mordaunt --- core/sync/chan/chan.odin | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index eca4c28d7..610cf16eb 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1105,6 +1105,15 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { return c.w_waiting == 0 } +/* +Specifies the direction of the selected channel. +*/ +Select_Status :: enum { + None, + Recv, + Send, +} + /* Attempts to either send or receive messages on the specified channels. @@ -1170,7 +1179,7 @@ Output: */ @(require_results) -select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, ok: bool) #no_bounds_check { +select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) #no_bounds_check { Select_Op :: struct { idx: int, // local to the slice that was given is_recv: bool, @@ -1204,15 +1213,22 @@ select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: [] return } - select_idx = rand.int_max(count) if count > 0 else 0 + candidate_idx := rand.int_max(count) if count > 0 else 0 - sel := candidates[select_idx] + sel := candidates[candidate_idx] if sel.is_recv { - ok = recv_raw(recvs[sel.idx], recv_out) + status = .Recv + if !recv_raw(recvs[sel.idx], recv_out) { + return -1, .None + } } else { - ok = send_raw(sends[sel.idx], send_msgs[sel.idx]) + status = .Send + if !send_raw(sends[sel.idx], send_msgs[sel.idx]) { + return -1, .None + } } - return + + return sel.idx, status } From be873af003817f7b29ddbd44a4218716d94479dd Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 5 Jun 2025 15:29:10 -0300 Subject: [PATCH 02/10] core/sync.select_raw: rename to try_select_raw This follows the convention where non-blocking operations are prefixed with "try" to indicate as much. Since select_raw in it's current form doesn't block, it should be try_select_raw, and allow select_raw to name a blocking implementation. --- core/sync/chan/chan.odin | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 610cf16eb..df8dea43b 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1116,12 +1116,14 @@ Select_Status :: enum { /* -Attempts to either send or receive messages on the specified channels. +Attempts to either send or receive messages on the specified channels without blocking. `select_raw` first identifies which channels have messages ready to be received and which are available for sending. It then randomly selects one operation (either a send or receive) to perform. +If no channels have messages ready, the procedure is a noop. + Note: Each message in `send_msgs` corresponds to the send channel at the same index in `sends`. **Inputs** @@ -1154,18 +1156,18 @@ Example: // where the value from the read should be stored received_value: int - idx, ok := chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) + idx, ok := chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) fmt.println("RECEIVED VALUE ", received_value) - idx, ok = chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) + idx, ok = chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) fmt.println("RECEIVED VALUE ", received_value) // closing of a channel also affects the select operation chan.close(c) - idx, ok = chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) + idx, ok = chan.try_select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value) fmt.println("SELECT: ", idx, ok) } @@ -1179,7 +1181,7 @@ Output: */ @(require_results) -select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) #no_bounds_check { +try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) #no_bounds_check { Select_Op :: struct { idx: int, // local to the slice that was given is_recv: bool, From d5b7302ac047dd4c5ee9656d405c84268e27b242 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 5 Jun 2025 15:38:04 -0300 Subject: [PATCH 03/10] core/sync.try_select_raw: fix TOCTOU Fixes a TOCTOU where the channel could be used between the call to can_{recv,send} and {recv,send} causing an unexpected blocking operation. To do this we use the non-blocking try_{recv,send} and retry the check in a loop. This guarantees non-blocking select behaviour, at the cost of spinning if the input channels are highly contended. Signed-off-by: Jack Mordaunt --- core/sync/chan/chan.odin | 68 +++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index df8dea43b..d20e7d365 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1189,51 +1189,53 @@ try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs candidate_count := builtin.len(recvs)+builtin.len(sends) candidates := ([^]Select_Op)(intrinsics.alloca(candidate_count*size_of(Select_Op), align_of(Select_Op))) - count := 0 - for c, i in recvs { - if can_recv(c) { - candidates[count] = { - is_recv = true, - idx = i, + for { + count := 0 + + for c, i in recvs { + if can_recv(c) { + candidates[count] = { + is_recv = true, + idx = i, + } + count += 1 } - count += 1 } - } - for c, i in sends { - if can_send(c) { - candidates[count] = { - is_recv = false, - idx = i, + for c, i in sends { + if can_send(c) { + candidates[count] = { + is_recv = false, + idx = i, + } + count += 1 } - count += 1 } - } - if count == 0 { - return - } - - candidate_idx := rand.int_max(count) if count > 0 else 0 - - sel := candidates[candidate_idx] - if sel.is_recv { - status = .Recv - if !recv_raw(recvs[sel.idx], recv_out) { + if count == 0 { return -1, .None } - } else { - status = .Send - if !send_raw(sends[sel.idx], send_msgs[sel.idx]) { - return -1, .None - } - } - return sel.idx, status + candidate_idx := rand.int_max(count) if count > 0 else 0 + + sel := candidates[candidate_idx] + if sel.is_recv { + status = .Recv + if !try_recv_raw(recvs[sel.idx], recv_out) { + continue + } + } else { + status = .Send + if !try_send_raw(sends[sel.idx], send_msgs[sel.idx]) { + continue + } + } + + return sel.idx, status + } } - /* `Raw_Queue` is a non-thread-safe queue implementation designed to store messages of fixed size and alignment. From fb39e5a2f8a051c1df17da199bf46e373c7bcb03 Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Sun, 8 Jun 2025 18:28:35 -0300 Subject: [PATCH 04/10] core/sync/chan.try_select_raw: clarify loop control flow Use a label to clarify the continue statements. --- core/sync/chan/chan.odin | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index d20e7d365..f8f3ac46e 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1125,6 +1125,7 @@ and which are available for sending. It then randomly selects one operation If no channels have messages ready, the procedure is a noop. Note: Each message in `send_msgs` corresponds to the send channel at the same index in `sends`. +If the message is nil, corresponding send channel will be skipped. **Inputs** - `recv`: A slice of channels to read from @@ -1190,7 +1191,7 @@ try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs candidate_count := builtin.len(recvs)+builtin.len(sends) candidates := ([^]Select_Op)(intrinsics.alloca(candidate_count*size_of(Select_Op), align_of(Select_Op))) - for { + try_loop: for { count := 0 for c, i in recvs { @@ -1223,12 +1224,12 @@ try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs if sel.is_recv { status = .Recv if !try_recv_raw(recvs[sel.idx], recv_out) { - continue + continue try_loop } } else { status = .Send if !try_send_raw(sends[sel.idx], send_msgs[sel.idx]) { - continue + continue try_loop } } From 4043be85678e979996286c5269cfa35a7f10edac Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Sun, 8 Jun 2025 18:29:26 -0300 Subject: [PATCH 05/10] core/sync/chan.try_select_raw: skip nil input messages This makes the proc easier and safer to call by letting the caller nil out messages to skip sends. --- core/sync/chan/chan.odin | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index f8f3ac46e..2c34f7bb3 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1205,7 +1205,10 @@ try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs } for c, i in sends { - if can_send(c) { + if i > builtin.len(send_msgs)-1 || send_msgs[i] == nil { + continue + } + if can_send(c) { candidates[count] = { is_recv = false, idx = i, From faae81ba61eb835932dec4458aec9e2c0b04772d Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Sun, 8 Jun 2025 18:31:26 -0300 Subject: [PATCH 06/10] core/sync/chan.try_select_raw: test hook for testing the toctou This is necessary because we need to allow the test guarantee against a rare condition: where a third-party thread steals a value between the validity checks can_{send,recv} and the channel operation try_{send,recv}. --- core/sync/chan/chan.odin | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 2c34f7bb3..17c251158 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -7,6 +7,14 @@ import "core:mem" import "core:sync" import "core:math/rand" +when ODIN_TEST { +/* +Hook for testing _try_select_raw allowing the test harness to manipulate the +channels prior to the select actually operating on them. +*/ +__try_select_raw_pause : proc() = nil +} + /* Determines what operations `Chan` supports. */ @@ -1221,6 +1229,12 @@ try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs return -1, .None } + when ODIN_TEST { + if __try_select_raw_pause != nil { + __try_select_raw_pause() + } + } + candidate_idx := rand.int_max(count) if count > 0 else 0 sel := candidates[candidate_idx] From 4d7c182f7dbb167a49896c74510537033967352a Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Sun, 8 Jun 2025 18:33:13 -0300 Subject: [PATCH 07/10] tests/core/sync/chan: test harness for chan.try_select_raw This test harness ensures consistent non-blocking semantics and validates that we have solved the toctou condition. The __global_context_for_test is a bit of a hack to fuse together the test supplied proc and the executing logic in packaage chan. --- tests/core/sync/chan/test_core_sync_chan.odin | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 9b8d9b354..6bb918516 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -35,6 +35,8 @@ MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second SLEEP_TIME :: 1 * time.Millisecond +__global_context_for_test: rawptr + comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data manual_buffering := data.manual_buffering @@ -272,3 +274,177 @@ test_accept_message_from_closed_buffered_chan :: proc(t: ^testing.T) { testing.expect_value(t, result, 64) testing.expect(t, ok) } + +// Ensures that if any input channel is eligible to receive or send, the try_select_raw +// operation will process it. +@test +test_try_select_raw_happy :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + recv1, recv1_err := chan.create(chan.Chan(int), context.allocator) + + assert(recv1_err == nil, "allocation failed") + defer chan.destroy(recv1) + + recv2, recv2_err := chan.create(chan.Chan(int), 1, context.allocator) + + assert(recv2_err == nil, "allocation failed") + defer chan.destroy(recv2) + + send1, send1_err := chan.create(chan.Chan(int), 1, context.allocator) + + assert(send1_err == nil, "allocation failed") + defer chan.destroy(send1) + + msg := 42 + + // Preload recv2 to make it eligible for selection. + testing.expect_value(t, chan.send(recv2, msg), true) + + recvs := [?]^chan.Raw_Chan{recv1, recv2} + sends := [?]^chan.Raw_Chan{send1} + msgs := [?]rawptr{&msg} + received_value: int + + iteration_count := 0 + did_none_count := 0 + did_send_count := 0 + did_receive_count := 0 + + // This loop is expected to iterate three times. Twice to do the receive and + // send operations, and a third time to exit. + receive_loop: for { + + iteration_count += 1 + + idx, status := chan.try_select_raw(recvs[:], sends[:], msgs[:], &received_value) + + switch status { + case .None: + did_none_count += 1 + break receive_loop + + case .Recv: + did_receive_count += 1 + testing.expect_value(t, idx, 1) + testing.expect_value(t, received_value, msg) + received_value = 0 + + case .Send: + did_send_count += 1 + testing.expect_value(t, idx, 0) + v, ok := chan.try_recv(send1) + testing.expect_value(t, ok, true) + testing.expect_value(t, v, msg) + msgs[0] = nil // nil out the message to avoid constantly resending the same value. + } + } + + testing.expect_value(t, iteration_count, 3) + testing.expect_value(t, did_none_count, 1) + testing.expect_value(t, did_receive_count, 1) + testing.expect_value(t, did_send_count, 1) +} + +// Ensures that if no input channels are eligible to receive or send, the +// try_select_raw operation does not block. +@test +test_try_select_raw_default_state :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + recv1, recv1_err := chan.create(chan.Chan(int), context.allocator) + + assert(recv1_err == nil, "allocation failed") + defer chan.destroy(recv1) + + recv2, recv2_err := chan.create(chan.Chan(int), context.allocator) + + assert(recv2_err == nil, "allocation failed") + defer chan.destroy(recv2) + + recvs := [?]^chan.Raw_Chan{recv1, recv2} + received_value: int + + idx, status := chan.try_select_raw(recvs[:], nil, nil, &received_value) + + testing.expect_value(t, idx, -1) + testing.expect_value(t, status, chan.Select_Status.None) +} + +// Ensures that the operation will not block even if the input channels are +// consumed by a competing thread; that is, a value is received from another +// thread between calls to can_{send,recv} and try_{send,recv}_raw. +@test +test_try_select_raw_no_toctou :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + // Trigger will be used to coordinate between the thief and the try_select. + trigger, trigger_err := chan.create(chan.Chan(any), context.allocator) + + assert(trigger_err == nil, "allocation failed") + defer chan.destroy(trigger) + + __global_context_for_test = &trigger + defer __global_context_for_test = nil + + // Setup the pause proc. This will be invoked after the input channels are + // checked for eligibility but before any channel operations are attempted. + chan.__try_select_raw_pause = proc() { + trigger := (cast(^chan.Chan(any))(__global_context_for_test))^ + + // Notify the thief that we are paused so that it can steal the value. + _ = chan.send(trigger, "signal") + + // Wait for comfirmation of the burglary. + _, _ = chan.recv(trigger) + } + + defer chan.__try_select_raw_pause = nil + + recv1, recv1_err := chan.create(chan.Chan(int), 1, context.allocator) + + assert(recv1_err == nil, "allocation failed") + defer chan.destroy(recv1) + + Context :: struct { + recv1: chan.Chan(int), + trigger: chan.Chan(any), + } + + ctx := Context{ + recv1 = recv1, + trigger = trigger, + } + + // Spin up a thread that will steal the value from the input channel after + // try_select has already considered it eligible for selection. + thief := thread.create_and_start_with_poly_data(ctx, proc(ctx: Context) { + // Wait for eligibility check. + _, _ = chan.recv(ctx.trigger) + + // Steal the value. + v, ok := chan.recv(ctx.recv1) + + assert(ok, "recv1: expected to receive a value") + assert(v == 42, "recv1: unexpected receive value") + + // Notify select that we have stolen the value and that it can proceed. + _ = chan.send(ctx.trigger, "signal") + }) + + recvs := [?]^chan.Raw_Chan{recv1} + received_value: int + + // Ensure channel is eligible prior to entering the select. + testing.expect_value(t, chan.send(recv1, 42), true) + + // Execute the try_select_raw, assert that we don't block, and that we receive + // .None status since the value was stolen by the other thread. + idx, status := chan.try_select_raw(recvs[:], nil, nil, &received_value) + + testing.expect_value(t, idx, -1) + testing.expect_value(t, status, chan.Select_Status.None) + + thread.join(thief) + thread.destroy(thief) +} From 96b91849a9f91d1b00b91bb3a627c955e6cf42ef Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Wed, 11 Jun 2025 12:31:15 -0300 Subject: [PATCH 08/10] core/sync/chan.try_select_raw: fix doc comment typo Signed-off-by: Jack Mordaunt --- core/sync/chan/chan.odin | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 17c251158..694d89aeb 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1126,7 +1126,7 @@ Select_Status :: enum { /* Attempts to either send or receive messages on the specified channels without blocking. -`select_raw` first identifies which channels have messages ready to be received +`try_select_raw` first identifies which channels have messages ready to be received and which are available for sending. It then randomly selects one operation (either a send or receive) to perform. From c1cd525d9df428897393c748acdafe077b7de19d Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Wed, 11 Jun 2025 12:32:13 -0300 Subject: [PATCH 09/10] core/sync/chan.select_raw: call try_select_raw with deprecation warning Eventually select_raw should be a blocking select operation, but for now we need to migrate people away. --- core/sync/chan/chan.odin | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 694d89aeb..c5a4cf317 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -1254,6 +1254,11 @@ try_select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs } } +@(require_results, deprecated = "use try_select_raw") +select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, status: Select_Status) #no_bounds_check { + return try_select_raw(recvs, sends, send_msgs, recv_out) +} + /* `Raw_Queue` is a non-thread-safe queue implementation designed to store messages of fixed size and alignment. From 3c3fd6e580b017b2243303221709856b9c663a5c Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 12 Jun 2025 16:13:22 -0300 Subject: [PATCH 10/10] tests/core/sync/chan: move global state into test While this state is not actually needed by more than one test, we can just make it a static variable. --- tests/core/sync/chan/test_core_sync_chan.odin | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 6bb918516..e8bb553b1 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -35,8 +35,6 @@ MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second SLEEP_TIME :: 1 * time.Millisecond -__global_context_for_test: rawptr - comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data manual_buffering := data.manual_buffering @@ -384,6 +382,9 @@ test_try_select_raw_no_toctou :: proc(t: ^testing.T) { assert(trigger_err == nil, "allocation failed") defer chan.destroy(trigger) + @(static) + __global_context_for_test: rawptr + __global_context_for_test = &trigger defer __global_context_for_test = nil