mirror of
https://github.com/nim-lang/Nim.git
synced 2026-06-05 03:14:08 +00:00
Channels can now block depending on maxItems (#6153)
This commit is contained in:
committed by
Andreas Rumpf
parent
6b38b37b4f
commit
3d543b1539
@@ -22,7 +22,7 @@ when not declared(NimString):
|
||||
type
|
||||
pbytes = ptr array[0.. 0xffff, byte]
|
||||
RawChannel {.pure, final.} = object ## msg queue for a thread
|
||||
rd, wr, count, mask: int
|
||||
rd, wr, count, mask, maxItems: int
|
||||
data: pbytes
|
||||
lock: SysLock
|
||||
cond: SysCond
|
||||
@@ -37,11 +37,12 @@ type
|
||||
|
||||
const ChannelDeadMask = -2
|
||||
|
||||
proc initRawChannel(p: pointer) =
|
||||
proc initRawChannel(p: pointer, maxItems: int) =
|
||||
var c = cast[PRawChannel](p)
|
||||
initSysLock(c.lock)
|
||||
initSysCond(c.cond)
|
||||
c.mask = -1
|
||||
c.maxItems = maxItems
|
||||
|
||||
proc deinitRawChannel(p: pointer) =
|
||||
var c = cast[PRawChannel](p)
|
||||
@@ -208,23 +209,36 @@ template lockChannel(q, action): untyped =
|
||||
action
|
||||
releaseSys(q.lock)
|
||||
|
||||
template sendImpl(q) =
|
||||
proc sendImpl(q: PRawChannel, typ: PNimType, msg: pointer, noBlock: bool): bool =
|
||||
if q.mask == ChannelDeadMask:
|
||||
sysFatal(DeadThreadError, "cannot send message; thread died")
|
||||
acquireSys(q.lock)
|
||||
var typ = cast[PNimType](getTypeInfo(msg))
|
||||
rawSend(q, unsafeAddr(msg), typ)
|
||||
if q.maxItems > 0:
|
||||
# Wait until count is less than maxItems
|
||||
if noBlock and q.count >= q.maxItems:
|
||||
releaseSys(q.lock)
|
||||
return
|
||||
|
||||
while q.count >= q.maxItems:
|
||||
waitSysCond(q.cond, q.lock)
|
||||
|
||||
rawSend(q, msg, typ)
|
||||
q.elemType = typ
|
||||
releaseSys(q.lock)
|
||||
signalSysCond(q.cond)
|
||||
result = true
|
||||
|
||||
proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) =
|
||||
proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) {.inline.} =
|
||||
## sends a message to a thread. `msg` is deeply copied.
|
||||
var q = cast[PRawChannel](addr(c))
|
||||
sendImpl(q)
|
||||
discard sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), false)
|
||||
|
||||
proc trySend*[TMsg](c: var Channel[TMsg], msg: TMsg): bool {.inline.} =
|
||||
## Tries to send a message to a thread. `msg` is deeply copied. Doesn't block.
|
||||
## Returns `false` if the message was not sent because number of pending items
|
||||
## in the cannel exceeded `maxItems`.
|
||||
sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), true)
|
||||
|
||||
proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =
|
||||
# to save space, the generic is as small as possible
|
||||
q.ready = true
|
||||
while q.count <= 0:
|
||||
waitSysCond(q.cond, q.lock)
|
||||
@@ -233,6 +247,9 @@ proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =
|
||||
releaseSys(q.lock)
|
||||
sysFatal(ValueError, "cannot receive message of wrong type")
|
||||
rawRecv(q, res, typ)
|
||||
if q.maxItems > 0 and q.count == q.maxItems - 1:
|
||||
# Parent thread is awaiting in send. Wake it up.
|
||||
signalSysCond(q.cond)
|
||||
|
||||
proc recv*[TMsg](c: var Channel[TMsg]): TMsg =
|
||||
## receives a message from the channel `c`. This blocks until
|
||||
@@ -267,9 +284,11 @@ proc peek*[TMsg](c: var Channel[TMsg]): int =
|
||||
else:
|
||||
result = -1
|
||||
|
||||
proc open*[TMsg](c: var Channel[TMsg]) =
|
||||
## opens a channel `c` for inter thread communication.
|
||||
initRawChannel(addr(c))
|
||||
proc open*[TMsg](c: var Channel[TMsg], maxItems: int = 0) =
|
||||
## opens a channel `c` for inter thread communication. The `send` operation
|
||||
## will block until number of unprocessed items is less than `maxItems`.
|
||||
## For unlimited queue set `maxItems` to 0.
|
||||
initRawChannel(addr(c), maxItems)
|
||||
|
||||
proc close*[TMsg](c: var Channel[TMsg]) =
|
||||
## closes a channel `c` and frees its associated resources.
|
||||
|
||||
37
tests/parallel/tblocking_channel.nim
Normal file
37
tests/parallel/tblocking_channel.nim
Normal file
@@ -0,0 +1,37 @@
|
||||
discard """
|
||||
output: ""
|
||||
"""
|
||||
import threadpool, os
|
||||
|
||||
var chan: Channel[int]
|
||||
|
||||
chan.open(2)
|
||||
chan.send(1)
|
||||
chan.send(2)
|
||||
doAssert(not chan.trySend(3)) # At this point chan is at max capacity
|
||||
|
||||
proc receiver() =
|
||||
doAssert(chan.recv() == 1)
|
||||
doAssert(chan.recv() == 2)
|
||||
doAssert(chan.recv() == 3)
|
||||
doAssert(chan.recv() == 4)
|
||||
doAssert(chan.recv() == 5)
|
||||
|
||||
var msgSent = false
|
||||
|
||||
proc emitter() =
|
||||
chan.send(3)
|
||||
msgSent = true
|
||||
|
||||
spawn emitter()
|
||||
# At this point emitter should be stuck in `send`
|
||||
sleep(100) # Sleep a bit to ensure that it is still stuck
|
||||
doAssert(not msgSent)
|
||||
|
||||
spawn receiver()
|
||||
sleep(100) # Sleep a bit to let receicer consume the messages
|
||||
doAssert(msgSent) # Sender should be unblocked
|
||||
|
||||
doAssert(chan.trySend(4))
|
||||
chan.send(5)
|
||||
sync()
|
||||
Reference in New Issue
Block a user