Merge branch 'new_spawn' of https://github.com/Araq/Nimrod into new_spawn

Araq
2014-06-06 21:11:11 +02:00
9 changed files with 305 additions and 212 deletions


@@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) =
of mSlurp..mQuoteAst:
localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s)
of mSpawn:
let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil)
let n = lowerings.wrapProcForSpawn(p.module.module, e, e.typ, nil, nil)
expr(p, n, d)
of mParallel:
let n = semparallel.liftParallel(p.module.module, e)


@@ -86,7 +86,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
# returns a[].b as a node
var deref = newNodeI(nkHiddenDeref, info)
deref.typ = a.typ.skipTypes(abstractInst).sons[0]
var t = deref.typ
var t = deref.typ.skipTypes(abstractInst)
var field: PSym
while true:
assert t.kind == tyObject
@@ -94,6 +94,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
if field != nil: break
t = t.sons[0]
if t == nil: break
t = t.skipTypes(abstractInst)
assert field != nil, b
addSon(deref, a)
result = newNodeI(nkDotExpr, info)
@@ -132,28 +133,33 @@ proc callCodegenProc*(name: string, arg1: PNode;
if arg3 != nil: result.add arg3
result.typ = sym.typ.sons[0]
proc callProc(a: PNode): PNode =
result = newNodeI(nkCall, a.info)
result.add a
result.typ = a.typ.sons[0]
# we have 4 cases to consider:
# - a void proc --> nothing to do
# - a proc returning GC'ed memory --> requires a promise
# - a proc returning GC'ed memory --> requires a flowVar
# - a proc returning non GC'ed memory --> pass as hidden 'var' parameter
# - not in a parallel environment --> requires a promise for memory safety
# - not in a parallel environment --> requires a flowVar for memory safety
type
TSpawnResult = enum
srVoid, srPromise, srByVar
TPromiseKind = enum
promInvalid # invalid type T for 'Promise[T]'
promGC # Promise of a GC'ed type
promBlob # Promise of a blob type
srVoid, srFlowVar, srByVar
TFlowVarKind = enum
fvInvalid # invalid type T for 'FlowVar[T]'
fvGC # FlowVar of a GC'ed type
fvBlob # FlowVar of a blob type
proc spawnResult(t: PType; inParallel: bool): TSpawnResult =
if t.isEmptyType: srVoid
elif inParallel and not containsGarbageCollectedRef(t): srByVar
else: srPromise
else: srFlowVar
proc promiseKind(t: PType): TPromiseKind =
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: promGC
elif containsGarbageCollectedRef(t): promInvalid
else: promBlob
proc flowVarKind(t: PType): TFlowVarKind =
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
elif containsGarbageCollectedRef(t): fvInvalid
else: fvBlob
proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym =
result = newSym(skTemp, getIdent(genPrefix), owner, varSection.info)
@@ -169,18 +175,18 @@ proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym =
discard """
We generate roughly this:
proc f_wrapper(args) =
proc f_wrapper(thread, args) =
barrierEnter(args.barrier) # for parallel statement
var a = args.a # thread transfer; deepCopy or shallowCopy or no copy
# depending on whether we're in a 'parallel' statement
var b = args.b
var fv = args.fv
args.prom = nimCreatePromise(thread, sizeof(T)) # optional
nimPromiseCreateCondVar(args.prom) # optional
fv.owner = thread # optional
nimArgsPassingDone() # signal parent that the work is done
#
args.prom.blob = f(a, b, ...)
nimPromiseSignal(args.prom)
args.fv.blob = f(a, b, ...)
nimFlowVarSignal(args.fv)
# - or -
f(a, b, ...)
@@ -192,23 +198,12 @@ stmtList:
scratchObj.b = b
nimSpawn(f_wrapper, addr scratchObj)
scratchObj.prom # optional
scratchObj.fv # optional
"""
proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode =
let size = newNodeIT(nkCall, prom.info, getSysType(tyInt))
size.add newSymNode(createMagic("sizeof", mSizeOf))
assert prom.typ.kind == tyGenericInst
size.add newNodeIT(nkType, prom.info, prom.typ.sons[1])
let castExpr = newNodeIT(nkCast, prom.info, prom.typ)
castExpr.add emptyNode
castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size)
result = castExpr
proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
varSection, call, barrier, prom: PNode;
varSection, call, barrier, fv: PNode;
spawnKind: TSpawnResult): PSym =
var body = newNodeI(nkStmtList, f.info)
var threadLocalBarrier: PSym
@@ -220,32 +215,32 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
body.add callCodeGenProc("barrierEnter", threadLocalBarrier.newSymNode)
var threadLocalProm: PSym
if spawnKind == srByVar:
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom)
elif prom != nil:
internalAssert prom.typ.kind == tyGenericInst
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ,
createNimCreatePromiseCall(prom, threadParam.newSymNode))
threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv)
elif fv != nil:
internalAssert fv.typ.kind == tyGenericInst
threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv)
body.add varSection
if prom != nil and spawnKind != srByVar:
body.add newFastAsgnStmt(prom, threadLocalProm.newSymNode)
if barrier == nil:
body.add callCodeGenProc("nimPromiseCreateCondVar", prom)
if fv != nil and spawnKind != srByVar:
# generate:
# fv.owner = threadParam
body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
"owner", fv.info), threadParam.newSymNode)
body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode)
if spawnKind == srByVar:
body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
elif prom != nil:
let fk = prom.typ.sons[1].promiseKind
if fk == promInvalid:
localError(f.info, "cannot create a promise of type: " &
typeToString(prom.typ.sons[1]))
elif fv != nil:
let fk = fv.typ.sons[1].flowVarKind
if fk == fvInvalid:
localError(f.info, "cannot create a flowVar of type: " &
typeToString(fv.typ.sons[1]))
body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
if fk == promGC: "data" else: "blob", prom.info), call)
if fk == fvGC: "data" else: "blob", fv.info), call)
if barrier == nil:
# by now 'prom' is shared and thus might have been overwritten! we need
# by now 'fv' is shared and thus might have been overwritten! we need
# to use the thread-local view instead:
body.add callCodeGenProc("nimPromiseSignal", threadLocalProm.newSymNode)
body.add callCodeGenProc("nimFlowVarSignal", threadLocalProm.newSymNode)
else:
body.add call
if barrier != nil:
@@ -404,22 +399,23 @@ proc setupArgsForParallelism(n: PNode; objType: PType; scratchObj: PSym;
indirectAccess(castExpr, field, n.info))
call.add(threadLocal.newSymNode)
proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType;
proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
barrier, dest: PNode = nil): PNode =
# if 'barrier' != nil, then it is in a 'parallel' section and we
# generate quite different code
let n = spawnExpr[1]
let spawnKind = spawnResult(retType, barrier!=nil)
case spawnKind
of srVoid:
internalAssert dest == nil
result = newNodeI(nkStmtList, n.info)
of srPromise:
of srFlowVar:
internalAssert dest == nil
result = newNodeIT(nkStmtListExpr, n.info, retType)
of srByVar:
if dest == nil: localError(n.info, "'spawn' must not be discarded")
result = newNodeI(nkStmtList, n.info)
if n.kind notin nkCallKinds:
localError(n.info, "'spawn' takes a call expression")
return
@@ -482,24 +478,29 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType;
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier)
barrierAsExpr = indirectAccess(castExpr, field, n.info)
var promField, promAsExpr: PNode = nil
if spawnKind == srPromise:
var field = newSym(skField, getIdent"prom", owner, n.info)
var fvField, fvAsExpr: PNode = nil
if spawnKind == srFlowVar:
var field = newSym(skField, getIdent"fv", owner, n.info)
field.typ = retType
objType.addField(field)
promField = newDotExpr(scratchObj, field)
promAsExpr = indirectAccess(castExpr, field, n.info)
fvField = newDotExpr(scratchObj, field)
fvAsExpr = indirectAccess(castExpr, field, n.info)
# create flowVar:
result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2]))
if barrier == nil:
result.add callCodeGenProc("nimFlowVarCreateCondVar", fvField)
elif spawnKind == srByVar:
var field = newSym(skField, getIdent"prom", owner, n.info)
var field = newSym(skField, getIdent"fv", owner, n.info)
field.typ = newType(tyPtr, objType.owner)
field.typ.rawAddSon(retType)
objType.addField(field)
promAsExpr = indirectAccess(castExpr, field, n.info)
fvAsExpr = indirectAccess(castExpr, field, n.info)
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))
let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, call,
barrierAsExpr, promAsExpr, spawnKind)
barrierAsExpr, fvAsExpr, spawnKind)
result.add callCodeGenProc("nimSpawn", wrapper.newSymNode,
genAddrOf(scratchObj.newSymNode))
if spawnKind == srPromise: result.add promField
if spawnKind == srFlowVar: result.add fvField


@@ -644,12 +644,13 @@ proc singlePragma(c: PContext, sym: PSym, n: PNode, i: int,
incl(sym.flags, sfNoReturn)
of wDynlib:
processDynLib(c, it, sym)
of wCompilerproc:
of wCompilerproc:
noVal(it) # compilerproc may not get a string!
makeExternExport(sym, "$1", it.info)
incl(sym.flags, sfCompilerProc)
incl(sym.flags, sfUsed) # suppress all those stupid warnings
registerCompilerProc(sym)
if sfFromGeneric notin sym.flags:
makeExternExport(sym, "$1", it.info)
incl(sym.flags, sfCompilerProc)
incl(sym.flags, sfUsed) # suppress all those stupid warnings
registerCompilerProc(sym)
of wProcVar:
noVal(it)
incl(sym.flags, sfProcvar)


@@ -1585,12 +1585,22 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode =
else:
result = semDirectOp(c, n, flags)
proc createPromise(c: PContext; t: PType; info: TLineInfo): PType =
proc createFlowVar(c: PContext; t: PType; info: TLineInfo): PType =
result = newType(tyGenericInvokation, c.module)
addSonSkipIntLit(result, magicsys.getCompilerProc("Promise").typ)
addSonSkipIntLit(result, magicsys.getCompilerProc("FlowVar").typ)
addSonSkipIntLit(result, t)
result = instGenericContainer(c, info, result, allowMetaTypes = false)
proc instantiateCreateFlowVarCall(c: PContext; t: PType;
info: TLineInfo): PSym =
let sym = magicsys.getCompilerProc("nimCreateFlowVar")
if sym == nil:
localError(info, errSystemNeeds, "nimCreateFlowVar")
var bindings: TIdTable
initIdTable(bindings)
bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t)
result = c.semGenerateInstance(c, sym, bindings, info)
proc setMs(n: PNode, s: PSym): PNode =
result = n
n.sons[0] = newSymNode(s)
@@ -1631,7 +1641,8 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
if c.inParallelStmt > 0:
result.typ = result[1].typ
else:
result.typ = createPromise(c, result[1].typ, n.info)
result.typ = createFlowVar(c, result[1].typ, n.info)
result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode
else: result = semDirectOp(c, n, flags)
proc semWhen(c: PContext, n: PNode, semCheck = true): PNode =


@@ -406,19 +406,19 @@ proc transformSpawn(owner: PSym; n, barrier: PNode): PNode =
if result.isNil:
result = newNodeI(nkStmtList, n.info)
result.add n
result.add wrapProcForSpawn(owner, m[1], b.typ, barrier, it[0])
result.add wrapProcForSpawn(owner, m, b.typ, barrier, it[0])
it.sons[it.len-1] = emptyNode
if result.isNil: result = n
of nkAsgn, nkFastAsgn:
let b = n[1]
if getMagic(b) == mSpawn:
let m = transformSlices(b)
return wrapProcForSpawn(owner, m[1], b.typ, barrier, n[0])
return wrapProcForSpawn(owner, m, b.typ, barrier, n[0])
result = transformSpawnSons(owner, n, barrier)
of nkCallKinds:
if getMagic(n) == mSpawn:
result = transformSlices(n)
return wrapProcForSpawn(owner, result[1], n.typ, barrier, nil)
return wrapProcForSpawn(owner, result, n.typ, barrier, nil)
result = transformSpawnSons(owner, n, barrier)
elif n.safeLen > 0:
result = transformSpawnSons(owner, n, barrier)

doc/spawn.txt (new file, 57 lines)

@@ -0,0 +1,57 @@
==========================================================
Parallel & Spawn
==========================================================
Nimrod has two flavors of parallelism:
1) `Structured`:idx: parallelism via the ``parallel`` statement.
2) `Unstructured`:idx: parallelism via the standalone ``spawn`` statement.
Somewhat confusingly, ``spawn`` is also used in the ``parallel`` statement
with slightly different semantics. ``spawn`` always takes a call expression of
the form ``f(a, ...)``. Let ``T`` be ``f``'s return type. If ``T`` is ``void``
then ``spawn``'s return type is also ``void``. Within a ``parallel`` section
``spawn``'s return type is ``T``, otherwise it is ``FlowVar[T]``.
The compiler can ensure the location in ``location = spawn f(...)`` is not
read prematurely within a ``parallel`` section and so there is no need for
the overhead of an indirection via ``FlowVar[T]`` to ensure correctness.
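A minimal sketch of this return-type rule, assuming the pool API is imported
from a ``threadpool`` module and the program is compiled with ``--threads:on``
(both assumptions; ``double`` and ``report`` are illustrative procs):
.. code-block:: nimrod
  import threadpool

  proc double(x: int): int = 2 * x
  proc report(x: int) = echo x

  let fv = spawn double(21)   # outside ``parallel``: fv is a FlowVar[int]
  echo(^fv)                   # ``^`` blocks until the worker has produced 42

  spawn report(7)             # ``report`` returns void, so spawn is a statement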
Parallel statement
==================
The parallel statement is the preferred mechanism to introduce parallelism
in a Nimrod program. A subset of the Nimrod language is valid within a
``parallel`` section. This subset is checked to be free of data races at
compile time. A sophisticated `disjoint checker`:idx: ensures that no data
races are possible even though shared memory is extensively supported!
The subset is in fact the full language with the following
restrictions / changes:
* ``spawn`` within a ``parallel`` section has special semantics.
* Every location of the form ``a[i]`` and ``a[i..j]`` and ``dest`` where
``dest`` is part of the pattern ``dest = spawn f(...)`` has to be
provably disjoint. This is called the *disjoint check*.
* Every other complex location ``loc`` that is used in a spawned
proc (``spawn f(loc)``) has to be immutable for the duration of
the ``parallel`` section. This is called the *immutability check*. Currently it
is not specified what exactly "complex location" means. We need to make that
an optimization!
* Every array access has to be provably within bounds.
* Slices are optimized so that no copy is performed. This optimization is not
yet performed for ordinary slices outside of a ``parallel`` section.
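Putting these rules together, a typical ``parallel`` section looks like the
following sketch (the module name ``threadpool`` is an assumption,
``--threads:on`` is required, and ``term``/``pi`` are illustrative procs):
.. code-block:: nimrod
  import threadpool, math, strutils

  proc term(k: float): float = 4 * math.pow(-1.0, k) / (2*k + 1)

  proc pi(n: int): float =
    var ch = newSeq[float](n+1)
    parallel:
      for k in 0 .. ch.high:
        # ``ch[k]`` is provably disjoint for distinct ``k`` and within bounds,
        # so the disjoint check accepts assigning spawn's result directly
        ch[k] = spawn term(float(k))
    for k in 0 .. ch.high:
      result += ch[k]

  echo formatFloat(pi(5000))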
Spawn statement
===============
A standalone ``spawn`` statement is a simple construct. It executes
the passed expression on the thread pool and returns a `data flow variable`:idx:
``FlowVar[T]`` that can be read from. The reading with the ``^`` operator is
**blocking**. However, one can use ``awaitAny`` to wait on multiple flow variables
at the same time.
Like the ``parallel`` statement, data flow variables ensure that no data
races are possible.
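As a short sketch (same assumptions as above: ``threadpool`` as the module
name, ``--threads:on``, and ``fib`` as an illustrative proc):
.. code-block:: nimrod
  import threadpool

  proc fib(n: int): int =
    if n < 2: result = n
    else: result = fib(n-1) + fib(n-2)

  let a = spawn fib(30)   # returns immediately with a FlowVar[int]
  let b = spawn fib(31)   # both calls now run on the thread pool
  echo(^a + ^b)           # ``^`` blocks until each value has arrived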


@@ -40,33 +40,44 @@ proc signal(cv: var CondVar) =
release(cv.L)
signal(cv.c)
const CacheLineSize = 32 # true for most archs
type
Barrier* {.compilerProc.} = object
Barrier {.compilerProc.} = object
entered: int
cv: CondVar
cacheAlign: array[0..20, byte] # ensure 'left' is not on the same
# cache line as 'entered'
cv: CondVar # condvar takes 3 words at least
when sizeof(int) < 8:
cacheAlign: array[CacheLineSize-4*sizeof(int), byte]
left: int
cacheAlign2: array[CacheLineSize-sizeof(int), byte]
interest: bool ## whether the master is interested in the "all done" event
proc barrierEnter*(b: ptr Barrier) {.compilerProc.} =
atomicInc b.entered
proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
# due to the signaling between threads, it is ensured we are the only
# one with access to 'entered' so we don't need 'atomicInc' here:
inc b.entered
# we also need no 'fence' instructions here since 'nimArgsPassingDone'
# will be called soon, which already performs a fence for us.
proc barrierLeave*(b: ptr Barrier) {.compilerProc.} =
proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
atomicInc b.left
# these can only be equal if 'closeBarrier' already signaled its interest
# in this event:
if b.left == b.entered: signal(b.cv)
when not defined(x86): fence()
if b.interest and b.left == b.entered: signal(b.cv)
proc openBarrier*(b: ptr Barrier) {.compilerProc.} =
proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
b.entered = 0
b.cv = createCondVar()
b.left = -1
b.left = 0
b.interest = false
proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
# signal interest in the "all done" event:
atomicInc b.left
while b.left != b.entered: await(b.cv)
destroyCondVar(b.cv)
proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
fence()
if b.left != b.entered:
b.cv = createCondVar()
fence()
b.interest = true
fence()
while b.left != b.entered: await(b.cv)
destroyCondVar(b.cv)
{.pop.}
@@ -79,25 +90,26 @@ type
cv: CondVar
idx: int
RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]'
RawPromiseObj {.inheritable.} = object # \
# we allocate this with the thread local allocator; this
# is possible since we already need to do the GC_unref
# on the owning thread
RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]'
RawFlowVarObj = object of TObject
ready, usesCondVar: bool
cv: CondVar #\
# for 'awaitAny' support
ai: ptr AwaitInfo
idx: int
data: PObject # we incRef and unref it to keep it alive
owner: ptr Worker
next: RawPromise
align: float64 # a float for proper alignment
data: pointer # we incRef and unref it to keep it alive
owner: pointer # ptr Worker
Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj
blob: T ## the underlying value, if available. Note that usually
## you should not access this field directly! However it can
## sometimes be more efficient than getting the value via ``^``.
FlowVarObj[T] = object of RawFlowVarObj
blob: T
FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable
ToFreeQueue = object
len: int
lock: TLock
empty: TCond
data: array[512, pointer]
WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
Worker = object
@@ -109,109 +121,117 @@ type
ready: bool # put it here for correct alignment!
initialized: bool # whether it has even been initialized
shutdown: bool # the pool requests to shut down this worker thread
promiseLock: TLock
head: RawPromise
q: ToFreeQueue
proc finished*(prom: RawPromise) =
## This MUST be called for every created promise to free its associated
## resources. Note that the default reading operation ``^`` is destructive
## and calls ``finished``.
doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'"
assert prom.next == nil
let w = prom.owner
acquire(w.promiseLock)
prom.next = w.head
w.head = prom
release(w.promiseLock)
proc await*(fv: RawFlowVar) =
## waits until the value for the flowVar arrives. Usually it is not necessary
## to call this explicitly.
if fv.usesCondVar:
fv.usesCondVar = false
await(fv.cv)
destroyCondVar(fv.cv)
proc cleanPromises(w: ptr Worker) =
var it = w.head
acquire(w.promiseLock)
while it != nil:
let nxt = it.next
if it.usesCondVar: destroyCondVar(it.cv)
if it.data != nil: GC_unref(it.data)
dealloc(it)
it = nxt
w.head = nil
release(w.promiseLock)
proc finished(fv: RawFlowVar) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
# we have to protect against the rare cases where the owner of the flowVar
# simply disregards the flowVar and yet the flowVar has not yet been
# written to:
await(fv)
if fv.data.isNil: return
let owner = cast[ptr Worker](fv.owner)
let q = addr(owner.q)
var waited = false
while true:
acquire(q.lock)
if q.len < q.data.len:
q.data[q.len] = fv.data
inc q.len
release(q.lock)
break
else:
# the queue is exhausted! We block until it has been cleaned:
release(q.lock)
wait(q.empty, q.lock)
waited = true
fv.data = nil
# wakeup other potentially waiting threads:
if waited: signal(q.empty)
proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {.
compilerProc.} =
result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize))
result.owner = cast[ptr Worker](owner)
proc cleanFlowVars(w: ptr Worker) =
let q = addr(w.q)
acquire(q.lock)
for i in 0 .. <q.len:
GC_unref(cast[PObject](q.data[i]))
q.len = 0
release(q.lock)
signal(q.empty)
proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} =
prom.cv = createCondVar()
prom.usesCondVar = true
proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} =
if prom.ai != nil:
acquire(prom.ai.cv.L)
prom.ai.idx = prom.idx
inc prom.ai.cv.counter
release(prom.ai.cv.L)
signal(prom.ai.cv.c)
if prom.usesCondVar: signal(prom.cv)
proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
new(result, fvFinalizer)
proc await*[T](prom: Promise[T]) =
## waits until the value for the promise arrives.
if prom.usesCondVar: await(prom.cv)
proc nimFlowVarCreateCondVar(fv: RawFlowVar) {.compilerProc.} =
fv.cv = createCondVar()
fv.usesCondVar = true
proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
## blocks until the ``prom`` is available and then passes its value
proc nimFlowVarSignal(fv: RawFlowVar) {.compilerProc.} =
if fv.ai != nil:
acquire(fv.ai.cv.L)
fv.ai.idx = fv.idx
inc fv.ai.cv.counter
release(fv.ai.cv.L)
signal(fv.ai.cv.c)
if fv.usesCondVar: signal(fv.cv)
proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
## blocks until the ``fv`` is available and then passes its value
## to ``action``. Note that due to Nimrod's parameter passing semantics this
## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
## sometimes be more efficient than ``^``.
if prom.usesCondVar: await(prom)
await(fv)
when T is string or T is seq:
action(cast[T](prom.data))
action(cast[T](fv.data))
elif T is ref:
{.error: "'awaitAndThen' not available for Promise[ref]".}
{.error: "'awaitAndThen' not available for FlowVar[ref]".}
else:
action(prom.blob)
finished(prom)
action(fv.blob)
finished(fv)
proc `^`*[T](prom: Promise[ref T]): foreign ptr T =
## blocks until the value is available and then returns this value. Note
## this reading is destructive for reasons of efficiency and convenience.
## This calls ``finished(prom)``.
if prom.usesCondVar: await(prom)
result = cast[foreign ptr T](prom.data)
finished(prom)
proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T =
## blocks until the value is available and then returns this value.
await(fv)
result = cast[foreign ptr T](fv.data)
proc `^`*[T](prom: Promise[T]): T =
## blocks until the value is available and then returns this value. Note
## this reading is destructive for reasons of efficiency and convenience.
## This calls ``finished(prom)``.
if prom.usesCondVar: await(prom)
proc `^`*[T](fv: FlowVar[T]): T =
## blocks until the value is available and then returns this value.
await(fv)
when T is string or T is seq:
result = cast[T](prom.data)
result = cast[T](fv.data)
else:
result = prom.blob
finished(prom)
result = fv.blob
proc awaitAny*(promises: openArray[RawPromise]): int =
# awaits any of the given promises. Returns the index of one promise for which
## a value arrived. A promise only supports one call to 'awaitAny' at the
## same time. That means if you await([a,b]) and await([b,c]) the second
## call will only await 'c'. If there is no promise left to be able to wait
proc awaitAny*(flowVars: openArray[RawFlowVar]): int =
## awaits any of the given flowVars. Returns the index of one flowVar for
## which a value arrived. A flowVar only supports one call to 'awaitAny' at
## the same time. That means if you await([a,b]) and await([b,c]) the second
## call will only await 'c'. If there is no flowVar left to be able to wait
## on, -1 is returned.
## **Note**: This results in non-deterministic behaviour and so should be
## avoided.
var ai: AwaitInfo
ai.cv = createCondVar()
var conflicts = 0
for i in 0 .. promises.high:
if cas(addr promises[i].ai, nil, addr ai):
promises[i].idx = i
for i in 0 .. flowVars.high:
if cas(addr flowVars[i].ai, nil, addr ai):
flowVars[i].idx = i
else:
inc conflicts
if conflicts < promises.len:
if conflicts < flowVars.len:
await(ai.cv)
result = ai.idx
for i in 0 .. promises.high:
discard cas(addr promises[i].ai, addr ai, nil)
for i in 0 .. flowVars.high:
discard cas(addr flowVars[i].ai, addr ai, nil)
else:
result = -1
destroyCondVar(ai.cv)
@@ -239,7 +259,7 @@ proc slave(w: ptr Worker) {.thread.} =
await(w.taskArrived)
assert(not w.ready)
w.f(w, w.data)
if w.head != nil: w.cleanPromises
if w.q.len != 0: w.cleanFlowVars
if w.shutdown:
w.shutdown = false
atomicDec currentPoolSize
@@ -260,8 +280,9 @@ var
proc activateThread(i: int) {.noinline.} =
workersData[i].taskArrived = createCondVar()
workersData[i].taskStarted = createCondVar()
initLock workersData[i].promiseLock
workersData[i].initialized = true
initCond(workersData[i].q.empty)
initLock(workersData[i].q.lock)
createThread(workers[i], slave, addr(workersData[i]))
proc setup() =
@@ -278,14 +299,16 @@ proc preferSpawn*(): bool =
proc spawn*(call: expr): expr {.magic: "Spawn".}
## always spawns a new task, so that the 'call' is never executed on
## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
## is gcsafe and has 'void' as the return type.
## is gcsafe and has a return type that is either 'void' or compatible
## with ``FlowVar[T]``.
template spawnX*(call: expr): expr =
## spawns a new task if a CPU core is ready, otherwise executes the
## call in the calling thread. Usually it is advised to
## use 'spawn' in order to not block the producer for an unknown
## amount of time. 'call' has to be proc call 'p(...)' where 'p'
## is gcsafe and has 'void' as the return type.
## is gcsafe and has a return type that is either 'void' or compatible
## with ``FlowVar[T]``.
(if preferSpawn(): spawn call else: call)
proc parallel*(body: stmt) {.magic: "Parallel".}
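A hypothetical usage sketch for ``spawnX``/``preferSpawn`` (the module name
``threadpool`` and the ``handle`` proc are assumptions):

  import threadpool

  proc handle(item: int) =
    echo "item ", item

  for i in 0 .. 9:
    # run on the pool only if a worker is ready; otherwise execute inline,
    # so the producer loop is never blocked waiting for a free core
    spawnX handle(i)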


@@ -10,7 +10,9 @@
## Atomic operations for Nimrod.
{.push stackTrace:off.}
when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
const someGcc = defined(gcc) or defined(llvm_gcc) or defined(clang)
when someGcc and hasThreadSupport:
type
AtomMemModel* = enum
ATOMIC_RELAXED, ## No barriers or synchronization.
@@ -153,41 +155,16 @@ when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
## A value of 0 indicates typical alignment should be used. The compiler may also
## ignore this parameter.
template fence*() = atomicThreadFence(ATOMIC_SEQ_CST)
elif defined(vcc) and hasThreadSupport:
proc addAndFetch*(p: ptr int, val: int): int {.
importc: "NimXadd", nodecl.}
else:
proc addAndFetch*(p: ptr int, val: int): int {.inline.} =
inc(p[], val)
result = p[]
# atomic compare and swap (CAS) functions to implement lock-free algorithms
#if defined(windows) and not defined(gcc) and hasThreadSupport:
# proc InterlockedCompareExchangePointer(mem: ptr pointer,
# newValue: pointer, comparand: pointer) : pointer {.nodecl,
# importc: "InterlockedCompareExchangePointer", header:"windows.h".}
# proc compareAndSwap*[T](mem: ptr T,
# expected: T, newValue: T): bool {.inline.}=
# ## Returns true if successfully set value at mem to newValue when value
# ## at mem == expected
# return InterlockedCompareExchangePointer(addr(mem),
# addr(newValue), addr(expected))[] == expected
#elif not hasThreadSupport:
# proc compareAndSwap*[T](mem: ptr T,
# expected: T, newValue: T): bool {.inline.} =
# ## Returns true if successfully set value at mem to newValue when value
# ## at mem == expected
# var oldval = mem[]
# if oldval == expected:
# mem[] = newValue
# return true
# return false
# Some convenient functions
proc atomicInc*(memLoc: var int, x: int = 1): int =
when defined(gcc) and hasThreadSupport:
result = atomic_add_fetch(memLoc.addr, x, ATOMIC_RELAXED)
@@ -205,7 +182,7 @@ proc atomicDec*(memLoc: var int, x: int = 1): int =
dec(memLoc, x)
result = memLoc
when defined(windows) and not defined(gcc):
when defined(windows) and not someGcc:
proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32
{.importc: "InterlockedCompareExchange", header: "<windows.h>", cdecl.}
@@ -219,7 +196,7 @@ else:
# XXX is this valid for 'int'?
when (defined(x86) or defined(amd64)) and defined(gcc):
when (defined(x86) or defined(amd64)) and (defined(gcc) or defined(llvm_gcc)):
proc cpuRelax {.inline.} =
{.emit: """asm volatile("pause" ::: "memory");""".}
elif (defined(x86) or defined(amd64)) and defined(vcc):
@@ -231,4 +208,10 @@ elif false:
proc cpuRelax {.inline.} = os.sleep(1)
when not defined(fence) and hasThreadSupport:
# XXX fixme
proc fence*() {.inline.} =
var dummy: bool
discard cas(addr dummy, false, true)
{.pop.}


@@ -1,6 +1,23 @@
version 0.9.6
=============
Concurrency
-----------
- document the new 'spawn' and 'parallel' statements
- implement 'deepCopy' builtin
- implement 'foo[1..4] = spawn(f[4..7])'
- the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()'
- support for exception propagation
- Minor: The copying of the 'ref Promise' into the thread local storage only
happens to work due to the write barrier's implementation
- 'gcsafe' inference needs to be fixed
- implement lock levels --> first without the more complex race avoidance
Misc
----
- fix the bug that keeps 'defer' template from working
- make '--implicitStatic:on' the default
- fix the tuple unpacking in lambda bug