fixes #7638; awaitAny blocks if the flow vars all have been complete already

This commit is contained in:
Andreas Rumpf
2018-04-19 08:54:16 +02:00
parent cb03ae2c9f
commit 0dc4d6dcc2
2 changed files with 52 additions and 7 deletions

View File

@@ -168,6 +168,15 @@ proc wakeupWorkerToProcessQueue(w: ptr Worker) =
signal(w.q.empty)
signal(w.taskArrived)
proc attach(fv: FlowVarBase; i: int): bool =
acquire(fv.cv.L)
if fv.cv.counter <= 0:
fv.idx = i
result = true
else:
result = false
release(fv.cv.L)
proc finished(fv: FlowVarBase) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
# we have to protect against the rare cases where the owner of the flowVar
@@ -248,23 +257,24 @@ proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
## the same time. That means if you awaitAny([a,b]) and awaitAny([b,c]) the second
## call will only await 'c'. If there is no flowVar left to be able to wait
## on, -1 is returned.
## **Note**: This results in non-deterministic behaviour and so should be
## avoided.
## **Note**: This results in non-deterministic behaviour and should be avoided.
var ai: AwaitInfo
ai.cv.initSemaphore()
var conflicts = 0
result = -1
for i in 0 .. flowVars.high:
if cas(addr flowVars[i].ai, nil, addr ai):
flowVars[i].idx = i
if not attach(flowVars[i], i):
result = i
break
else:
inc conflicts
if conflicts < flowVars.len:
await(ai.cv)
result = ai.idx
if result < 0:
await(ai.cv)
result = ai.idx
for i in 0 .. flowVars.high:
discard cas(addr flowVars[i].ai, addr ai, nil)
else:
result = -1
destroySemaphore(ai.cv)
proc isReady*(fv: FlowVarBase): bool =

View File

@@ -0,0 +1,35 @@
discard """
output: '''true'''
"""
# bug #7638
import threadpool, os, strformat
proc timer(d: int): int =
#echo fmt"sleeping {d}"
sleep(d)
#echo fmt"done {d}"
return d
var durations = [1000, 2000, 3000, 4000, 5000]
var tasks: seq[FlowVarBase] = @[]
var results: seq[int] = @[]
for i in 0 .. durations.high:
tasks.add spawn timer(durations[i])
var index = awaitAny(tasks)
while index != -1:
results.add ^cast[FlowVar[int]](tasks[index])
tasks.del(index)
#echo repr results
index = awaitAny(tasks)
doAssert results.len == 5
doAssert 1000 in results
doAssert 2000 in results
doAssert 3000 in results
doAssert 4000 in results
doAssert 5000 in results
sync()
echo "true"