This commit is contained in:
Ștefan Talpalaru
2019-05-20 09:29:13 +02:00
committed by Andreas Rumpf
parent a63c2a25d0
commit 13b3e4af8a
2 changed files with 22 additions and 3 deletions

View File

@@ -133,6 +133,8 @@ type
q: ToFreeQueue
readyForTask: Semaphore
const threadpoolWaitMs {.intdefine.}: int = 100
proc blockUntil*(fv: FlowVarBase) =
## Waits until the value for the ``fv`` arrives.
##
@@ -201,6 +203,8 @@ proc finished(fv: FlowVarBase) =
inc q.len
release(q.lock)
fv.data = nil
# the worker thread waits for "data" to be set to nil before shutting down
owner.data = nil
proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
@@ -241,21 +245,24 @@ proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
result = cast[ptr T](fv.data)
finished(fv)
proc `^`*[T](fv: FlowVar[ref T]): ref T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
let src = cast[ref T](fv.data)
deepCopy result, src
finished(fv)
proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
when T is string or T is seq:
# XXX closures? deepCopy?
result = cast[T](fv.data)
let src = cast[T](fv.data)
deepCopy result, src
else:
result = fv.blob
finished(fv)
proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
@@ -334,6 +341,16 @@ proc slave(w: ptr Worker) {.thread.} =
if w.shutdown:
w.shutdown = false
atomicDec currentPoolSize
while true:
if w.data != nil:
sleep(threadpoolWaitMs)
else:
# The flowvar finalizer ("finished()") set w.data to nil, so we can
# safely terminate the thread.
#
# TODO: look for scenarios in which the flowvar is never finalized, so
# a shut down thread gets stuck in this loop until the main thread exits.
break
break
when declared(atomicStoreN):
atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
@@ -576,7 +593,7 @@ proc sync*() =
if not allReady: break
allReady = allReady and workersData[i].ready
if allReady: break
sleep(100)
sleep(threadpoolWaitMs)
# We cannot "blockUntil(gSomeReady)" because workers may be shut down between
# the time we establish that some are not "ready" and the time we wait for a
# "signal(gSomeReady)" from inside "slave()" that can never come.

View File

@@ -52,6 +52,8 @@ proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : s
result = concat(^ul[0], ^ul[1])
var s = map(toSeq(0..99999), proc(x: int): Point = (float(x div 1000), float(x mod 1000)))
# On some runs, this pool size reduction will set the "shutdown" attribute on the
# worker thread that executes our spawned task, before we can read the flowvars.
setMaxPoolSize 2
#echo convex_hull[Point](s, cmpPoint)