From c29168f76f05e98e7532c65eda253e14992f8ddf Mon Sep 17 00:00:00 2001 From: Jack Mordaunt Date: Thu, 12 Jun 2025 12:39:57 -0300 Subject: [PATCH] core/sync/chan.try_send: avoid blocking if no reader is available This changes the semantics of try_send to be consistently non-blocking. That is, if the buffered is full OR there are no readers it returns false. The previous behaviour was such that it would block in the latter case of no reader, and it would wait for a reader. That is problematic because it produces inconsistent behaviour between buffered and unbuffered channels which is astonishing and adds complexity to the caller. To illustrate the problem with the old behaviour, consider the try_select operation: if a send-channel happens to be unbuffered the try_select (which wants to never block) can now block, that unbuffered send channel is selected (at random) and there is no reader on the other side. Thus we have unpredictable blocking behaviour, which breaks the guarantee that try_select never blocks. If you want a blocking send you can just call "send" (the blocking variant). In addition, there is some reader/writer math done inside can_{send,recv} such that they only report true if there is sufficient reader/writer capacity. If there is contention we need to ensure that each reader is paired to exactly one writer. Consider try_send: if there is a single reader we can send. If there is a single reader and a single writer, then we cannot send, as that reader will be paired with the existing writer. Therefore can_send is only true if there are more readers than writers at the time of check. NOTE: The original tests don't need to use wait-looping with thread.yield() or heuristic sleep. Instead we can just use blocking channel operations rather than non-blocking operations. --- core/sync/chan/chan.odin | 15 ++--- tests/core/sync/chan/test_core_sync_chan.odin | 63 ++++++------------- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index c5a4cf317..1d91556b5 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -779,7 +779,7 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - if c.closed { + if c.closed || c.r_waiting - c.w_waiting <= 0 { return false } @@ -843,7 +843,7 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool { } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - if c.closed || c.w_waiting == 0 { + if c.closed || c.w_waiting - c.r_waiting <= 0 { return false } @@ -1046,8 +1046,9 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { } /* -Returns whether a message is ready to be read, i.e., -if a call to `recv` or `recv_raw` would block +Returns whether a message can be read without blocking the current +thread. Specifically, it checks if the channel is buffered and not full, +or if there is already a writer attempting to send a message. **Inputs** - `c`: The channel @@ -1075,7 +1076,7 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len > 0 } - return c.w_waiting > 0 + return c.w_waiting - c.r_waiting > 0 } @@ -1088,7 +1089,7 @@ or if there is already a reader waiting for a message. - `c`: The channel **Returns** -- `true` if a message can be send, `false` otherwise +- `true` if a message can be sent, `false` otherwise Example: @@ -1110,7 +1111,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len < c.queue.cap } - return c.w_waiting == 0 + return c.r_waiting - c.w_waiting > 0 } /* diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 608d0c3d2..52b1f7d31 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -33,7 +33,6 @@ Comm :: struct { BUFFER_SIZE :: 8 MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second -SLEEP_TIME :: 1 * time.Millisecond // Synchronizes try_select tests that require access to global state. test_lock: sync.Mutex @@ -41,14 +40,9 @@ __global_context_for_test: rawptr comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data - manual_buffering := data.manual_buffering n: i64 - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - recv_loop: for msg in chan.recv(data.host) { #partial switch msg.type { case .Add: n += msg.i @@ -60,14 +54,6 @@ comm_client :: proc(th: ^thread.Thread) { case: panic("Unknown message type for client.") } - - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - } - - for manual_buffering && !chan.can_send(data.host) { - thread.yield() } chan.send(data.client, Message{.Result, n}) @@ -76,9 +62,6 @@ comm_client :: proc(th: ^thread.Thread) { send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) { expected = 1 - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.Add, 1}) log.debug(Message{.Add, 1}) @@ -100,9 +83,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: expected /= msg.i } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } if manual_buffering { testing.expect(t, chan.len(host) == 0) } @@ -111,9 +91,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: log.debug(msg) } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.End, 0}) log.debug(Message{.End, 0}) chan.close(host) @@ -152,18 +129,15 @@ test_chan_buffered :: proc(t: ^testing.T) { expected := send_messages(t, comm.host, manual_buffering = false) - // Sleep so we can give the other thread enough time to buffer its message. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.len(comm.client), 1) - result, ok := chan.try_recv(comm.client) - - // One more sleep to ensure it has enough time to close. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.is_closed(comm.client), true) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) + + // Wait for channel to close. + _, ok = chan.recv(comm.client) + testing.expect(t, !ok, "channel should have been closed") + + testing.expect_value(t, chan.is_closed(comm.client), true) log.debug(result, expected) // Make sure sending to closed channels fails. @@ -175,6 +149,8 @@ test_chan_buffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test @@ -197,6 +173,10 @@ test_chan_unbuffered :: proc(t: ^testing.T) { testing.expect(t, !chan.is_buffered(comm.client)) testing.expect(t, chan.is_unbuffered(comm.host)) testing.expect(t, chan.is_unbuffered(comm.client)) + testing.expect(t, !chan.can_send(comm.host)) + testing.expect(t, !chan.can_send(comm.client)) + testing.expect(t, !chan.can_recv(comm.host)) + testing.expect(t, !chan.can_recv(comm.client)) testing.expect_value(t, chan.len(comm.host), 0) testing.expect_value(t, chan.len(comm.client), 0) testing.expect_value(t, chan.cap(comm.host), 0) @@ -207,25 +187,16 @@ test_chan_unbuffered :: proc(t: ^testing.T) { reckoner.data = &comm thread.start(reckoner) - for !chan.can_send(comm.client) { - thread.yield() - } - expected := send_messages(t, comm.host) testing.expect_value(t, chan.is_closed(comm.host), true) - for !chan.can_recv(comm.client) { - thread.yield() - } - - result, ok := chan.try_recv(comm.client) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) log.debug(result, expected) - // Sleep so we can give the other thread enough time to close its side - // after we've received its message. - time.sleep(SLEEP_TIME) + _, ok2 := chan.recv(comm.client) + testing.expect(t, !ok2, "read of closed channel should return false") testing.expect_value(t, chan.is_closed(comm.client), true) @@ -238,6 +209,8 @@ test_chan_unbuffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test