From 3d543b1539f8d5dde5746c90c5de5fa3df1cadfd Mon Sep 17 00:00:00 2001 From: Yuriy Glukhov Date: Mon, 31 Jul 2017 21:06:55 +0300 Subject: [PATCH] Channels can now block depending on maxItems (#6153) --- lib/system/channels.nim | 43 ++++++++++++++++++++-------- tests/parallel/tblocking_channel.nim | 37 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 12 deletions(-) create mode 100644 tests/parallel/tblocking_channel.nim diff --git a/lib/system/channels.nim b/lib/system/channels.nim index e3baff797e..1b90e245f1 100644 --- a/lib/system/channels.nim +++ b/lib/system/channels.nim @@ -22,7 +22,7 @@ when not declared(NimString): type pbytes = ptr array[0.. 0xffff, byte] RawChannel {.pure, final.} = object ## msg queue for a thread - rd, wr, count, mask: int + rd, wr, count, mask, maxItems: int data: pbytes lock: SysLock cond: SysCond @@ -37,11 +37,12 @@ type const ChannelDeadMask = -2 -proc initRawChannel(p: pointer) = +proc initRawChannel(p: pointer, maxItems: int) = var c = cast[PRawChannel](p) initSysLock(c.lock) initSysCond(c.cond) c.mask = -1 + c.maxItems = maxItems proc deinitRawChannel(p: pointer) = var c = cast[PRawChannel](p) @@ -208,23 +209,36 @@ template lockChannel(q, action): untyped = action releaseSys(q.lock) -template sendImpl(q) = +proc sendImpl(q: PRawChannel, typ: PNimType, msg: pointer, noBlock: bool): bool = if q.mask == ChannelDeadMask: sysFatal(DeadThreadError, "cannot send message; thread died") acquireSys(q.lock) - var typ = cast[PNimType](getTypeInfo(msg)) - rawSend(q, unsafeAddr(msg), typ) + if q.maxItems > 0: + # Wait until count is less than maxItems + if noBlock and q.count >= q.maxItems: + releaseSys(q.lock) + return + + while q.count >= q.maxItems: + waitSysCond(q.cond, q.lock) + + rawSend(q, msg, typ) q.elemType = typ releaseSys(q.lock) signalSysCond(q.cond) + result = true -proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) = +proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) {.inline.} = ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PRawChannel](addr(c)) - sendImpl(q) + discard sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), false) + +proc trySend*[TMsg](c: var Channel[TMsg], msg: TMsg): bool {.inline.} = + ## Tries to send a message to a thread. `msg` is deeply copied. Doesn't block. + ## Returns `false` if the message was not sent because number of pending items + ## in the cannel exceeded `maxItems`. + sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), true) proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = - # to save space, the generic is as small as possible q.ready = true while q.count <= 0: waitSysCond(q.cond, q.lock) @@ -233,6 +247,9 @@ proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = releaseSys(q.lock) sysFatal(ValueError, "cannot receive message of wrong type") rawRecv(q, res, typ) + if q.maxItems > 0 and q.count == q.maxItems - 1: + # Parent thread is awaiting in send. Wake it up. + signalSysCond(q.cond) proc recv*[TMsg](c: var Channel[TMsg]): TMsg = ## receives a message from the channel `c`. This blocks until @@ -267,9 +284,11 @@ proc peek*[TMsg](c: var Channel[TMsg]): int = else: result = -1 -proc open*[TMsg](c: var Channel[TMsg]) = - ## opens a channel `c` for inter thread communication. - initRawChannel(addr(c)) +proc open*[TMsg](c: var Channel[TMsg], maxItems: int = 0) = + ## opens a channel `c` for inter thread communication. The `send` operation + ## will block until number of unprocessed items is less than `maxItems`. + ## For unlimited queue set `maxItems` to 0. + initRawChannel(addr(c), maxItems) proc close*[TMsg](c: var Channel[TMsg]) = ## closes a channel `c` and frees its associated resources. diff --git a/tests/parallel/tblocking_channel.nim b/tests/parallel/tblocking_channel.nim new file mode 100644 index 0000000000..8b8b494542 --- /dev/null +++ b/tests/parallel/tblocking_channel.nim @@ -0,0 +1,37 @@ +discard """ +output: "" +""" +import threadpool, os + +var chan: Channel[int] + +chan.open(2) +chan.send(1) +chan.send(2) +doAssert(not chan.trySend(3)) # At this point chan is at max capacity + +proc receiver() = + doAssert(chan.recv() == 1) + doAssert(chan.recv() == 2) + doAssert(chan.recv() == 3) + doAssert(chan.recv() == 4) + doAssert(chan.recv() == 5) + +var msgSent = false + +proc emitter() = + chan.send(3) + msgSent = true + +spawn emitter() +# At this point emitter should be stuck in `send` +sleep(100) # Sleep a bit to ensure that it is still stuck +doAssert(not msgSent) + +spawn receiver() +sleep(100) # Sleep a bit to let receicer consume the messages +doAssert(msgSent) # Sender should be unblocked + +doAssert(chan.trySend(4)) +chan.send(5) +sync()