big rename: Promise -> FlowVar

This commit is contained in:
Araq
2014-06-06 07:56:47 +02:00
parent b7cbb08f99
commit 59c18eb743
3 changed files with 114 additions and 114 deletions

View File

@@ -140,26 +140,26 @@ proc callProc(a: PNode): PNode =
# we have 4 cases to consider:
# - a void proc --> nothing to do
# - a proc returning GC'ed memory --> requires a promise
# - a proc returning GC'ed memory --> requires a flowVar
# - a proc returning non GC'ed memory --> pass as hidden 'var' parameter
# - not in a parallel environment --> requires a promise for memory safety
# - not in a parallel environment --> requires a flowVar for memory safety
type
TSpawnResult = enum
srVoid, srPromise, srByVar
TPromiseKind = enum
promInvalid # invalid type T for 'Promise[T]'
promGC # Promise of a GC'ed type
promBlob # Promise of a blob type
srVoid, srFlowVar, srByVar
TFlowVarKind = enum
fvInvalid # invalid type T for 'FlowVar[T]'
fvGC # FlowVar of a GC'ed type
fvBlob # FlowVar of a blob type
proc spawnResult(t: PType; inParallel: bool): TSpawnResult =
if t.isEmptyType: srVoid
elif inParallel and not containsGarbageCollectedRef(t): srByVar
else: srPromise
else: srFlowVar
proc promiseKind(t: PType): TPromiseKind =
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: promGC
elif containsGarbageCollectedRef(t): promInvalid
else: promBlob
proc flowVarKind(t: PType): TFlowVarKind =
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
elif containsGarbageCollectedRef(t): fvInvalid
else: fvBlob
proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym =
result = newSym(skTemp, getIdent(genPrefix), owner, varSection.info)
@@ -180,13 +180,13 @@ proc f_wrapper(thread, args) =
var a = args.a # thread transfer; deepCopy or shallowCopy or no copy
# depending on whether we're in a 'parallel' statement
var b = args.b
var prom = args.prom
var fv = args.fv
prom.owner = thread # optional
fv.owner = thread # optional
nimArgsPassingDone() # signal parent that the work is done
#
args.prom.blob = f(a, b, ...)
nimPromiseSignal(args.prom)
args.fv.blob = f(a, b, ...)
nimFlowVarSignal(args.fv)
# - or -
f(a, b, ...)
@@ -198,12 +198,12 @@ stmtList:
scratchObj.b = b
nimSpawn(f_wrapper, addr scratchObj)
scratchObj.prom # optional
scratchObj.fv # optional
"""
proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
varSection, call, barrier, prom: PNode;
varSection, call, barrier, fv: PNode;
spawnKind: TSpawnResult): PSym =
var body = newNodeI(nkStmtList, f.info)
var threadLocalBarrier: PSym
@@ -215,32 +215,32 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
body.add callCodeGenProc("barrierEnter", threadLocalBarrier.newSymNode)
var threadLocalProm: PSym
if spawnKind == srByVar:
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom)
elif prom != nil:
internalAssert prom.typ.kind == tyGenericInst
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom)
threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv)
elif fv != nil:
internalAssert fv.typ.kind == tyGenericInst
threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv)
body.add varSection
if prom != nil and spawnKind != srByVar:
if fv != nil and spawnKind != srByVar:
# generate:
# prom.owner = threadParam
# fv.owner = threadParam
body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
"owner", prom.info), threadParam.newSymNode)
"owner", fv.info), threadParam.newSymNode)
body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode)
if spawnKind == srByVar:
body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
elif prom != nil:
let fk = prom.typ.sons[1].promiseKind
if fk == promInvalid:
localError(f.info, "cannot create a promise of type: " &
typeToString(prom.typ.sons[1]))
elif fv != nil:
let fk = fv.typ.sons[1].flowVarKind
if fk == fvInvalid:
localError(f.info, "cannot create a flowVar of type: " &
typeToString(fv.typ.sons[1]))
body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
if fk == promGC: "data" else: "blob", prom.info), call)
if fk == fvGC: "data" else: "blob", fv.info), call)
if barrier == nil:
# by now 'prom' is shared and thus might have beeen overwritten! we need
# by now 'fv' is shared and thus might have beeen overwritten! we need
# to use the thread-local view instead:
body.add callCodeGenProc("nimPromiseSignal", threadLocalProm.newSymNode)
body.add callCodeGenProc("nimFlowVarSignal", threadLocalProm.newSymNode)
else:
body.add call
if barrier != nil:
@@ -409,7 +409,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
of srVoid:
internalAssert dest == nil
result = newNodeI(nkStmtList, n.info)
of srPromise:
of srFlowVar:
internalAssert dest == nil
result = newNodeIT(nkStmtListExpr, n.info, retType)
of srByVar:
@@ -478,29 +478,29 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier)
barrierAsExpr = indirectAccess(castExpr, field, n.info)
var promField, promAsExpr: PNode = nil
if spawnKind == srPromise:
var field = newSym(skField, getIdent"prom", owner, n.info)
var fvField, fvAsExpr: PNode = nil
if spawnKind == srFlowVar:
var field = newSym(skField, getIdent"fv", owner, n.info)
field.typ = retType
objType.addField(field)
promField = newDotExpr(scratchObj, field)
promAsExpr = indirectAccess(castExpr, field, n.info)
# create promise:
result.add newFastAsgnStmt(promField, callProc(spawnExpr[2]))
fvField = newDotExpr(scratchObj, field)
fvAsExpr = indirectAccess(castExpr, field, n.info)
# create flowVar:
result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2]))
if barrier == nil:
result.add callCodeGenProc("nimPromiseCreateCondVar", promField)
result.add callCodeGenProc("nimFlowVarCreateCondVar", fvField)
elif spawnKind == srByVar:
var field = newSym(skField, getIdent"prom", owner, n.info)
var field = newSym(skField, getIdent"fv", owner, n.info)
field.typ = newType(tyPtr, objType.owner)
field.typ.rawAddSon(retType)
objType.addField(field)
promAsExpr = indirectAccess(castExpr, field, n.info)
fvAsExpr = indirectAccess(castExpr, field, n.info)
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))
let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, call,
barrierAsExpr, promAsExpr, spawnKind)
barrierAsExpr, fvAsExpr, spawnKind)
result.add callCodeGenProc("nimSpawn", wrapper.newSymNode,
genAddrOf(scratchObj.newSymNode))
if spawnKind == srPromise: result.add promField
if spawnKind == srFlowVar: result.add fvField

View File

@@ -1579,17 +1579,17 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode =
else:
result = semDirectOp(c, n, flags)
proc createPromise(c: PContext; t: PType; info: TLineInfo): PType =
proc createFlowVar(c: PContext; t: PType; info: TLineInfo): PType =
result = newType(tyGenericInvokation, c.module)
addSonSkipIntLit(result, magicsys.getCompilerProc("Promise").typ)
addSonSkipIntLit(result, magicsys.getCompilerProc("FlowVar").typ)
addSonSkipIntLit(result, t)
result = instGenericContainer(c, info, result, allowMetaTypes = false)
proc instantiateCreatePromiseCall(c: PContext; t: PType;
proc instantiateCreateFlowVarCall(c: PContext; t: PType;
info: TLineInfo): PSym =
let sym = magicsys.getCompilerProc("nimCreatePromise")
let sym = magicsys.getCompilerProc("nimCreateFlowVar")
if sym == nil:
localError(info, errSystemNeeds, "nimCreatePromise")
localError(info, errSystemNeeds, "nimCreateFlowVar")
var bindings: TIdTable
initIdTable(bindings)
bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t)
@@ -1635,8 +1635,8 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
if c.inParallelStmt > 0:
result.typ = result[1].typ
else:
result.typ = createPromise(c, result[1].typ, n.info)
result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode
result.typ = createFlowVar(c, result[1].typ, n.info)
result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode
else: result = semDirectOp(c, n, flags)
proc semWhen(c: PContext, n: PNode, semCheck = true): PNode =

View File

@@ -90,8 +90,8 @@ type
cv: CondVar
idx: int
RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]'
RawPromiseObj = object of TObject
RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]'
RawFlowVarObj = object of TObject
ready, usesCondVar: bool
cv: CondVar #\
# for 'awaitAny' support
@@ -100,10 +100,10 @@ type
data: pointer # we incRef and unref it to keep it alive
owner: pointer # ptr Worker
PromiseObj[T] = object of RawPromiseObj
FlowVarObj[T] = object of RawFlowVarObj
blob: T
Promise*{.compilerProc.}[T] = ref PromiseObj[T]
FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable
ToFreeQueue = object
len: int
@@ -123,28 +123,28 @@ type
shutdown: bool # the pool requests to shut down this worker thread
q: ToFreeQueue
proc await*(prom: RawPromise) =
## waits until the value for the promise arrives. Usually it is not necessary
proc await*(fv: RawFlowVar) =
## waits until the value for the flowVar arrives. Usually it is not necessary
## to call this explicitly.
if prom.usesCondVar:
prom.usesCondVar = false
await(prom.cv)
destroyCondVar(prom.cv)
if fv.usesCondVar:
fv.usesCondVar = false
await(fv.cv)
destroyCondVar(fv.cv)
proc finished(prom: RawPromise) =
doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'"
# we have to protect against the rare cases where the owner of the promise
# simply disregards the promise and yet the "promiser" has not yet written
proc finished(fv: RawFlowVar) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
# we have to protect against the rare cases where the owner of the flowVar
# simply disregards the flowVar and yet the "flowVarr" has not yet written
# anything to it:
await(prom)
if prom.data.isNil: return
let owner = cast[ptr Worker](prom.owner)
await(fv)
if fv.data.isNil: return
let owner = cast[ptr Worker](fv.owner)
let q = addr(owner.q)
var waited = false
while true:
acquire(q.lock)
if q.len < q.data.len:
q.data[q.len] = prom.data
q.data[q.len] = fv.data
inc q.len
release(q.lock)
break
@@ -153,11 +153,11 @@ proc finished(prom: RawPromise) =
release(q.lock)
wait(q.empty, q.lock)
waited = true
prom.data = nil
fv.data = nil
# wakeup other potentially waiting threads:
if waited: signal(q.empty)
proc cleanPromises(w: ptr Worker) =
proc cleanFlowVars(w: ptr Worker) =
let q = addr(w.q)
acquire(q.lock)
for i in 0 .. <q.len:
@@ -166,72 +166,72 @@ proc cleanPromises(w: ptr Worker) =
release(q.lock)
signal(q.empty)
proc promFinalizer[T](prom: Promise[T]) = finished(prom)
proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
proc nimCreatePromise[T](): Promise[T] {.compilerProc.} =
new(result, promFinalizer)
proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
new(result, fvFinalizer)
proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} =
prom.cv = createCondVar()
prom.usesCondVar = true
proc nimFlowVarCreateCondVar(fv: RawFlowVar) {.compilerProc.} =
fv.cv = createCondVar()
fv.usesCondVar = true
proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} =
if prom.ai != nil:
acquire(prom.ai.cv.L)
prom.ai.idx = prom.idx
inc prom.ai.cv.counter
release(prom.ai.cv.L)
signal(prom.ai.cv.c)
if prom.usesCondVar: signal(prom.cv)
proc nimFlowVarSignal(fv: RawFlowVar) {.compilerProc.} =
if fv.ai != nil:
acquire(fv.ai.cv.L)
fv.ai.idx = fv.idx
inc fv.ai.cv.counter
release(fv.ai.cv.L)
signal(fv.ai.cv.c)
if fv.usesCondVar: signal(fv.cv)
proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
## blocks until the ``prom`` is available and then passes its value
proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
## blocks until the ``fv`` is available and then passes its value
## to ``action``. Note that due to Nimrod's parameter passing semantics this
## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
## sometimes be more efficient than ``^``.
await(prom)
await(fv)
when T is string or T is seq:
action(cast[T](prom.data))
action(cast[T](fv.data))
elif T is ref:
{.error: "'awaitAndThen' not available for Promise[ref]".}
{.error: "'awaitAndThen' not available for FlowVar[ref]".}
else:
action(prom.blob)
finished(prom)
action(fv.blob)
finished(fv)
proc `^`*[T](prom: Promise[ref T]): foreign ptr T =
proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T =
## blocks until the value is available and then returns this value.
await(prom)
result = cast[foreign ptr T](prom.data)
await(fv)
result = cast[foreign ptr T](fv.data)
proc `^`*[T](prom: Promise[T]): T =
proc `^`*[T](fv: FlowVar[T]): T =
## blocks until the value is available and then returns this value.
await(prom)
await(fv)
when T is string or T is seq:
result = cast[T](prom.data)
result = cast[T](fv.data)
else:
result = prom.blob
result = fv.blob
proc awaitAny*(promises: openArray[RawPromise]): int =
## awaits any of the given promises. Returns the index of one promise for
## which a value arrived. A promise only supports one call to 'awaitAny' at
proc awaitAny*(flowVars: openArray[RawFlowVar]): int =
## awaits any of the given flowVars. Returns the index of one flowVar for
## which a value arrived. A flowVar only supports one call to 'awaitAny' at
## the same time. That means if you await([a,b]) and await([b,c]) the second
## call will only await 'c'. If there is no promise left to be able to wait
## call will only await 'c'. If there is no flowVar left to be able to wait
## on, -1 is returned.
## **Note**: This results in non-deterministic behaviour and so should be
## avoided.
var ai: AwaitInfo
ai.cv = createCondVar()
var conflicts = 0
for i in 0 .. promises.high:
if cas(addr promises[i].ai, nil, addr ai):
promises[i].idx = i
for i in 0 .. flowVars.high:
if cas(addr flowVars[i].ai, nil, addr ai):
flowVars[i].idx = i
else:
inc conflicts
if conflicts < promises.len:
if conflicts < flowVars.len:
await(ai.cv)
result = ai.idx
for i in 0 .. promises.high:
discard cas(addr promises[i].ai, addr ai, nil)
for i in 0 .. flowVars.high:
discard cas(addr flowVars[i].ai, addr ai, nil)
else:
result = -1
destroyCondVar(ai.cv)
@@ -259,7 +259,7 @@ proc slave(w: ptr Worker) {.thread.} =
await(w.taskArrived)
assert(not w.ready)
w.f(w, w.data)
if w.q.len != 0: w.cleanPromises
if w.q.len != 0: w.cleanFlowVars
if w.shutdown:
w.shutdown = false
atomicDec currentPoolSize
@@ -300,7 +300,7 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
## always spawns a new task, so that the 'call' is never executed on
## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
## is gcsafe and has a return type that is either 'void' or compatible
## with ``Promise[T]``.
## with ``FlowVar[T]``.
template spawnX*(call: expr): expr =
## spawns a new task if a CPU core is ready, otherwise executes the
@@ -308,7 +308,7 @@ template spawnX*(call: expr): expr =
## use 'spawn' in order to not block the producer for an unknown
## amount of time. 'call' has to be proc call 'p(...)' where 'p'
## is gcsafe and has a return type that is either 'void' or compatible
## with ``Promise[T]``.
## with ``FlowVar[T]``.
(if preferSpawn(): spawn call else: call)
proc parallel*(body: stmt) {.magic: "Parallel".}