core/sync/chan.try_send: avoid blocking if no reader is available

This changes the semantics of try_send to be consistently non-blocking.

That is, if the buffered is full OR there are no readers it returns
false.

The previous behaviour was such that it would block in the latter case
of no reader, and it would wait for a reader. That is problematic
because it produces inconsistent behaviour between buffered and
unbuffered channels which is astonishing and adds complexity to the
caller.

To illustrate the problem with the old behaviour, consider the
try_select operation: if a send-channel happens to be unbuffered the
try_select (which wants to never block) can now block, that unbuffered
send channel is selected (at random) and there is no reader on the other
side. Thus we have unpredictable blocking behaviour, which breaks the
guarantee that try_select never blocks.

If you want a blocking send you can just call "send" (the blocking
variant).

In addition, there is some reader/writer math done inside
can_{send,recv} such that they only report true if there is sufficient
reader/writer capacity. If there is contention we need to ensure that
each reader is paired to exactly one writer.

Consider try_send: if there is a single reader we can send. If there is
a single reader and a single writer, then we cannot send, as that reader
will be paired with the existing writer. Therefore can_send is only true
if there are more readers than writers at the time of check.

NOTE: The original tests don't need to use wait-looping with thread.yield()
or heuristic sleep. Instead we can just use blocking channel operations
rather than non-blocking operations.
This commit is contained in:
Jack Mordaunt
2025-06-12 12:39:57 -03:00
parent 52d38f1788
commit c29168f76f
2 changed files with 26 additions and 52 deletions

View File

@@ -779,7 +779,7 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool)
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.mutex)
if c.closed {
if c.closed || c.r_waiting - c.w_waiting <= 0 {
return false
}
@@ -843,7 +843,7 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool {
} else if c.unbuffered_data != nil { // unbuffered
sync.guard(&c.mutex)
if c.closed || c.w_waiting == 0 {
if c.closed || c.w_waiting - c.r_waiting <= 0 {
return false
}
@@ -1046,8 +1046,9 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool {
}
/*
Returns whether a message is ready to be read, i.e.,
if a call to `recv` or `recv_raw` would block
Returns whether a message can be read without blocking the current
thread. Specifically, it checks if the channel is buffered and not full,
or if there is already a writer attempting to send a message.
**Inputs**
- `c`: The channel
@@ -1075,7 +1076,7 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool {
if is_buffered(c) {
return c.queue.len > 0
}
return c.w_waiting > 0
return c.w_waiting - c.r_waiting > 0
}
@@ -1088,7 +1089,7 @@ or if there is already a reader waiting for a message.
- `c`: The channel
**Returns**
- `true` if a message can be send, `false` otherwise
- `true` if a message can be sent, `false` otherwise
Example:
@@ -1110,7 +1111,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool {
if is_buffered(c) {
return c.queue.len < c.queue.cap
}
return c.w_waiting == 0
return c.r_waiting - c.w_waiting > 0
}
/*

View File

@@ -33,7 +33,6 @@ Comm :: struct {
BUFFER_SIZE :: 8
MAX_RAND :: 32
FAIL_TIME :: 1 * time.Second
SLEEP_TIME :: 1 * time.Millisecond
// Synchronizes try_select tests that require access to global state.
test_lock: sync.Mutex
@@ -41,14 +40,9 @@ __global_context_for_test: rawptr
comm_client :: proc(th: ^thread.Thread) {
data := cast(^Comm)th.data
manual_buffering := data.manual_buffering
n: i64
for manual_buffering && !chan.can_recv(data.host) {
thread.yield()
}
recv_loop: for msg in chan.recv(data.host) {
#partial switch msg.type {
case .Add: n += msg.i
@@ -60,14 +54,6 @@ comm_client :: proc(th: ^thread.Thread) {
case:
panic("Unknown message type for client.")
}
for manual_buffering && !chan.can_recv(data.host) {
thread.yield()
}
}
for manual_buffering && !chan.can_send(data.host) {
thread.yield()
}
chan.send(data.client, Message{.Result, n})
@@ -76,9 +62,6 @@ comm_client :: proc(th: ^thread.Thread) {
send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering: bool = false) -> (expected: i64) {
expected = 1
for manual_buffering && !chan.can_send(host) {
thread.yield()
}
chan.send(host, Message{.Add, 1})
log.debug(Message{.Add, 1})
@@ -100,9 +83,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering:
expected /= msg.i
}
for manual_buffering && !chan.can_send(host) {
thread.yield()
}
if manual_buffering {
testing.expect(t, chan.len(host) == 0)
}
@@ -111,9 +91,6 @@ send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), manual_buffering:
log.debug(msg)
}
for manual_buffering && !chan.can_send(host) {
thread.yield()
}
chan.send(host, Message{.End, 0})
log.debug(Message{.End, 0})
chan.close(host)
@@ -152,18 +129,15 @@ test_chan_buffered :: proc(t: ^testing.T) {
expected := send_messages(t, comm.host, manual_buffering = false)
// Sleep so we can give the other thread enough time to buffer its message.
time.sleep(SLEEP_TIME)
testing.expect_value(t, chan.len(comm.client), 1)
result, ok := chan.try_recv(comm.client)
// One more sleep to ensure it has enough time to close.
time.sleep(SLEEP_TIME)
testing.expect_value(t, chan.is_closed(comm.client), true)
result, ok := chan.recv(comm.client)
testing.expect_value(t, ok, true)
testing.expect_value(t, result.i, expected)
// Wait for channel to close.
_, ok = chan.recv(comm.client)
testing.expect(t, !ok, "channel should have been closed")
testing.expect_value(t, chan.is_closed(comm.client), true)
log.debug(result, expected)
// Make sure sending to closed channels fails.
@@ -175,6 +149,8 @@ test_chan_buffered :: proc(t: ^testing.T) {
_, ok = chan.recv(comm.client); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false)
thread.join(reckoner)
}
@test
@@ -197,6 +173,10 @@ test_chan_unbuffered :: proc(t: ^testing.T) {
testing.expect(t, !chan.is_buffered(comm.client))
testing.expect(t, chan.is_unbuffered(comm.host))
testing.expect(t, chan.is_unbuffered(comm.client))
testing.expect(t, !chan.can_send(comm.host))
testing.expect(t, !chan.can_send(comm.client))
testing.expect(t, !chan.can_recv(comm.host))
testing.expect(t, !chan.can_recv(comm.client))
testing.expect_value(t, chan.len(comm.host), 0)
testing.expect_value(t, chan.len(comm.client), 0)
testing.expect_value(t, chan.cap(comm.host), 0)
@@ -207,25 +187,16 @@ test_chan_unbuffered :: proc(t: ^testing.T) {
reckoner.data = &comm
thread.start(reckoner)
for !chan.can_send(comm.client) {
thread.yield()
}
expected := send_messages(t, comm.host)
testing.expect_value(t, chan.is_closed(comm.host), true)
for !chan.can_recv(comm.client) {
thread.yield()
}
result, ok := chan.try_recv(comm.client)
result, ok := chan.recv(comm.client)
testing.expect_value(t, ok, true)
testing.expect_value(t, result.i, expected)
log.debug(result, expected)
// Sleep so we can give the other thread enough time to close its side
// after we've received its message.
time.sleep(SLEEP_TIME)
_, ok2 := chan.recv(comm.client)
testing.expect(t, !ok2, "read of closed channel should return false")
testing.expect_value(t, chan.is_closed(comm.client), true)
@@ -238,6 +209,8 @@ test_chan_unbuffered :: proc(t: ^testing.T) {
_, ok = chan.recv(comm.client); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false)
_, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false)
thread.join(reckoner)
}
@test