mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-28 17:04:41 +00:00
progress with futures
This commit is contained in:
@@ -605,7 +605,7 @@ const
|
||||
# thus cannot be overloaded (also documented in the spec!):
|
||||
SpecialSemMagics* = {
|
||||
mDefined, mDefinedInScope, mCompiles, mLow, mHigh, mSizeOf, mIs, mOf,
|
||||
mEcho, mShallowCopy, mExpandToAst, mParallel}
|
||||
mEcho, mShallowCopy, mExpandToAst, mParallel, mSpawn}
|
||||
|
||||
type
|
||||
PNode* = ref TNode
|
||||
|
||||
@@ -86,8 +86,14 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
|
||||
# returns a[].b as a node
|
||||
var deref = newNodeI(nkHiddenDeref, info)
|
||||
deref.typ = a.typ.skipTypes(abstractInst).sons[0]
|
||||
assert deref.typ.kind == tyObject
|
||||
let field = getSymFromList(deref.typ.n, getIdent(b))
|
||||
var t = deref.typ
|
||||
var field: PSym
|
||||
while true:
|
||||
assert t.kind == tyObject
|
||||
field = getSymFromList(t.n, getIdent(b))
|
||||
if field != nil: break
|
||||
t = t.sons[0]
|
||||
if t == nil: break
|
||||
assert field != nil, b
|
||||
addSon(deref, a)
|
||||
result = newNodeI(nkDotExpr, info)
|
||||
@@ -124,6 +130,7 @@ proc callCodegenProc*(name: string, arg1: PNode;
|
||||
result.add arg1
|
||||
if arg2 != nil: result.add arg2
|
||||
if arg3 != nil: result.add arg3
|
||||
result.typ = sym.typ.sons[0]
|
||||
|
||||
# we have 4 cases to consider:
|
||||
# - a void proc --> nothing to do
|
||||
@@ -152,15 +159,21 @@ discard """
|
||||
We generate roughly this:
|
||||
|
||||
proc f_wrapper(args) =
|
||||
barrierEnter(args.barrier) # for parallel statement
|
||||
var a = args.a # copy strings/seqs; thread transfer; not generated for
|
||||
# the 'parallel' statement
|
||||
var b = args.b
|
||||
|
||||
args.fut = createFuture(thread, sizeof(T)) # optional
|
||||
args.fut = nimCreateFuture(thread, sizeof(T)) # optional
|
||||
nimFutureCreateCondVar(args.fut) # optional
|
||||
nimArgsPassingDone() # signal parent that the work is done
|
||||
#
|
||||
args.fut.blob = f(a, b, ...)
|
||||
nimFutureSignal(args.fut)
|
||||
|
||||
# - or -
|
||||
f(a, b, ...)
|
||||
barrierLeave(args.barrier) # for parallel statement
|
||||
|
||||
stmtList:
|
||||
var scratchObj
|
||||
@@ -196,8 +209,12 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
|
||||
|
||||
body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode)
|
||||
if fut != nil:
|
||||
body.add newAsgnStmt(indirectAccess(fut,
|
||||
if fut.typ.futureKind==futGC: "data" else: "blob", fut.info), call)
|
||||
let fk = fut.typ.sons[1].futureKind
|
||||
if fk == futInvalid:
|
||||
localError(f.info, "cannot create a future of type: " &
|
||||
typeToString(fut.typ.sons[1]))
|
||||
body.add newAsgnStmt(indirectAccess(fut,
|
||||
if fk == futGC: "data" else: "blob", fut.info), call)
|
||||
if barrier == nil:
|
||||
body.add callCodeGenProc("nimFutureSignal", fut)
|
||||
else:
|
||||
|
||||
@@ -1579,6 +1579,12 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode =
|
||||
else:
|
||||
result = semDirectOp(c, n, flags)
|
||||
|
||||
proc createFuture(c: PContext; t: PType; info: TLineInfo): PType =
|
||||
result = newType(tyGenericInvokation, c.module)
|
||||
addSonSkipIntLit(result, magicsys.getCompilerProc("Future").typ)
|
||||
addSonSkipIntLit(result, t)
|
||||
result = instGenericContainer(c, info, result, allowMetaTypes = false)
|
||||
|
||||
proc setMs(n: PNode, s: PSym): PNode =
|
||||
result = n
|
||||
n.sons[0] = newSymNode(s)
|
||||
@@ -1610,6 +1616,12 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
|
||||
var x = n.lastSon
|
||||
if x.kind == nkDo: x = x.sons[bodyPos]
|
||||
result.sons[1] = semStmt(c, x)
|
||||
of mSpawn:
|
||||
result = setMs(n, s)
|
||||
result.sons[1] = semExpr(c, n.sons[1])
|
||||
# later passes may transform the type 'Future[T]' back into 'T'
|
||||
if not result[1].typ.isEmptyType:
|
||||
result.typ = createFuture(c, result[1].typ, n.info)
|
||||
else: result = semDirectOp(c, n, flags)
|
||||
|
||||
proc semWhen(c: PContext, n: PNode, semCheck = true): PNode =
|
||||
|
||||
@@ -115,12 +115,6 @@ proc semLocals(c: PContext, n: PNode): PNode =
|
||||
if it.typ.skipTypes({tyGenericInst}).kind == tyVar: a = newDeref(a)
|
||||
result.add(a)
|
||||
|
||||
proc createFuture(c: PContext; t: PType; info: TLineInfo): PType =
|
||||
result = newType(tyGenericInvokation, c.module)
|
||||
addSonSkipIntLit(result, magicsys.getCompilerProc("Future").typ)
|
||||
addSonSkipIntLit(result, t)
|
||||
result = instGenericContainer(c, info, result, allowMetaTypes = false)
|
||||
|
||||
proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode
|
||||
proc magicsAfterOverloadResolution(c: PContext, n: PNode,
|
||||
flags: TExprFlags): PNode =
|
||||
@@ -136,9 +130,4 @@ proc magicsAfterOverloadResolution(c: PContext, n: PNode,
|
||||
of mShallowCopy: result = semShallowCopy(c, n, flags)
|
||||
of mNBindSym: result = semBindSym(c, n)
|
||||
of mLocals: result = semLocals(c, n)
|
||||
of mSpawn:
|
||||
result = n
|
||||
# later passes may transform the type 'Future[T]' back into 'T'
|
||||
if not n[1].typ.isEmptyType:
|
||||
result.typ = createFuture(c, n[1].typ, n.info)
|
||||
else: result = n
|
||||
|
||||
@@ -57,7 +57,7 @@ proc openBarrier*(b: ptr Barrier) {.compilerProc.} =
|
||||
b.cv = createCondVar()
|
||||
|
||||
proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
|
||||
await(b.cv)
|
||||
while b.counter > 0: await(b.cv)
|
||||
destroyCondVar(b.cv)
|
||||
|
||||
{.pop.}
|
||||
@@ -136,8 +136,13 @@ proc nimFutureCreateCondVar(fut: RawFuture) {.compilerProc.} =
|
||||
fut.usesCondVar = true
|
||||
|
||||
proc nimFutureSignal(fut: RawFuture) {.compilerProc.} =
|
||||
assert fut.usesCondVar
|
||||
signal(fut.cv)
|
||||
if fut.ai != nil:
|
||||
acquire(fut.ai.cv.L)
|
||||
fut.ai.idx = fut.idx
|
||||
inc fut.ai.cv.counter
|
||||
release(fut.ai.cv.L)
|
||||
signal(fut.ai.cv.c)
|
||||
if fut.usesCondVar: signal(fut.cv)
|
||||
|
||||
proc await*[T](fut: Future[T]) =
|
||||
## waits until the value for the future arrives.
|
||||
@@ -147,28 +152,21 @@ proc `^`*[T](fut: Future[T]): T =
|
||||
## blocks until the value is available and then returns this value. Note
|
||||
## this reading is destructive for reasons of efficiency and convenience.
|
||||
## This calls ``finished(fut)``.
|
||||
await(fut)
|
||||
if fut.usesCondVar: await(fut)
|
||||
when T is string or T is seq or T is ref:
|
||||
result = cast[T](fut.data)
|
||||
else:
|
||||
result = fut.payload
|
||||
result = fut.blob
|
||||
finished(fut)
|
||||
|
||||
proc notify*(fut: RawFuture) {.compilerproc.} =
|
||||
if fut.ai != nil:
|
||||
acquire(fut.ai.cv.L)
|
||||
fut.ai.idx = fut.idx
|
||||
inc fut.ai.cv.counter
|
||||
release(fut.ai.cv.L)
|
||||
signal(fut.ai.cv.c)
|
||||
if fut.usesCondVar: signal(fut.cv)
|
||||
|
||||
proc awaitAny*(futures: openArray[RawFuture]): int =
|
||||
# awaits any of the given futures. Returns the index of one future for which
|
||||
## a value arrived. A future only supports one call to 'awaitAny' at the
|
||||
## same time. That means if you await([a,b]) and await([b,c]) the second
|
||||
## call will only await 'c'. If there is no future left to be able to wait
|
||||
## on, -1 is returned.
|
||||
## **Note**: This results in non-deterministic behaviour and so should be
|
||||
## avoided.
|
||||
var ai: AwaitInfo
|
||||
ai.cv = createCondVar()
|
||||
var conflicts = 0
|
||||
@@ -245,19 +243,18 @@ proc preferSpawn*(): bool =
|
||||
## it is not necessary to call this directly; use 'spawnX' instead.
|
||||
result = gSomeReady.counter > 0
|
||||
|
||||
proc spawn*(call: stmt) {.magic: "Spawn".}
|
||||
proc spawn*(call: expr): expr {.magic: "Spawn".}
|
||||
## always spawns a new task, so that the 'call' is never executed on
|
||||
## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
|
||||
## is gcsafe and has 'void' as the return type.
|
||||
|
||||
template spawnX*(call: stmt) =
|
||||
template spawnX*(call: expr): expr =
|
||||
## spawns a new task if a CPU core is ready, otherwise executes the
|
||||
## call in the calling thread. Usually it is advised to
|
||||
## use 'spawn' in order to not block the producer for an unknown
|
||||
## amount of time. 'call' has to be proc call 'p(...)' where 'p'
|
||||
## is gcsafe and has 'void' as the return type.
|
||||
if preferSpawn(): spawn call
|
||||
else: call
|
||||
(if preferSpawn(): spawn call else: call)
|
||||
|
||||
proc parallel*(body: stmt) {.magic: "Parallel".}
|
||||
## a parallel section can be used to execute a block in parallel. ``body``
|
||||
|
||||
17
tests/parallel/tflowvar.nim
Normal file
17
tests/parallel/tflowvar.nim
Normal file
@@ -0,0 +1,17 @@
|
||||
discard """
|
||||
output: '''foobarfoobarbazbearbazbear'''
|
||||
cmd: "nimrod $target --threads:on $options $file"
|
||||
"""
|
||||
|
||||
import threadpool
|
||||
|
||||
proc computeSomething(a, b: string): string = a & b & a & b
|
||||
|
||||
proc main =
|
||||
let fvA = spawn computeSomething("foo", "bar")
|
||||
let fvB = spawn computeSomething("baz", "bear")
|
||||
|
||||
echo(^fvA, ^fvB)
|
||||
|
||||
main()
|
||||
sync()
|
||||
@@ -1,6 +1,6 @@
|
||||
discard """
|
||||
line: 7
|
||||
errormsg: "'spawn' takes a call expression of type void"
|
||||
errormsg: "'spawn' takes a call expression"
|
||||
cmd: "nimrod $target --threads:on $options $file"
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user