mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-19 14:00:35 +00:00
* Fix sym owner in wrapper proc * threadpool changes * revert lowerings * add newFastMoveStmt * try fixing test by switching to cpp Co-authored-by: cooldome <ariabushenko@bk.ru>
This commit is contained in:
@@ -44,7 +44,7 @@ type
|
||||
graph: ModuleGraph
|
||||
emptyNode: PNode
|
||||
otherRead: PNode
|
||||
inLoop, hasUnstructuredCf, inDangerousBranch: int
|
||||
inLoop, inSpawn, hasUnstructuredCf, inDangerousBranch: int
|
||||
declaredVars: IntSet # variables we already moved to the top level
|
||||
uninit: IntSet # set of uninit'ed vars
|
||||
uninitComputed: bool
|
||||
@@ -398,7 +398,7 @@ proc passCopyToSink(n: PNode; c: var Con): PNode =
|
||||
var m = genCopy(c, tmp, n)
|
||||
m.add p(n, c, normal)
|
||||
result.add m
|
||||
if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef:
|
||||
if isLValue(n) and not isClosureEnv(n) and n.typ.skipTypes(abstractInst).kind != tyRef and c.inSpawn == 0:
|
||||
message(c.graph.config, n.info, hintPerformance,
|
||||
("passing '$1' to a sink parameter introduces an implicit copy; " &
|
||||
"use 'move($1)' to prevent it") % $n)
|
||||
@@ -830,6 +830,12 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode =
|
||||
if mode == normal and isRefConstr:
|
||||
result = ensureDestruction(result, c)
|
||||
of nkCallKinds:
|
||||
let inSpawn = c.inSpawn
|
||||
if n[0].kind == nkSym and n[0].sym.magic == mSpawn:
|
||||
c.inSpawn.inc
|
||||
elif c.inSpawn > 0:
|
||||
c.inSpawn.dec
|
||||
|
||||
let parameters = n[0].typ
|
||||
let L = if parameters != nil: parameters.len else: 0
|
||||
|
||||
@@ -840,7 +846,7 @@ proc p(n: PNode; c: var Con; mode: ProcessMode): PNode =
|
||||
|
||||
result = shallowCopy(n)
|
||||
for i in 1..<n.len:
|
||||
if i < L and isSinkTypeForParam(parameters[i]):
|
||||
if i < L and (isSinkTypeForParam(parameters[i]) or inSpawn > 0):
|
||||
result[i] = p(n[i], c, sinkArg)
|
||||
else:
|
||||
result[i] = p(n[i], c, normal)
|
||||
@@ -1085,7 +1091,6 @@ proc injectDestructorCalls*(g: ModuleGraph; owner: PSym; n: PNode): PNode =
|
||||
let t = params[i].sym.typ
|
||||
if isSinkTypeForParam(t) and hasDestructor(t.skipTypes({tySink})):
|
||||
c.destroys.add genDestroy(c, params[i])
|
||||
|
||||
#if optNimV2 in c.graph.config.globalOptions:
|
||||
# injectDefaultCalls(n, c)
|
||||
let body = p(n, c, normal)
|
||||
|
||||
@@ -59,6 +59,13 @@ proc newFastAsgnStmt*(le, ri: PNode): PNode =
|
||||
result[0] = le
|
||||
result[1] = ri
|
||||
|
||||
proc newFastMoveStmt*(g: ModuleGraph, le, ri: PNode): PNode =
|
||||
result = newNodeI(nkFastAsgn, le.info, 2)
|
||||
result[0] = le
|
||||
result[1] = newNodeIT(nkCall, ri.info, ri.typ)
|
||||
result[1].add newSymNode(getSysMagic(g, ri.info, "move", mMove))
|
||||
result[1].add ri
|
||||
|
||||
proc lowerTupleUnpacking*(g: ModuleGraph; n: PNode; owner: PSym): PNode =
|
||||
assert n.kind == nkVarTuple
|
||||
let value = n.lastSon
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
## This module implements threadpool's ``spawn``.
|
||||
|
||||
import ast, types, idents, magicsys, msgs, options, modulegraphs,
|
||||
lowerings
|
||||
lowerings, liftdestructors
|
||||
from trees import getMagic, getRoot
|
||||
|
||||
proc callProc(a: PNode): PNode =
|
||||
@@ -36,8 +36,9 @@ proc spawnResult*(t: PType; inParallel: bool): TSpawnResult =
|
||||
elif inParallel and not containsGarbageCollectedRef(t): srByVar
|
||||
else: srFlowVar
|
||||
|
||||
proc flowVarKind(t: PType): TFlowVarKind =
|
||||
if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
|
||||
proc flowVarKind(c: ConfigRef, t: PType): TFlowVarKind =
|
||||
if c.selectedGC in {gcArc, gcOrc}: fvBlob
|
||||
elif t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
|
||||
elif containsGarbageCollectedRef(t): fvInvalid
|
||||
else: fvBlob
|
||||
|
||||
@@ -66,17 +67,11 @@ proc addLocalVar(g: ModuleGraph; varSection, varInit: PNode; owner: PSym; typ: P
|
||||
varSection.add vpart
|
||||
if varInit != nil:
|
||||
if g.config.selectedGC in {gcArc, gcOrc}:
|
||||
if typ.attachedOps[attachedAsgn] != nil:
|
||||
var call = newNode(nkCall)
|
||||
call.add newSymNode(typ.attachedOps[attachedAsgn])
|
||||
call.add genAddrOf(newSymNode(result), tyVar)
|
||||
call.add v
|
||||
varInit.add call
|
||||
else:
|
||||
varInit.add newFastAsgnStmt(newSymNode(result), v)
|
||||
# inject destructors pass will do its own analysis
|
||||
varInit.add newFastMoveStmt(g, newSymNode(result), v)
|
||||
else:
|
||||
if useShallowCopy and typeNeedsNoDeepCopy(typ) or optTinyRtti in g.config.globalOptions:
|
||||
varInit.add newFastAsgnStmt(newSymNode(result), v)
|
||||
varInit.add newFastMoveStmt(g, newSymNode(result), v)
|
||||
else:
|
||||
let deepCopyCall = newNodeI(nkCall, varInit.info, 3)
|
||||
deepCopyCall[0] = newSymNode(getSysMagic(g, varSection.info, "deepCopy", mDeepCopy))
|
||||
@@ -145,7 +140,7 @@ proc createWrapperProc(g: ModuleGraph; f: PNode; threadParam, argsParam: PSym;
|
||||
if spawnKind == srByVar:
|
||||
body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
|
||||
elif fv != nil:
|
||||
let fk = fv.typ[1].flowVarKind
|
||||
let fk = flowVarKind(g.config, fv.typ[1])
|
||||
if fk == fvInvalid:
|
||||
localError(g.config, f.info, "cannot create a flowVar of type: " &
|
||||
typeToString(fv.typ[1]))
|
||||
@@ -333,6 +328,8 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P
|
||||
wrapperProc = newSym(skProc, getIdent(g.cache, name), owner, fn.info, g.config.options)
|
||||
threadParam = newSym(skParam, getIdent(g.cache, "thread"), wrapperProc, n.info, g.config.options)
|
||||
argsParam = newSym(skParam, getIdent(g.cache, "args"), wrapperProc, n.info, g.config.options)
|
||||
|
||||
wrapperProc.flags.incl sfInjectDestructors
|
||||
block:
|
||||
let ptrType = getSysType(g, n.info, tyPointer)
|
||||
threadParam.typ = ptrType
|
||||
@@ -411,6 +408,7 @@ proc wrapProcForSpawn*(g: ModuleGraph; owner: PSym; spawnExpr: PNode; retType: P
|
||||
fvAsExpr = indirectAccess(castExpr, field, n.info)
|
||||
result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))
|
||||
|
||||
createTypeBoundOps(g, nil, objType, n.info)
|
||||
createWrapperProc(g, fn, threadParam, argsParam,
|
||||
varSection, varInit, call,
|
||||
barrierAsExpr, fvAsExpr, spawnKind, wrapperProc)
|
||||
|
||||
@@ -137,7 +137,7 @@ type
|
||||
|
||||
const threadpoolWaitMs {.intdefine.}: int = 100
|
||||
|
||||
proc blockUntil*(fv: FlowVarBase) =
|
||||
proc blockUntil*(fv: var FlowVarBaseObj) =
|
||||
## Waits until the value for the ``fv`` arrives.
|
||||
##
|
||||
## Usually it is not necessary to call this explicitly.
|
||||
@@ -185,7 +185,7 @@ proc attach(fv: FlowVarBase; i: int): bool =
|
||||
result = false
|
||||
release(fv.cv.L)
|
||||
|
||||
proc finished(fv: FlowVarBase) =
|
||||
proc finished(fv: var FlowVarBaseObj) =
|
||||
doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
|
||||
# we have to protect against the rare cases where the owner of the flowVar
|
||||
# simply disregards the flowVar and yet the "flowVar" has not yet written
|
||||
@@ -208,10 +208,12 @@ proc finished(fv: FlowVarBase) =
|
||||
# the worker thread waits for "data" to be set to nil before shutting down
|
||||
owner.data = nil
|
||||
|
||||
proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
|
||||
proc `=destroy`[T](fv: var FlowVarObj[T]) =
|
||||
finished(fv)
|
||||
`=destroy`(fv.blob)
|
||||
|
||||
proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
|
||||
new(result, fvFinalizer)
|
||||
new(result)
|
||||
|
||||
proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
|
||||
fv.cv.initSemaphore()
|
||||
@@ -234,43 +236,34 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
|
||||
## Note that due to Nim's parameter passing semantics this
|
||||
## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can
|
||||
## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_.
|
||||
blockUntil(fv)
|
||||
when T is string or T is seq:
|
||||
blockUntil(fv[])
|
||||
when defined(nimV2):
|
||||
action(fv.blob)
|
||||
elif T is string or T is seq:
|
||||
action(cast[T](fv.data))
|
||||
elif T is ref:
|
||||
{.error: "'awaitAndThen' not available for FlowVar[ref]".}
|
||||
else:
|
||||
action(fv.blob)
|
||||
finished(fv)
|
||||
finished(fv[])
|
||||
|
||||
proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
|
||||
## Blocks until the value is available and then returns this value.
|
||||
blockUntil(fv)
|
||||
result = cast[ptr T](fv.data)
|
||||
finished(fv)
|
||||
|
||||
proc `^`*[T](fv: FlowVar[ref T]): ref T =
|
||||
## Blocks until the value is available and then returns this value.
|
||||
blockUntil(fv)
|
||||
let src = cast[ref T](fv.data)
|
||||
blockUntil(fv[])
|
||||
when defined(nimV2):
|
||||
result = src
|
||||
result = cast[ptr T](fv.blob)
|
||||
else:
|
||||
deepCopy result, src
|
||||
finished(fv)
|
||||
result = cast[ptr T](fv.data)
|
||||
finished(fv[])
|
||||
|
||||
proc `^`*[T](fv: FlowVar[T]): T =
|
||||
## Blocks until the value is available and then returns this value.
|
||||
blockUntil(fv)
|
||||
when T is string or T is seq:
|
||||
let src = cast[T](fv.data)
|
||||
when defined(nimV2):
|
||||
result = src
|
||||
else:
|
||||
deepCopy result, src
|
||||
blockUntil(fv[])
|
||||
when not defined(nimV2) and (T is string or T is seq or T is ref):
|
||||
deepCopy result, cast[T](fv.data)
|
||||
else:
|
||||
result = fv.blob
|
||||
finished(fv)
|
||||
finished(fv[])
|
||||
|
||||
proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
|
||||
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
|
||||
@@ -457,14 +450,14 @@ proc preferSpawn*(): bool =
|
||||
## <#spawnX.t>`_ instead.
|
||||
result = gSomeReady.counter > 0
|
||||
|
||||
proc spawn*(call: typed): void {.magic: "Spawn".}
|
||||
proc spawn*(call: sink typed): void {.magic: "Spawn".}
|
||||
## Always spawns a new task, so that the ``call`` is never executed on
|
||||
## the calling thread.
|
||||
##
|
||||
## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
|
||||
## return type that is either ``void`` or compatible with ``FlowVar[T]``.
|
||||
|
||||
proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".}
|
||||
proc pinnedSpawn*(id: ThreadId; call: sink typed): void {.magic: "Spawn".}
|
||||
## Always spawns a new task on the worker thread with ``id``, so that
|
||||
## the ``call`` is **always** executed on the thread.
|
||||
##
|
||||
|
||||
63
tests/arc/tthread.nim
Normal file
63
tests/arc/tthread.nim
Normal file
@@ -0,0 +1,63 @@
|
||||
discard """
|
||||
cmd: "nim cpp --gc:arc --threads:on $file"
|
||||
output: '''ok1
|
||||
ok2
|
||||
destroyed
|
||||
destroyed
|
||||
destroyed
|
||||
'''
|
||||
"""
|
||||
import threadpool, os
|
||||
|
||||
type
|
||||
MyObj = object
|
||||
p: int
|
||||
MyObjRef = ref MyObj
|
||||
|
||||
proc `=destroy`(x: var MyObj) =
|
||||
if x.p != 0:
|
||||
echo "destroyed"
|
||||
|
||||
proc thread1(): string =
|
||||
os.sleep(1000)
|
||||
return "ok1"
|
||||
|
||||
proc thread2(): ref string =
|
||||
os.sleep(1000)
|
||||
new(result)
|
||||
result[] = "ok2"
|
||||
|
||||
proc thread3(): ref MyObj =
|
||||
os.sleep(1000)
|
||||
new(result)
|
||||
result[].p = 2
|
||||
|
||||
var fv1 = spawn thread1()
|
||||
var fv2 = spawn thread2()
|
||||
var fv3 = spawn thread3()
|
||||
sync()
|
||||
echo ^fv1
|
||||
echo (^fv2)[]
|
||||
|
||||
|
||||
proc thread4(x: MyObjRef): MyObjRef {.nosinks.} =
|
||||
os.sleep(1000)
|
||||
result = x
|
||||
|
||||
proc thread5(x: sink MyObjRef): MyObjRef =
|
||||
os.sleep(1000)
|
||||
result = x
|
||||
|
||||
proc ref_forwarding_test =
|
||||
var x = new(MyObj)
|
||||
x[].p = 2
|
||||
var y = spawn thread4(x)
|
||||
|
||||
proc ref_sink_forwarding_test =
|
||||
var x = new(MyObj)
|
||||
x[].p = 2
|
||||
var y = spawn thread5(x)
|
||||
|
||||
ref_forwarding_test()
|
||||
ref_sink_forwarding_test()
|
||||
sync()
|
||||
Reference in New Issue
Block a user