renames threadpool.await to blockUntil; refs #7853

This commit is contained in:
Araq
2018-08-14 01:28:04 +02:00
parent 741f95e2d6
commit 83c89197f3
5 changed files with 36 additions and 32 deletions

View File

@@ -63,6 +63,10 @@
- ``lineInfoObj`` now returns absolute path instead of project path.
It's used by ``lineInfo``, ``check``, ``expect``, ``require``, etc.
- `threadpool`'s `await` and derivatives have been renamed to `blockUntil`
to avoid confusions with `await` from the `async` macro.
#### Breaking changes in the compiler
- The undocumented ``#? braces`` parsing mode was removed.

View File

@@ -7877,7 +7877,7 @@ that ``spawn`` takes is restricted:
``spawn`` executes the passed expression on the thread pool and returns
a `data flow variable`:idx: ``FlowVar[T]`` that can be read from. The reading
with the ``^`` operator is **blocking**. However, one can use ``awaitAny`` to
with the ``^`` operator is **blocking**. However, one can use ``blockUntilAny`` to
wait on multiple flow variables at the same time:
.. code-block:: nim
@@ -7888,10 +7888,10 @@ wait on multiple flow variables at the same time:
var responses = newSeq[FlowVarBase](3)
for i in 0..2:
responses[i] = spawn tellServer(Update, "key", "value")
var index = awaitAny(responses)
var index = blockUntilAny(responses)
assert index >= 0
responses.del(index)
discard awaitAny(responses)
discard blockUntilAny(responses)
Data flow variables ensure that no data races
are possible. Due to technical limitations not every type ``T`` is possible in

View File

@@ -25,7 +25,7 @@ Spawn statement
A standalone ``spawn`` statement is a simple construct. It executes
the passed expression on the thread pool and returns a `data flow variable`:idx:
``FlowVar[T]`` that can be read from. The reading with the ``^`` operator is
**blocking**. However, one can use ``awaitAny`` to wait on multiple flow
**blocking**. However, one can use ``blockUntilAny`` to wait on multiple flow
variables at the same time:
.. code-block:: nim
@@ -36,10 +36,10 @@ variables at the same time:
var responses = newSeq[FlowVarBase](3)
for i in 0..2:
responses[i] = spawn tellServer(Update, "key", "value")
var index = awaitAny(responses)
var index = blockUntilAny(responses)
assert index >= 0
responses.del(index)
discard awaitAny(responses)
discard blockUntilAny(responses)
Data flow variables ensure that no data races
are possible. Due to technical limitations not every type ``T`` is possible in

View File

@@ -30,7 +30,7 @@ proc destroySemaphore(cv: var Semaphore) {.inline.} =
deinitCond(cv.c)
deinitLock(cv.L)
proc await(cv: var Semaphore) =
proc blockUntil(cv: var Semaphore) =
acquire(cv.L)
while cv.counter <= 0:
wait(cv.c, cv.L)
@@ -81,7 +81,7 @@ proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
fence()
b.interest = true
fence()
while b.left != b.entered: await(b.cv)
while b.left != b.entered: blockUntil(b.cv)
destroySemaphore(b.cv)
{.pop.}
@@ -97,7 +97,7 @@ type
FlowVarBaseObj = object of RootObj
ready, usesSemaphore, awaited: bool
cv: Semaphore #\
# for 'awaitAny' support
# for 'blockUntilAny' support
ai: ptr AwaitInfo
idx: int
data: pointer # we incRef and unref it to keep it alive; note this MUST NOT
@@ -128,12 +128,12 @@ type
q: ToFreeQueue
readyForTask: Semaphore
proc await*(fv: FlowVarBase) =
proc blockUntil*(fv: FlowVarBase) =
## waits until the value for the flowVar arrives. Usually it is not necessary
## to call this explicitly.
if fv.usesSemaphore and not fv.awaited:
fv.awaited = true
await(fv.cv)
blockUntil(fv.cv)
destroySemaphore(fv.cv)
proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
@@ -141,7 +141,7 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
w.data = data
w.f = fn
signal(w.taskArrived)
await(w.taskStarted)
blockUntil(w.taskStarted)
result = true
proc cleanFlowVars(w: ptr Worker) =
@@ -176,11 +176,11 @@ proc attach(fv: FlowVarBase; i: int): bool =
release(fv.cv.L)
proc finished(fv: FlowVarBase) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
# we have to protect against the rare cases where the owner of the flowVar
# simply disregards the flowVar and yet the "flowVar" has not yet written
# anything to it:
await(fv)
blockUntil(fv)
if fv.data.isNil: return
let owner = cast[ptr Worker](fv.owner)
let q = addr(owner.q)
@@ -189,7 +189,7 @@ proc finished(fv: FlowVarBase) =
#echo "EXHAUSTED!"
release(q.lock)
wakeupWorkerToProcessQueue(owner)
await(q.empty)
blockUntil(q.empty)
acquire(q.lock)
q.data[q.len] = cast[pointer](fv.data)
inc q.len
@@ -220,7 +220,7 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
## to ``action``. Note that due to Nim's parameter passing semantics this
## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
## sometimes be more efficient than ``^``.
await(fv)
blockUntil(fv)
when T is string or T is seq:
action(cast[T](fv.data))
elif T is ref:
@@ -231,29 +231,29 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
## blocks until the value is available and then returns this value.
await(fv)
blockUntil(fv)
result = cast[ptr T](fv.data)
proc `^`*[T](fv: FlowVar[ref T]): ref T =
## blocks until the value is available and then returns this value.
await(fv)
blockUntil(fv)
let src = cast[ref T](fv.data)
deepCopy result, src
proc `^`*[T](fv: FlowVar[T]): T =
## blocks until the value is available and then returns this value.
await(fv)
blockUntil(fv)
when T is string or T is seq:
# XXX closures? deepCopy?
result = cast[T](fv.data)
else:
result = fv.blob
proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## awaits any of the given flowVars. Returns the index of one flowVar for
## which a value arrived. A flowVar only supports one call to 'awaitAny' at
## the same time. That means if you awaitAny([a,b]) and awaitAny([b,c]) the second
## call will only await 'c'. If there is no flowVar left to be able to wait
## which a value arrived. A flowVar only supports one call to 'blockUntilAny' at
## the same time. That means if you blockUntilAny([a,b]) and blockUntilAny([b,c]) the second
## call will only blockUntil 'c'. If there is no flowVar left to be able to wait
## on, -1 is returned.
## **Note**: This results in non-deterministic behaviour and should be avoided.
var ai: AwaitInfo
@@ -269,7 +269,7 @@ proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
inc conflicts
if conflicts < flowVars.len:
if result < 0:
await(ai.cv)
blockUntil(ai.cv)
result = ai.idx
for i in 0 .. flowVars.high:
discard cas(addr flowVars[i].ai, addr ai, nil)
@@ -326,7 +326,7 @@ proc slave(w: ptr Worker) {.thread.} =
w.ready = true
readyWorker = w
signal(gSomeReady)
await(w.taskArrived)
blockUntil(w.taskArrived)
# XXX Somebody needs to look into this (why does this assertion fail
# in Visual Studio?)
when not defined(vcc) and not defined(tcc): assert(not w.ready)
@@ -351,7 +351,7 @@ proc distinguishedSlave(w: ptr Worker) {.thread.} =
else:
w.ready = true
signal(w.readyForTask)
await(w.taskArrived)
blockUntil(w.taskArrived)
assert(not w.ready)
w.f(w, w.data)
if w.q.len != 0: w.cleanFlowVars
@@ -499,7 +499,7 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
# on the current thread instead.
var self = addr(workersData[localThreadId-1])
fn(self, data)
await(self.taskStarted)
blockUntil(self.taskStarted)
return
if isSlave:
@@ -524,7 +524,7 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
inc numSlavesWaiting
await(gSomeReady)
blockUntil(gSomeReady)
if isSlave:
withLock numSlavesLock:
@@ -542,7 +542,7 @@ proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
release(distinguishedLock)
while true:
if selectWorker(addr(distinguishedData[id]), fn, data): break
await(distinguishedData[id].readyForTask)
blockUntil(distinguishedData[id].readyForTask)
proc sync*() =
@@ -555,7 +555,7 @@ proc sync*() =
if not allReady: break
allReady = allReady and workersData[i].ready
if allReady: break
await(gSomeReady)
blockUntil(gSomeReady)
inc toRelease
for i in 0 ..< toRelease:

View File

@@ -18,12 +18,12 @@ var results: seq[int] = @[]
for i in 0 .. durations.high:
tasks.add spawn timer(durations[i])
var index = awaitAny(tasks)
var index = blockUntilAny(tasks)
while index != -1:
results.add ^cast[FlowVar[int]](tasks[index])
tasks.del(index)
#echo repr results
index = awaitAny(tasks)
index = blockUntilAny(tasks)
doAssert results.len == 5
doAssert 1000 in results