From a4323b06b321b77ea36bf738efdfa481faf9822c Mon Sep 17 00:00:00 2001 From: Araq Date: Tue, 3 Jun 2014 08:25:42 +0200 Subject: [PATCH 1/5] barrier more efficient --- lib/pure/concurrency/threadpool.nim | 42 ++++++++++++++++------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 22f00bc0d8..92d5011f4f 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -40,33 +40,39 @@ proc signal(cv: var CondVar) = release(cv.L) signal(cv.c) +const CacheLineSize = 32 # true for most archs + type - Barrier* {.compilerProc.} = object + Barrier {.compilerProc.} = object entered: int - cv: CondVar - cacheAlign: array[0..20, byte] # ensure 'left' is not on the same - # cache line as 'entered' + cv: CondVar # condvar takes 3 words at least + when sizeof(int) < 8: + cacheAlign: array[CacheLineSize-4*sizeof(int), byte] left: int + cacheAlign2: array[CacheLineSize-sizeof(int), byte] + interest: bool ## wether the master is interested in the "all done" event -proc barrierEnter*(b: ptr Barrier) {.compilerProc.} = - atomicInc b.entered +proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} = + ## due to the signaling between threads, it is ensured we are the only + ## one with access to 'entered' so we don't need 'atomicInc' here: + inc b.entered -proc barrierLeave*(b: ptr Barrier) {.compilerProc.} = +proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} = atomicInc b.left - # these can only be equal if 'closeBarrier' already signaled its interest - # in this event: - if b.left == b.entered: signal(b.cv) + if b.interest and b.left == b.entered: signal(b.cv) -proc openBarrier*(b: ptr Barrier) {.compilerProc.} = +proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = b.entered = 0 - b.cv = createCondVar() - b.left = -1 + b.left = 0 + b.interest = false -proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = - # signal interest in the "all done" event: - atomicInc b.left - while b.left != b.entered: await(b.cv) - destroyCondVar(b.cv) +proc closeBarrier(b: ptr Barrier) {.compilerProc.} = + if b.left != b.entered: + b.cv = createCondVar() + b.interest = true # XXX we really need to ensure no re-orderings are done + # by the C compiler here + while b.left != b.entered: await(b.cv) + destroyCondVar(b.cv) {.pop.} From 2de99653d002b919c88322219bff6f33653081c5 Mon Sep 17 00:00:00 2001 From: Araq Date: Thu, 5 Jun 2014 08:46:29 +0200 Subject: [PATCH 2/5] Promises are now refs --- compiler/ccgexprs.nim | 2 +- compiler/lowerings.nim | 45 +++++----- compiler/pragmas.nim | 11 +-- compiler/semexprs.nim | 11 +++ compiler/semparallel.nim | 6 +- lib/pure/concurrency/threadpool.nim | 122 +++++++++++++++------------- lib/system/atomics.nim | 8 ++ todo.txt | 14 ++++ 8 files changed, 132 insertions(+), 87 deletions(-) diff --git a/compiler/ccgexprs.nim b/compiler/ccgexprs.nim index 34fdf5bf17..c0442711ed 100644 --- a/compiler/ccgexprs.nim +++ b/compiler/ccgexprs.nim @@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) = of mSlurp..mQuoteAst: localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s) of mSpawn: - let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil) + let n = lowerings.wrapProcForSpawn(p.module.module, e, e.typ, nil, nil) expr(p, n, d) of mParallel: let n = semparallel.liftParallel(p.module.module, e) diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index af4daf7857..327a18df5b 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -86,7 +86,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = # returns a[].b as a node var deref = newNodeI(nkHiddenDeref, info) deref.typ = a.typ.skipTypes(abstractInst).sons[0] - var t = deref.typ + var t = deref.typ.skipTypes(abstractInst) var field: PSym while true: assert t.kind == tyObject @@ -94,6 +94,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = if field != nil: break t = t.sons[0] if t == nil: break + t = t.skipTypes(abstractInst) assert field != nil, b addSon(deref, a) result = newNodeI(nkDotExpr, info) @@ -132,6 +133,11 @@ proc callCodegenProc*(name: string, arg1: PNode; if arg3 != nil: result.add arg3 result.typ = sym.typ.sons[0] +proc callProc(a: PNode): PNode = + result = newNodeI(nkCall, a.info) + result.add a + result.typ = a.typ.sons[0] + # we have 4 cases to consider: # - a void proc --> nothing to do # - a proc returning GC'ed memory --> requires a promise @@ -169,14 +175,14 @@ proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym = discard """ We generate roughly this: -proc f_wrapper(args) = +proc f_wrapper(thread, args) = barrierEnter(args.barrier) # for parallel statement var a = args.a # thread transfer; deepCopy or shallowCopy or no copy # depending on whether we're in a 'parallel' statement var b = args.b + var prom = args.prom - args.prom = nimCreatePromise(thread, sizeof(T)) # optional - nimPromiseCreateCondVar(args.prom) # optional + prom.owner = thread # optional nimArgsPassingDone() # signal parent that the work is done # args.prom.blob = f(a, b, ...) @@ -196,17 +202,6 @@ stmtList: """ -proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode = - let size = newNodeIT(nkCall, prom.info, getSysType(tyInt)) - size.add newSymNode(createMagic("sizeof", mSizeOf)) - assert prom.typ.kind == tyGenericInst - size.add newNodeIT(nkType, prom.info, prom.typ.sons[1]) - - let castExpr = newNodeIT(nkCast, prom.info, prom.typ) - castExpr.add emptyNode - castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size) - result = castExpr - proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; varSection, call, barrier, prom: PNode; spawnKind: TSpawnResult): PSym = @@ -223,14 +218,14 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) elif prom != nil: internalAssert prom.typ.kind == tyGenericInst - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, - createNimCreatePromiseCall(prom, threadParam.newSymNode)) + threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) body.add varSection if prom != nil and spawnKind != srByVar: - body.add newFastAsgnStmt(prom, threadLocalProm.newSymNode) - if barrier == nil: - body.add callCodeGenProc("nimPromiseCreateCondVar", prom) + # generate: + # prom.owner = threadParam + body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, + "owner", prom.info), threadParam.newSymNode) body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) if spawnKind == srByVar: @@ -404,10 +399,11 @@ proc setupArgsForParallelism(n: PNode; objType: PType; scratchObj: PSym; indirectAccess(castExpr, field, n.info)) call.add(threadLocal.newSymNode) -proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; +proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; barrier, dest: PNode = nil): PNode = # if 'barrier' != nil, then it is in a 'parallel' section and we # generate quite different code + let n = spawnExpr[1] let spawnKind = spawnResult(retType, barrier!=nil) case spawnKind of srVoid: @@ -419,7 +415,7 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; of srByVar: if dest == nil: localError(n.info, "'spawn' must not be discarded") result = newNodeI(nkStmtList, n.info) - + if n.kind notin nkCallKinds: localError(n.info, "'spawn' takes a call expression") return @@ -489,6 +485,11 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; objType.addField(field) promField = newDotExpr(scratchObj, field) promAsExpr = indirectAccess(castExpr, field, n.info) + # create promise: + result.add newFastAsgnStmt(promField, callProc(spawnExpr[2])) + if barrier == nil: + result.add callCodeGenProc("nimPromiseCreateCondVar", promField) + elif spawnKind == srByVar: var field = newSym(skField, getIdent"prom", owner, n.info) field.typ = newType(tyPtr, objType.owner) diff --git a/compiler/pragmas.nim b/compiler/pragmas.nim index db9fe7cbef..aed0e18504 100644 --- a/compiler/pragmas.nim +++ b/compiler/pragmas.nim @@ -644,12 +644,13 @@ proc singlePragma(c: PContext, sym: PSym, n: PNode, i: int, incl(sym.flags, sfNoReturn) of wDynlib: processDynLib(c, it, sym) - of wCompilerproc: + of wCompilerproc: noVal(it) # compilerproc may not get a string! - makeExternExport(sym, "$1", it.info) - incl(sym.flags, sfCompilerProc) - incl(sym.flags, sfUsed) # suppress all those stupid warnings - registerCompilerProc(sym) + if sfFromGeneric notin sym.flags: + makeExternExport(sym, "$1", it.info) + incl(sym.flags, sfCompilerProc) + incl(sym.flags, sfUsed) # suppress all those stupid warnings + registerCompilerProc(sym) of wProcVar: noVal(it) incl(sym.flags, sfProcvar) diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim index e507e711f3..9e3785185b 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1585,6 +1585,16 @@ proc createPromise(c: PContext; t: PType; info: TLineInfo): PType = addSonSkipIntLit(result, t) result = instGenericContainer(c, info, result, allowMetaTypes = false) +proc instantiateCreatePromiseCall(c: PContext; t: PType; + info: TLineInfo): PSym = + let sym = magicsys.getCompilerProc("nimCreatePromise") + if sym == nil: + localError(info, errSystemNeeds, "nimCreatePromise") + var bindings: TIdTable + initIdTable(bindings) + bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t) + result = c.semGenerateInstance(c, sym, bindings, info) + proc setMs(n: PNode, s: PSym): PNode = result = n n.sons[0] = newSymNode(s) @@ -1626,6 +1636,7 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = result.typ = result[1].typ else: result.typ = createPromise(c, result[1].typ, n.info) + result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode else: result = semDirectOp(c, n, flags) proc semWhen(c: PContext, n: PNode, semCheck = true): PNode = diff --git a/compiler/semparallel.nim b/compiler/semparallel.nim index 72def1137b..c594a47883 100644 --- a/compiler/semparallel.nim +++ b/compiler/semparallel.nim @@ -406,19 +406,19 @@ proc transformSpawn(owner: PSym; n, barrier: PNode): PNode = if result.isNil: result = newNodeI(nkStmtList, n.info) result.add n - result.add wrapProcForSpawn(owner, m[1], b.typ, barrier, it[0]) + result.add wrapProcForSpawn(owner, m, b.typ, barrier, it[0]) it.sons[it.len-1] = emptyNode if result.isNil: result = n of nkAsgn, nkFastAsgn: let b = n[1] if getMagic(b) == mSpawn: let m = transformSlices(b) - return wrapProcForSpawn(owner, m[1], b.typ, barrier, n[0]) + return wrapProcForSpawn(owner, m, b.typ, barrier, n[0]) result = transformSpawnSons(owner, n, barrier) of nkCallKinds: if getMagic(n) == mSpawn: result = transformSlices(n) - return wrapProcForSpawn(owner, result[1], n.typ, barrier, nil) + return wrapProcForSpawn(owner, result, n.typ, barrier, nil) result = transformSpawnSons(owner, n, barrier) elif n.safeLen > 0: result = transformSpawnSons(owner, n, barrier) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 92d5011f4f..8129d03ae9 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -85,25 +85,26 @@ type cv: CondVar idx: int - RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj {.inheritable.} = object # \ - # we allocate this with the thread local allocator; this - # is possible since we already need to do the GC_unref - # on the owning thread + RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' + RawPromiseObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: PObject # we incRef and unref it to keep it alive - owner: ptr Worker - next: RawPromise - align: float64 # a float for proper alignment + data: pointer # we incRef and unref it to keep it alive + owner: pointer # ptr Worker - Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj - blob: T ## the underlying value, if available. Note that usually - ## you should not access this field directly! However it can - ## sometimes be more efficient than getting the value via ``^``. + PromiseObj[T] = object of RawPromiseObj + blob: T + + Promise*{.compilerProc.}[T] = ref PromiseObj[T] + + ToFreeQueue = object + len: int + lock: TLock + empty: TCond + data: array[512, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -115,37 +116,55 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread - promiseLock: TLock - head: RawPromise + q: ToFreeQueue -proc finished*(prom: RawPromise) = - ## This MUST be called for every created promise to free its associated - ## resources. Note that the default reading operation ``^`` is destructive - ## and calls ``finished``. +proc await*(prom: RawPromise) = + ## waits until the value for the promise arrives. Usually it is not necessary + ## to call this explicitly. + if prom.usesCondVar: + prom.usesCondVar = false + await(prom.cv) + destroyCondVar(prom.cv) + +proc finished(prom: RawPromise) = doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - assert prom.next == nil - let w = prom.owner - acquire(w.promiseLock) - prom.next = w.head - w.head = prom - release(w.promiseLock) + # we have to protect against the rare cases where the owner of the promise + # simply disregards the promise and yet the "promiser" has not yet written + # anything to it: + await(prom) + if prom.data.isNil: return + let owner = cast[ptr Worker](prom.owner) + let q = addr(owner.q) + var waited = false + while true: + acquire(q.lock) + if q.len < q.data.len: + q.data[q.len] = prom.data + inc q.len + release(q.lock) + break + else: + # the queue is exhausted! We block until it has been cleaned: + release(q.lock) + wait(q.empty, q.lock) + waited = true + prom.data = nil + # wakeup other potentially waiting threads: + if waited: signal(q.empty) proc cleanPromises(w: ptr Worker) = - var it = w.head - acquire(w.promiseLock) - while it != nil: - let nxt = it.next - if it.usesCondVar: destroyCondVar(it.cv) - if it.data != nil: GC_unref(it.data) - dealloc(it) - it = nxt - w.head = nil - release(w.promiseLock) + let q = addr(w.q) + acquire(q.lock) + for i in 0 .. Date: Fri, 6 Jun 2014 02:05:17 +0200 Subject: [PATCH 3/5] added 'fence' instructions to the barrier --- lib/pure/concurrency/threadpool.nim | 25 +++++++++++++-------- lib/system/atomics.nim | 35 +++++------------------------ todo.txt | 3 +++ 3 files changed, 24 insertions(+), 39 deletions(-) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 8129d03ae9..c4ed42c058 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -53,12 +53,15 @@ type interest: bool ## wether the master is interested in the "all done" event proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} = - ## due to the signaling between threads, it is ensured we are the only - ## one with access to 'entered' so we don't need 'atomicInc' here: + # due to the signaling between threads, it is ensured we are the only + # one with access to 'entered' so we don't need 'atomicInc' here: inc b.entered + # also we need no 'fence' instructions here as soon 'nimArgsPassingDone' + # will be called which already will perform a fence for us. proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} = atomicInc b.left + when not defined(x86): fence() if b.interest and b.left == b.entered: signal(b.cv) proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = @@ -67,10 +70,12 @@ proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = b.interest = false proc closeBarrier(b: ptr Barrier) {.compilerProc.} = + fence() if b.left != b.entered: b.cv = createCondVar() - b.interest = true # XXX we really need to ensure no re-orderings are done - # by the C compiler here + fence() + b.interest = true + fence() while b.left != b.entered: await(b.cv) destroyCondVar(b.cv) @@ -207,9 +212,9 @@ proc `^`*[T](prom: Promise[T]): T = result = prom.blob proc awaitAny*(promises: openArray[RawPromise]): int = - # awaits any of the given promises. Returns the index of one promise for which - ## a value arrived. A promise only supports one call to 'awaitAny' at the - ## same time. That means if you await([a,b]) and await([b,c]) the second + ## awaits any of the given promises. Returns the index of one promise for + ## which a value arrived. A promise only supports one call to 'awaitAny' at + ## the same time. That means if you await([a,b]) and await([b,c]) the second ## call will only await 'c'. If there is no promise left to be able to wait ## on, -1 is returned. ## **Note**: This results in non-deterministic behaviour and so should be @@ -294,14 +299,16 @@ proc preferSpawn*(): bool = proc spawn*(call: expr): expr {.magic: "Spawn".} ## always spawns a new task, so that the 'call' is never executed on ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' - ## is gcsafe and has 'void' as the return type. + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``Promise[T]``. template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. Usually it is advised to ## use 'spawn' in order to not block the producer for an unknown ## amount of time. 'call' has to be proc call 'p(...)' where 'p' - ## is gcsafe and has 'void' as the return type. + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``Promise[T]``. (if preferSpawn(): spawn call else: call) proc parallel*(body: stmt) {.magic: "Parallel".} diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim index 6e2bd9a975..43b3f0438f 100644 --- a/lib/system/atomics.nim +++ b/lib/system/atomics.nim @@ -10,7 +10,9 @@ ## Atomic operations for Nimrod. {.push stackTrace:off.} -when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: +const someGcc = defined(gcc) or defined(llvm_gcc) or defined(clang) + +when someGcc and hasThreadSupport: type AtomMemModel* = enum ATOMIC_RELAXED, ## No barriers or synchronization. @@ -163,33 +165,6 @@ else: inc(p[], val) result = p[] -# atomic compare and swap (CAS) funcitons to implement lock-free algorithms - -#if defined(windows) and not defined(gcc) and hasThreadSupport: -# proc InterlockedCompareExchangePointer(mem: ptr pointer, -# newValue: pointer, comparand: pointer) : pointer {.nodecl, -# importc: "InterlockedCompareExchangePointer", header:"windows.h".} - -# proc compareAndSwap*[T](mem: ptr T, -# expected: T, newValue: T): bool {.inline.}= -# ## Returns true if successfully set value at mem to newValue when value -# ## at mem == expected -# return InterlockedCompareExchangePointer(addr(mem), -# addr(newValue), addr(expected))[] == expected - -#elif not hasThreadSupport: -# proc compareAndSwap*[T](mem: ptr T, -# expected: T, newValue: T): bool {.inline.} = -# ## Returns true if successfully set value at mem to newValue when value -# ## at mem == expected -# var oldval = mem[] -# if oldval == expected: -# mem[] = newValue -# return true -# return false - - -# Some convenient functions proc atomicInc*(memLoc: var int, x: int = 1): int = when defined(gcc) and hasThreadSupport: result = atomic_add_fetch(memLoc.addr, x, ATOMIC_RELAXED) @@ -207,7 +182,7 @@ proc atomicDec*(memLoc: var int, x: int = 1): int = dec(memLoc, x) result = memLoc -when defined(windows) and not defined(gcc): +when defined(windows) and not someGcc: proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 {.importc: "InterlockedCompareExchange", header: "", cdecl.} @@ -221,7 +196,7 @@ else: # XXX is this valid for 'int'? -when (defined(x86) or defined(amd64)) and defined(gcc): +when (defined(x86) or defined(amd64)) and (defined(gcc) or defined(llvm_gcc)): proc cpuRelax {.inline.} = {.emit: """asm volatile("pause" ::: "memory");""".} elif (defined(x86) or defined(amd64)) and defined(vcc): diff --git a/todo.txt b/todo.txt index 8a351e8a78..7d4eac1ada 100644 --- a/todo.txt +++ b/todo.txt @@ -8,8 +8,11 @@ Concurrency - implement 'deepCopy' builtin - implement 'foo[1..4] = spawn(f[4..7])' - the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()' +- support for exception propagation - Minor: The copying of the 'ref Promise' into the thead local storage only happens to work due to the write barrier's implementation +- 'gcsafe' inferrence needs to be fixed +- implement lock levels --> first without the more complex race avoidance Misc From 59c18eb7438dfa555bad1e94a9051a61edccb2fc Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 6 Jun 2014 07:56:47 +0200 Subject: [PATCH 4/5] big rename: Promise -> FlowVar --- compiler/lowerings.nim | 92 ++++++++++----------- compiler/semexprs.nim | 14 ++-- lib/pure/concurrency/threadpool.nim | 122 ++++++++++++++-------------- 3 files changed, 114 insertions(+), 114 deletions(-) diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index 327a18df5b..e2afa43628 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -140,26 +140,26 @@ proc callProc(a: PNode): PNode = # we have 4 cases to consider: # - a void proc --> nothing to do -# - a proc returning GC'ed memory --> requires a promise +# - a proc returning GC'ed memory --> requires a flowVar # - a proc returning non GC'ed memory --> pass as hidden 'var' parameter -# - not in a parallel environment --> requires a promise for memory safety +# - not in a parallel environment --> requires a flowVar for memory safety type TSpawnResult = enum - srVoid, srPromise, srByVar - TPromiseKind = enum - promInvalid # invalid type T for 'Promise[T]' - promGC # Promise of a GC'ed type - promBlob # Promise of a blob type + srVoid, srFlowVar, srByVar + TFlowVarKind = enum + fvInvalid # invalid type T for 'FlowVar[T]' + fvGC # FlowVar of a GC'ed type + fvBlob # FlowVar of a blob type proc spawnResult(t: PType; inParallel: bool): TSpawnResult = if t.isEmptyType: srVoid elif inParallel and not containsGarbageCollectedRef(t): srByVar - else: srPromise + else: srFlowVar -proc promiseKind(t: PType): TPromiseKind = - if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: promGC - elif containsGarbageCollectedRef(t): promInvalid - else: promBlob +proc flowVarKind(t: PType): TFlowVarKind = + if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC + elif containsGarbageCollectedRef(t): fvInvalid + else: fvBlob proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym = result = newSym(skTemp, getIdent(genPrefix), owner, varSection.info) @@ -180,13 +180,13 @@ proc f_wrapper(thread, args) = var a = args.a # thread transfer; deepCopy or shallowCopy or no copy # depending on whether we're in a 'parallel' statement var b = args.b - var prom = args.prom + var fv = args.fv - prom.owner = thread # optional + fv.owner = thread # optional nimArgsPassingDone() # signal parent that the work is done # - args.prom.blob = f(a, b, ...) - nimPromiseSignal(args.prom) + args.fv.blob = f(a, b, ...) + nimFlowVarSignal(args.fv) # - or - f(a, b, ...) @@ -198,12 +198,12 @@ stmtList: scratchObj.b = b nimSpawn(f_wrapper, addr scratchObj) - scratchObj.prom # optional + scratchObj.fv # optional """ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; - varSection, call, barrier, prom: PNode; + varSection, call, barrier, fv: PNode; spawnKind: TSpawnResult): PSym = var body = newNodeI(nkStmtList, f.info) var threadLocalBarrier: PSym @@ -215,32 +215,32 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; body.add callCodeGenProc("barrierEnter", threadLocalBarrier.newSymNode) var threadLocalProm: PSym if spawnKind == srByVar: - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) - elif prom != nil: - internalAssert prom.typ.kind == tyGenericInst - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) + threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv) + elif fv != nil: + internalAssert fv.typ.kind == tyGenericInst + threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv) body.add varSection - if prom != nil and spawnKind != srByVar: + if fv != nil and spawnKind != srByVar: # generate: - # prom.owner = threadParam + # fv.owner = threadParam body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, - "owner", prom.info), threadParam.newSymNode) + "owner", fv.info), threadParam.newSymNode) body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) if spawnKind == srByVar: body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call) - elif prom != nil: - let fk = prom.typ.sons[1].promiseKind - if fk == promInvalid: - localError(f.info, "cannot create a promise of type: " & - typeToString(prom.typ.sons[1])) + elif fv != nil: + let fk = fv.typ.sons[1].flowVarKind + if fk == fvInvalid: + localError(f.info, "cannot create a flowVar of type: " & + typeToString(fv.typ.sons[1])) body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, - if fk == promGC: "data" else: "blob", prom.info), call) + if fk == fvGC: "data" else: "blob", fv.info), call) if barrier == nil: - # by now 'prom' is shared and thus might have beeen overwritten! we need + # by now 'fv' is shared and thus might have beeen overwritten! we need # to use the thread-local view instead: - body.add callCodeGenProc("nimPromiseSignal", threadLocalProm.newSymNode) + body.add callCodeGenProc("nimFlowVarSignal", threadLocalProm.newSymNode) else: body.add call if barrier != nil: @@ -409,7 +409,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; of srVoid: internalAssert dest == nil result = newNodeI(nkStmtList, n.info) - of srPromise: + of srFlowVar: internalAssert dest == nil result = newNodeIT(nkStmtListExpr, n.info, retType) of srByVar: @@ -478,29 +478,29 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier) barrierAsExpr = indirectAccess(castExpr, field, n.info) - var promField, promAsExpr: PNode = nil - if spawnKind == srPromise: - var field = newSym(skField, getIdent"prom", owner, n.info) + var fvField, fvAsExpr: PNode = nil + if spawnKind == srFlowVar: + var field = newSym(skField, getIdent"fv", owner, n.info) field.typ = retType objType.addField(field) - promField = newDotExpr(scratchObj, field) - promAsExpr = indirectAccess(castExpr, field, n.info) - # create promise: - result.add newFastAsgnStmt(promField, callProc(spawnExpr[2])) + fvField = newDotExpr(scratchObj, field) + fvAsExpr = indirectAccess(castExpr, field, n.info) + # create flowVar: + result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2])) if barrier == nil: - result.add callCodeGenProc("nimPromiseCreateCondVar", promField) + result.add callCodeGenProc("nimFlowVarCreateCondVar", fvField) elif spawnKind == srByVar: - var field = newSym(skField, getIdent"prom", owner, n.info) + var field = newSym(skField, getIdent"fv", owner, n.info) field.typ = newType(tyPtr, objType.owner) field.typ.rawAddSon(retType) objType.addField(field) - promAsExpr = indirectAccess(castExpr, field, n.info) + fvAsExpr = indirectAccess(castExpr, field, n.info) result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest)) let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, call, - barrierAsExpr, promAsExpr, spawnKind) + barrierAsExpr, fvAsExpr, spawnKind) result.add callCodeGenProc("nimSpawn", wrapper.newSymNode, genAddrOf(scratchObj.newSymNode)) - if spawnKind == srPromise: result.add promField + if spawnKind == srFlowVar: result.add fvField diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim index 9e3785185b..5603f0e975 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1579,17 +1579,17 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode = else: result = semDirectOp(c, n, flags) -proc createPromise(c: PContext; t: PType; info: TLineInfo): PType = +proc createFlowVar(c: PContext; t: PType; info: TLineInfo): PType = result = newType(tyGenericInvokation, c.module) - addSonSkipIntLit(result, magicsys.getCompilerProc("Promise").typ) + addSonSkipIntLit(result, magicsys.getCompilerProc("FlowVar").typ) addSonSkipIntLit(result, t) result = instGenericContainer(c, info, result, allowMetaTypes = false) -proc instantiateCreatePromiseCall(c: PContext; t: PType; +proc instantiateCreateFlowVarCall(c: PContext; t: PType; info: TLineInfo): PSym = - let sym = magicsys.getCompilerProc("nimCreatePromise") + let sym = magicsys.getCompilerProc("nimCreateFlowVar") if sym == nil: - localError(info, errSystemNeeds, "nimCreatePromise") + localError(info, errSystemNeeds, "nimCreateFlowVar") var bindings: TIdTable initIdTable(bindings) bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t) @@ -1635,8 +1635,8 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = if c.inParallelStmt > 0: result.typ = result[1].typ else: - result.typ = createPromise(c, result[1].typ, n.info) - result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode + result.typ = createFlowVar(c, result[1].typ, n.info) + result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode else: result = semDirectOp(c, n, flags) proc semWhen(c: PContext, n: PNode, semCheck = true): PNode = diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index c4ed42c058..c34b91e30e 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -90,8 +90,8 @@ type cv: CondVar idx: int - RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj = object of TObject + RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]' + RawFlowVarObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support @@ -100,10 +100,10 @@ type data: pointer # we incRef and unref it to keep it alive owner: pointer # ptr Worker - PromiseObj[T] = object of RawPromiseObj + FlowVarObj[T] = object of RawFlowVarObj blob: T - Promise*{.compilerProc.}[T] = ref PromiseObj[T] + FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable ToFreeQueue = object len: int @@ -123,28 +123,28 @@ type shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue -proc await*(prom: RawPromise) = - ## waits until the value for the promise arrives. Usually it is not necessary +proc await*(fv: RawFlowVar) = + ## waits until the value for the flowVar arrives. Usually it is not necessary ## to call this explicitly. - if prom.usesCondVar: - prom.usesCondVar = false - await(prom.cv) - destroyCondVar(prom.cv) + if fv.usesCondVar: + fv.usesCondVar = false + await(fv.cv) + destroyCondVar(fv.cv) -proc finished(prom: RawPromise) = - doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - # we have to protect against the rare cases where the owner of the promise - # simply disregards the promise and yet the "promiser" has not yet written +proc finished(fv: RawFlowVar) = + doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" + # we have to protect against the rare cases where the owner of the flowVar + # simply disregards the flowVar and yet the "flowVarr" has not yet written # anything to it: - await(prom) - if prom.data.isNil: return - let owner = cast[ptr Worker](prom.owner) + await(fv) + if fv.data.isNil: return + let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) var waited = false while true: acquire(q.lock) if q.len < q.data.len: - q.data[q.len] = prom.data + q.data[q.len] = fv.data inc q.len release(q.lock) break @@ -153,11 +153,11 @@ proc finished(prom: RawPromise) = release(q.lock) wait(q.empty, q.lock) waited = true - prom.data = nil + fv.data = nil # wakeup other potentially waiting threads: if waited: signal(q.empty) -proc cleanPromises(w: ptr Worker) = +proc cleanFlowVars(w: ptr Worker) = let q = addr(w.q) acquire(q.lock) for i in 0 .. Date: Fri, 6 Jun 2014 08:48:38 +0200 Subject: [PATCH 5/5] begin of spawn documentation --- doc/spawn.txt | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 doc/spawn.txt diff --git a/doc/spawn.txt b/doc/spawn.txt new file mode 100644 index 0000000000..19560ebf58 --- /dev/null +++ b/doc/spawn.txt @@ -0,0 +1,57 @@ +========================================================== + Parallel & Spawn +========================================================== + +Nimrod has two flavors of parallelism: +1) `Structured`:idx parallelism via the ``parallel`` statement. +2) `Unstructured`:idx: parallelism via the standalone ``spawn`` statement. + +Somewhat confusingly, ``spawn`` is also used in the ``parallel`` statement +with slightly different semantics. ``spawn`` always takes a call expression of +the form ``f(a, ...)``. Let ``T`` be ``f``'s return type. If ``T`` is ``void`` +then ``spawn``'s return type is also ``void``. Within a ``parallel`` section +``spawn``'s return type is ``T``, otherwise it is ``FlowVar[T]``. + +The compiler can ensure the location in ``location = spawn f(...)`` is not +read prematurely within a ``parallel`` section and so there is no need for +the overhead of an indirection via ``FlowVar[T]`` to ensure correctness. + + +Parallel statement +================== + +The parallel statement is the preferred mechanism to introduce parallelism +in a Nimrod program. A subset of the Nimrod language is valid within a +``parallel`` section. This subset is checked to be free of data races at +compile time. A sophisticated `disjoint checker`:idx: ensures that no data +races are possible even though shared memory is extensively supported! + +The subset is in fact the full language with the following +restrictions / changes: + +* ``spawn`` within a ``parallel`` section has special semantics. +* Every location of the form ``a[i]`` and ``a[i..j]`` and ``dest`` where + ``dest`` is part of the pattern ``dest = spawn f(...)`` has to be + provable disjoint. This is called the *disjoint check*. +* Every other complex location ``loc`` that is used in a spawned + proc (``spawn f(loc)``) has to immutable for the duration of + the ``parallel``. This is called the *immutability check*. Currently it + is not specified what exactly "complex location" means. We need to make that + an optimization! +* Every array access has to be provable within bounds. +* Slices are optimized so that no copy is performed. This optimization is not + yet performed for ordinary slices outside of a ``parallel`` section. + + +Spawn statement +=============== + +A standalone ``spawn`` statement is a simple construct. It executes +the passed expression on the thread pool and returns a `data flow variable`:idx: +``FlowVar[T]`` that can be read from. The reading with the ``^`` operator is +**blocking**. However, one can use ``awaitAny`` to wait on multiple flow variables +at the same time. + +Like the ``parallel`` statement data flow variables ensure that no data races +are possible. +