diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index c5a4cf317..1d91556b5 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -779,7 +779,7 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - if c.closed { + if c.closed || c.r_waiting - c.w_waiting <= 0 { return false } @@ -843,7 +843,7 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool { } else if c.unbuffered_data != nil { // unbuffered sync.guard(&c.mutex) - if c.closed || c.w_waiting == 0 { + if c.closed || c.w_waiting - c.r_waiting <= 0 { return false } @@ -1046,8 +1046,9 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { } /* -Returns whether a message is ready to be read, i.e., -if a call to `recv` or `recv_raw` would block +Returns whether a message can be read without blocking the current +thread. Specifically, it checks if the channel is buffered and not full, +or if there is already a writer attempting to send a message. **Inputs** - `c`: The channel @@ -1075,7 +1076,7 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len > 0 } - return c.w_waiting > 0 + return c.w_waiting - c.r_waiting > 0 } @@ -1088,7 +1089,7 @@ or if there is already a reader waiting for a message. - `c`: The channel **Returns** -- `true` if a message can be send, `false` otherwise +- `true` if a message can be sent, `false` otherwise Example: @@ -1110,7 +1111,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len < c.queue.cap } - return c.w_waiting == 0 + return c.r_waiting - c.w_waiting > 0 } /* diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin index 608d0c3d2..52b1f7d31 100644 --- a/tests/core/sync/chan/test_core_sync_chan.odin +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -33,7 +33,6 @@ Comm :: struct { BUFFER_SIZE :: 8 MAX_RAND :: 32 FAIL_TIME :: 1 * time.Second -SLEEP_TIME :: 1 * time.Millisecond // Synchronizes try_select tests that require access to global state. test_lock: sync.Mutex @@ -41,14 +40,9 @@ __global_context_for_test: rawptr comm_client :: proc(th: ^thread.Thread) { data := cast(^Comm)th.data - manual_buffering := data.manual_buffering n: i64 - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - recv_loop: for msg in chan.recv(data.host) { #partial switch msg.type { case .Add: n += msg.i @@ -60,14 +54,6 @@ comm_client :: proc(th: ^thread.Thread) { case: panic("Unknown message type for client.") } - - for manual_buffering && !chan.can_recv(data.host) { - thread.yield() - } - } - - for manual_buffering && !chan.can_send(data.host) { - thread.yield() } chan.send(data.client, Message{.Result, n}) @@ -76,9 +62,6 @@ comm_client :: proc(th: ^thread.Thread) { send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) { expected = 1 - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.Add, 1}) log.debug(Message{.Add, 1}) @@ -100,9 +83,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: expected /= msg.i } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } if manual_buffering { testing.expect(t, chan.len(host) == 0) } @@ -111,9 +91,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: log.debug(msg) } - for manual_buffering && !chan.can_send(host) { - thread.yield() - } chan.send(host, Message{.End, 0}) log.debug(Message{.End, 0}) chan.close(host) @@ -152,18 +129,15 @@ test_chan_buffered :: proc(t: ^testing.T) { expected := send_messages(t, comm.host, manual_buffering = false) - // Sleep so we can give the other thread enough time to buffer its message. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.len(comm.client), 1) - result, ok := chan.try_recv(comm.client) - - // One more sleep to ensure it has enough time to close. - time.sleep(SLEEP_TIME) - - testing.expect_value(t, chan.is_closed(comm.client), true) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) + + // Wait for channel to close. + _, ok = chan.recv(comm.client) + testing.expect(t, !ok, "channel should have been closed") + + testing.expect_value(t, chan.is_closed(comm.client), true) log.debug(result, expected) // Make sure sending to closed channels fails. @@ -175,6 +149,8 @@ test_chan_buffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test @@ -197,6 +173,10 @@ test_chan_unbuffered :: proc(t: ^testing.T) { testing.expect(t, !chan.is_buffered(comm.client)) testing.expect(t, chan.is_unbuffered(comm.host)) testing.expect(t, chan.is_unbuffered(comm.client)) + testing.expect(t, !chan.can_send(comm.host)) + testing.expect(t, !chan.can_send(comm.client)) + testing.expect(t, !chan.can_recv(comm.host)) + testing.expect(t, !chan.can_recv(comm.client)) testing.expect_value(t, chan.len(comm.host), 0) testing.expect_value(t, chan.len(comm.client), 0) testing.expect_value(t, chan.cap(comm.host), 0) @@ -207,25 +187,16 @@ test_chan_unbuffered :: proc(t: ^testing.T) { reckoner.data = &comm thread.start(reckoner) - for !chan.can_send(comm.client) { - thread.yield() - } - expected := send_messages(t, comm.host) testing.expect_value(t, chan.is_closed(comm.host), true) - for !chan.can_recv(comm.client) { - thread.yield() - } - - result, ok := chan.try_recv(comm.client) + result, ok := chan.recv(comm.client) testing.expect_value(t, ok, true) testing.expect_value(t, result.i, expected) log.debug(result, expected) - // Sleep so we can give the other thread enough time to close its side - // after we've received its message. - time.sleep(SLEEP_TIME) + _, ok2 := chan.recv(comm.client) + testing.expect(t, !ok2, "read of closed channel should return false") testing.expect_value(t, chan.is_closed(comm.client), true) @@ -238,6 +209,8 @@ test_chan_unbuffered :: proc(t: ^testing.T) { _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) + + thread.join(reckoner) } @test