mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-20 14:25:23 +00:00
Promises are now refs
This commit is contained in:
@@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) =
|
||||
of mSlurp..mQuoteAst:
|
||||
localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s)
|
||||
of mSpawn:
|
||||
let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil)
|
||||
let n = lowerings.wrapProcForSpawn(p.module.module, e, e.typ, nil, nil)
|
||||
expr(p, n, d)
|
||||
of mParallel:
|
||||
let n = semparallel.liftParallel(p.module.module, e)
|
||||
|
||||
@@ -86,7 +86,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
|
||||
# returns a[].b as a node
|
||||
var deref = newNodeI(nkHiddenDeref, info)
|
||||
deref.typ = a.typ.skipTypes(abstractInst).sons[0]
|
||||
var t = deref.typ
|
||||
var t = deref.typ.skipTypes(abstractInst)
|
||||
var field: PSym
|
||||
while true:
|
||||
assert t.kind == tyObject
|
||||
@@ -94,6 +94,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
|
||||
if field != nil: break
|
||||
t = t.sons[0]
|
||||
if t == nil: break
|
||||
t = t.skipTypes(abstractInst)
|
||||
assert field != nil, b
|
||||
addSon(deref, a)
|
||||
result = newNodeI(nkDotExpr, info)
|
||||
@@ -132,6 +133,11 @@ proc callCodegenProc*(name: string, arg1: PNode;
|
||||
if arg3 != nil: result.add arg3
|
||||
result.typ = sym.typ.sons[0]
|
||||
|
||||
proc callProc(a: PNode): PNode =
|
||||
result = newNodeI(nkCall, a.info)
|
||||
result.add a
|
||||
result.typ = a.typ.sons[0]
|
||||
|
||||
# we have 4 cases to consider:
|
||||
# - a void proc --> nothing to do
|
||||
# - a proc returning GC'ed memory --> requires a promise
|
||||
@@ -169,14 +175,14 @@ proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym =
|
||||
discard """
|
||||
We generate roughly this:
|
||||
|
||||
proc f_wrapper(args) =
|
||||
proc f_wrapper(thread, args) =
|
||||
barrierEnter(args.barrier) # for parallel statement
|
||||
var a = args.a # thread transfer; deepCopy or shallowCopy or no copy
|
||||
# depending on whether we're in a 'parallel' statement
|
||||
var b = args.b
|
||||
var prom = args.prom
|
||||
|
||||
args.prom = nimCreatePromise(thread, sizeof(T)) # optional
|
||||
nimPromiseCreateCondVar(args.prom) # optional
|
||||
prom.owner = thread # optional
|
||||
nimArgsPassingDone() # signal parent that the work is done
|
||||
#
|
||||
args.prom.blob = f(a, b, ...)
|
||||
@@ -196,17 +202,6 @@ stmtList:
|
||||
|
||||
"""
|
||||
|
||||
proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode =
|
||||
let size = newNodeIT(nkCall, prom.info, getSysType(tyInt))
|
||||
size.add newSymNode(createMagic("sizeof", mSizeOf))
|
||||
assert prom.typ.kind == tyGenericInst
|
||||
size.add newNodeIT(nkType, prom.info, prom.typ.sons[1])
|
||||
|
||||
let castExpr = newNodeIT(nkCast, prom.info, prom.typ)
|
||||
castExpr.add emptyNode
|
||||
castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size)
|
||||
result = castExpr
|
||||
|
||||
proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
|
||||
varSection, call, barrier, prom: PNode;
|
||||
spawnKind: TSpawnResult): PSym =
|
||||
@@ -223,14 +218,14 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
|
||||
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom)
|
||||
elif prom != nil:
|
||||
internalAssert prom.typ.kind == tyGenericInst
|
||||
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ,
|
||||
createNimCreatePromiseCall(prom, threadParam.newSymNode))
|
||||
threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom)
|
||||
|
||||
body.add varSection
|
||||
if prom != nil and spawnKind != srByVar:
|
||||
body.add newFastAsgnStmt(prom, threadLocalProm.newSymNode)
|
||||
if barrier == nil:
|
||||
body.add callCodeGenProc("nimPromiseCreateCondVar", prom)
|
||||
# generate:
|
||||
# prom.owner = threadParam
|
||||
body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
|
||||
"owner", prom.info), threadParam.newSymNode)
|
||||
|
||||
body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode)
|
||||
if spawnKind == srByVar:
|
||||
@@ -404,10 +399,11 @@ proc setupArgsForParallelism(n: PNode; objType: PType; scratchObj: PSym;
|
||||
indirectAccess(castExpr, field, n.info))
|
||||
call.add(threadLocal.newSymNode)
|
||||
|
||||
proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType;
|
||||
proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
|
||||
barrier, dest: PNode = nil): PNode =
|
||||
# if 'barrier' != nil, then it is in a 'parallel' section and we
|
||||
# generate quite different code
|
||||
let n = spawnExpr[1]
|
||||
let spawnKind = spawnResult(retType, barrier!=nil)
|
||||
case spawnKind
|
||||
of srVoid:
|
||||
@@ -419,7 +415,7 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType;
|
||||
of srByVar:
|
||||
if dest == nil: localError(n.info, "'spawn' must not be discarded")
|
||||
result = newNodeI(nkStmtList, n.info)
|
||||
|
||||
|
||||
if n.kind notin nkCallKinds:
|
||||
localError(n.info, "'spawn' takes a call expression")
|
||||
return
|
||||
@@ -489,6 +485,11 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType;
|
||||
objType.addField(field)
|
||||
promField = newDotExpr(scratchObj, field)
|
||||
promAsExpr = indirectAccess(castExpr, field, n.info)
|
||||
# create promise:
|
||||
result.add newFastAsgnStmt(promField, callProc(spawnExpr[2]))
|
||||
if barrier == nil:
|
||||
result.add callCodeGenProc("nimPromiseCreateCondVar", promField)
|
||||
|
||||
elif spawnKind == srByVar:
|
||||
var field = newSym(skField, getIdent"prom", owner, n.info)
|
||||
field.typ = newType(tyPtr, objType.owner)
|
||||
|
||||
@@ -644,12 +644,13 @@ proc singlePragma(c: PContext, sym: PSym, n: PNode, i: int,
|
||||
incl(sym.flags, sfNoReturn)
|
||||
of wDynlib:
|
||||
processDynLib(c, it, sym)
|
||||
of wCompilerproc:
|
||||
of wCompilerproc:
|
||||
noVal(it) # compilerproc may not get a string!
|
||||
makeExternExport(sym, "$1", it.info)
|
||||
incl(sym.flags, sfCompilerProc)
|
||||
incl(sym.flags, sfUsed) # suppress all those stupid warnings
|
||||
registerCompilerProc(sym)
|
||||
if sfFromGeneric notin sym.flags:
|
||||
makeExternExport(sym, "$1", it.info)
|
||||
incl(sym.flags, sfCompilerProc)
|
||||
incl(sym.flags, sfUsed) # suppress all those stupid warnings
|
||||
registerCompilerProc(sym)
|
||||
of wProcVar:
|
||||
noVal(it)
|
||||
incl(sym.flags, sfProcvar)
|
||||
|
||||
@@ -1585,6 +1585,16 @@ proc createPromise(c: PContext; t: PType; info: TLineInfo): PType =
|
||||
addSonSkipIntLit(result, t)
|
||||
result = instGenericContainer(c, info, result, allowMetaTypes = false)
|
||||
|
||||
proc instantiateCreatePromiseCall(c: PContext; t: PType;
|
||||
info: TLineInfo): PSym =
|
||||
let sym = magicsys.getCompilerProc("nimCreatePromise")
|
||||
if sym == nil:
|
||||
localError(info, errSystemNeeds, "nimCreatePromise")
|
||||
var bindings: TIdTable
|
||||
initIdTable(bindings)
|
||||
bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t)
|
||||
result = c.semGenerateInstance(c, sym, bindings, info)
|
||||
|
||||
proc setMs(n: PNode, s: PSym): PNode =
|
||||
result = n
|
||||
n.sons[0] = newSymNode(s)
|
||||
@@ -1626,6 +1636,7 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
|
||||
result.typ = result[1].typ
|
||||
else:
|
||||
result.typ = createPromise(c, result[1].typ, n.info)
|
||||
result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode
|
||||
else: result = semDirectOp(c, n, flags)
|
||||
|
||||
proc semWhen(c: PContext, n: PNode, semCheck = true): PNode =
|
||||
|
||||
@@ -406,19 +406,19 @@ proc transformSpawn(owner: PSym; n, barrier: PNode): PNode =
|
||||
if result.isNil:
|
||||
result = newNodeI(nkStmtList, n.info)
|
||||
result.add n
|
||||
result.add wrapProcForSpawn(owner, m[1], b.typ, barrier, it[0])
|
||||
result.add wrapProcForSpawn(owner, m, b.typ, barrier, it[0])
|
||||
it.sons[it.len-1] = emptyNode
|
||||
if result.isNil: result = n
|
||||
of nkAsgn, nkFastAsgn:
|
||||
let b = n[1]
|
||||
if getMagic(b) == mSpawn:
|
||||
let m = transformSlices(b)
|
||||
return wrapProcForSpawn(owner, m[1], b.typ, barrier, n[0])
|
||||
return wrapProcForSpawn(owner, m, b.typ, barrier, n[0])
|
||||
result = transformSpawnSons(owner, n, barrier)
|
||||
of nkCallKinds:
|
||||
if getMagic(n) == mSpawn:
|
||||
result = transformSlices(n)
|
||||
return wrapProcForSpawn(owner, result[1], n.typ, barrier, nil)
|
||||
return wrapProcForSpawn(owner, result, n.typ, barrier, nil)
|
||||
result = transformSpawnSons(owner, n, barrier)
|
||||
elif n.safeLen > 0:
|
||||
result = transformSpawnSons(owner, n, barrier)
|
||||
|
||||
@@ -85,25 +85,26 @@ type
|
||||
cv: CondVar
|
||||
idx: int
|
||||
|
||||
RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]'
|
||||
RawPromiseObj {.inheritable.} = object # \
|
||||
# we allocate this with the thread local allocator; this
|
||||
# is possible since we already need to do the GC_unref
|
||||
# on the owning thread
|
||||
RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]'
|
||||
RawPromiseObj = object of TObject
|
||||
ready, usesCondVar: bool
|
||||
cv: CondVar #\
|
||||
# for 'awaitAny' support
|
||||
ai: ptr AwaitInfo
|
||||
idx: int
|
||||
data: PObject # we incRef and unref it to keep it alive
|
||||
owner: ptr Worker
|
||||
next: RawPromise
|
||||
align: float64 # a float for proper alignment
|
||||
data: pointer # we incRef and unref it to keep it alive
|
||||
owner: pointer # ptr Worker
|
||||
|
||||
Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj
|
||||
blob: T ## the underlying value, if available. Note that usually
|
||||
## you should not access this field directly! However it can
|
||||
## sometimes be more efficient than getting the value via ``^``.
|
||||
PromiseObj[T] = object of RawPromiseObj
|
||||
blob: T
|
||||
|
||||
Promise*{.compilerProc.}[T] = ref PromiseObj[T]
|
||||
|
||||
ToFreeQueue = object
|
||||
len: int
|
||||
lock: TLock
|
||||
empty: TCond
|
||||
data: array[512, pointer]
|
||||
|
||||
WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
|
||||
Worker = object
|
||||
@@ -115,37 +116,55 @@ type
|
||||
ready: bool # put it here for correct alignment!
|
||||
initialized: bool # whether it has even been initialized
|
||||
shutdown: bool # the pool requests to shut down this worker thread
|
||||
promiseLock: TLock
|
||||
head: RawPromise
|
||||
q: ToFreeQueue
|
||||
|
||||
proc finished*(prom: RawPromise) =
|
||||
## This MUST be called for every created promise to free its associated
|
||||
## resources. Note that the default reading operation ``^`` is destructive
|
||||
## and calls ``finished``.
|
||||
proc await*(prom: RawPromise) =
|
||||
## waits until the value for the promise arrives. Usually it is not necessary
|
||||
## to call this explicitly.
|
||||
if prom.usesCondVar:
|
||||
prom.usesCondVar = false
|
||||
await(prom.cv)
|
||||
destroyCondVar(prom.cv)
|
||||
|
||||
proc finished(prom: RawPromise) =
|
||||
doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'"
|
||||
assert prom.next == nil
|
||||
let w = prom.owner
|
||||
acquire(w.promiseLock)
|
||||
prom.next = w.head
|
||||
w.head = prom
|
||||
release(w.promiseLock)
|
||||
# we have to protect against the rare cases where the owner of the promise
|
||||
# simply disregards the promise and yet the "promiser" has not yet written
|
||||
# anything to it:
|
||||
await(prom)
|
||||
if prom.data.isNil: return
|
||||
let owner = cast[ptr Worker](prom.owner)
|
||||
let q = addr(owner.q)
|
||||
var waited = false
|
||||
while true:
|
||||
acquire(q.lock)
|
||||
if q.len < q.data.len:
|
||||
q.data[q.len] = prom.data
|
||||
inc q.len
|
||||
release(q.lock)
|
||||
break
|
||||
else:
|
||||
# the queue is exhausted! We block until it has been cleaned:
|
||||
release(q.lock)
|
||||
wait(q.empty, q.lock)
|
||||
waited = true
|
||||
prom.data = nil
|
||||
# wakeup other potentially waiting threads:
|
||||
if waited: signal(q.empty)
|
||||
|
||||
proc cleanPromises(w: ptr Worker) =
|
||||
var it = w.head
|
||||
acquire(w.promiseLock)
|
||||
while it != nil:
|
||||
let nxt = it.next
|
||||
if it.usesCondVar: destroyCondVar(it.cv)
|
||||
if it.data != nil: GC_unref(it.data)
|
||||
dealloc(it)
|
||||
it = nxt
|
||||
w.head = nil
|
||||
release(w.promiseLock)
|
||||
let q = addr(w.q)
|
||||
acquire(q.lock)
|
||||
for i in 0 .. <q.len:
|
||||
GC_unref(cast[PObject](q.data[i]))
|
||||
q.len = 0
|
||||
release(q.lock)
|
||||
signal(q.empty)
|
||||
|
||||
proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {.
|
||||
compilerProc.} =
|
||||
result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize))
|
||||
result.owner = cast[ptr Worker](owner)
|
||||
proc promFinalizer[T](prom: Promise[T]) = finished(prom)
|
||||
|
||||
proc nimCreatePromise[T](): Promise[T] {.compilerProc.} =
|
||||
new(result, promFinalizer)
|
||||
|
||||
proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} =
|
||||
prom.cv = createCondVar()
|
||||
@@ -160,16 +179,12 @@ proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} =
|
||||
signal(prom.ai.cv.c)
|
||||
if prom.usesCondVar: signal(prom.cv)
|
||||
|
||||
proc await*[T](prom: Promise[T]) =
|
||||
## waits until the value for the promise arrives.
|
||||
if prom.usesCondVar: await(prom.cv)
|
||||
|
||||
proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
|
||||
## blocks until the ``prom`` is available and then passes its value
|
||||
## to ``action``. Note that due to Nimrod's parameter passing semantics this
|
||||
## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
|
||||
## sometimes be more efficient than ``^``.
|
||||
if prom.usesCondVar: await(prom)
|
||||
await(prom)
|
||||
when T is string or T is seq:
|
||||
action(cast[T](prom.data))
|
||||
elif T is ref:
|
||||
@@ -179,23 +194,17 @@ proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
|
||||
finished(prom)
|
||||
|
||||
proc `^`*[T](prom: Promise[ref T]): foreign ptr T =
|
||||
## blocks until the value is available and then returns this value. Note
|
||||
## this reading is destructive for reasons of efficiency and convenience.
|
||||
## This calls ``finished(prom)``.
|
||||
if prom.usesCondVar: await(prom)
|
||||
## blocks until the value is available and then returns this value.
|
||||
await(prom)
|
||||
result = cast[foreign ptr T](prom.data)
|
||||
finished(prom)
|
||||
|
||||
proc `^`*[T](prom: Promise[T]): T =
|
||||
## blocks until the value is available and then returns this value. Note
|
||||
## this reading is destructive for reasons of efficiency and convenience.
|
||||
## This calls ``finished(prom)``.
|
||||
if prom.usesCondVar: await(prom)
|
||||
## blocks until the value is available and then returns this value.
|
||||
await(prom)
|
||||
when T is string or T is seq:
|
||||
result = cast[T](prom.data)
|
||||
else:
|
||||
result = prom.blob
|
||||
finished(prom)
|
||||
|
||||
proc awaitAny*(promises: openArray[RawPromise]): int =
|
||||
# awaits any of the given promises. Returns the index of one promise for which
|
||||
@@ -245,7 +254,7 @@ proc slave(w: ptr Worker) {.thread.} =
|
||||
await(w.taskArrived)
|
||||
assert(not w.ready)
|
||||
w.f(w, w.data)
|
||||
if w.head != nil: w.cleanPromises
|
||||
if w.q.len != 0: w.cleanPromises
|
||||
if w.shutdown:
|
||||
w.shutdown = false
|
||||
atomicDec currentPoolSize
|
||||
@@ -266,8 +275,9 @@ var
|
||||
proc activateThread(i: int) {.noinline.} =
|
||||
workersData[i].taskArrived = createCondVar()
|
||||
workersData[i].taskStarted = createCondVar()
|
||||
initLock workersData[i].promiseLock
|
||||
workersData[i].initialized = true
|
||||
initCond(workersData[i].q.empty)
|
||||
initLock(workersData[i].q.lock)
|
||||
createThread(workers[i], slave, addr(workersData[i]))
|
||||
|
||||
proc setup() =
|
||||
|
||||
@@ -153,9 +153,11 @@ when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
|
||||
## A value of 0 indicates typical alignment should be used. The compiler may also
|
||||
## ignore this parameter.
|
||||
|
||||
template fence*() = atomicThreadFence(ATOMIC_SEQ_CST)
|
||||
elif defined(vcc) and hasThreadSupport:
|
||||
proc addAndFetch*(p: ptr int, val: int): int {.
|
||||
importc: "NimXadd", nodecl.}
|
||||
|
||||
else:
|
||||
proc addAndFetch*(p: ptr int, val: int): int {.inline.} =
|
||||
inc(p[], val)
|
||||
@@ -231,4 +233,10 @@ elif false:
|
||||
|
||||
proc cpuRelax {.inline.} = os.sleep(1)
|
||||
|
||||
when not defined(fence) and hasThreadSupport:
|
||||
# XXX fixme
|
||||
proc fence*() {.inline.} =
|
||||
var dummy: bool
|
||||
discard cas(addr dummy, false, true)
|
||||
|
||||
{.pop.}
|
||||
|
||||
14
todo.txt
14
todo.txt
@@ -1,6 +1,20 @@
|
||||
version 0.9.6
|
||||
=============
|
||||
|
||||
Concurrency
|
||||
-----------
|
||||
|
||||
- document the new 'spawn' and 'parallel' statements
|
||||
- implement 'deepCopy' builtin
|
||||
- implement 'foo[1..4] = spawn(f[4..7])'
|
||||
- the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()'
|
||||
- Minor: The copying of the 'ref Promise' into the thead local storage only
|
||||
happens to work due to the write barrier's implementation
|
||||
|
||||
|
||||
Misc
|
||||
----
|
||||
|
||||
- fix the bug that keeps 'defer' template from working
|
||||
- make '--implicitStatic:on' the default
|
||||
- fix the tuple unpacking in lambda bug
|
||||
|
||||
Reference in New Issue
Block a user