mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-19 05:50:30 +00:00
first implementation of pinnedSpawn
This commit is contained in:
@@ -167,7 +167,7 @@ proc genDeref*(n: PNode): PNode =
|
||||
result.add n
|
||||
|
||||
proc callCodegenProc*(name: string, arg1: PNode;
|
||||
arg2, arg3: PNode = nil): PNode =
|
||||
arg2, arg3, optionalArgs: PNode = nil): PNode =
|
||||
result = newNodeI(nkCall, arg1.info)
|
||||
let sym = magicsys.getCompilerProc(name)
|
||||
if sym == nil:
|
||||
@@ -177,6 +177,9 @@ proc callCodegenProc*(name: string, arg1: PNode;
|
||||
result.add arg1
|
||||
if arg2 != nil: result.add arg2
|
||||
if arg3 != nil: result.add arg3
|
||||
if optionalArgs != nil:
|
||||
for i in 1..optionalArgs.len-3:
|
||||
result.add optionalArgs[i]
|
||||
result.typ = sym.typ.sons[0]
|
||||
|
||||
proc callProc(a: PNode): PNode =
|
||||
@@ -483,7 +486,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
|
||||
barrier, dest: PNode = nil): PNode =
|
||||
# if 'barrier' != nil, then it is in a 'parallel' section and we
|
||||
# generate quite different code
|
||||
let n = spawnExpr[1]
|
||||
let n = spawnExpr[^2]
|
||||
let spawnKind = spawnResult(retType, barrier!=nil)
|
||||
case spawnKind
|
||||
of srVoid:
|
||||
@@ -569,7 +572,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
|
||||
fvField = newDotExpr(scratchObj, field)
|
||||
fvAsExpr = indirectAccess(castExpr, field, n.info)
|
||||
# create flowVar:
|
||||
result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2]))
|
||||
result.add newFastAsgnStmt(fvField, callProc(spawnExpr[^1]))
|
||||
if barrier == nil:
|
||||
result.add callCodegenProc("nimFlowVarCreateSemaphore", fvField)
|
||||
|
||||
@@ -584,7 +587,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
|
||||
let wrapper = createWrapperProc(fn, threadParam, argsParam,
|
||||
varSection, varInit, call,
|
||||
barrierAsExpr, fvAsExpr, spawnKind)
|
||||
result.add callCodegenProc("nimSpawn", wrapper.newSymNode,
|
||||
genAddrOf(scratchObj.newSymNode))
|
||||
result.add callCodegenProc("nimSpawn" & $spawnExpr.len, wrapper.newSymNode,
|
||||
genAddrOf(scratchObj.newSymNode), nil, spawnExpr)
|
||||
|
||||
if spawnKind == srFlowVar: result.add fvField
|
||||
|
||||
@@ -1727,13 +1727,17 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
|
||||
dec c.inParallelStmt
|
||||
of mSpawn:
|
||||
result = setMs(n, s)
|
||||
result.sons[1] = semExpr(c, n.sons[1])
|
||||
if not result[1].typ.isEmptyType:
|
||||
if spawnResult(result[1].typ, c.inParallelStmt > 0) == srFlowVar:
|
||||
result.typ = createFlowVar(c, result[1].typ, n.info)
|
||||
for i in 1 .. <n.len:
|
||||
result.sons[i] = semExpr(c, n.sons[i])
|
||||
let typ = result[^1].typ
|
||||
if not typ.isEmptyType:
|
||||
if spawnResult(typ, c.inParallelStmt > 0) == srFlowVar:
|
||||
result.typ = createFlowVar(c, typ, n.info)
|
||||
else:
|
||||
result.typ = result[1].typ
|
||||
result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode
|
||||
result.typ = typ
|
||||
result.add instantiateCreateFlowVarCall(c, typ, n.info).newSymNode
|
||||
else:
|
||||
result.add emptyNode
|
||||
of mProcCall:
|
||||
result = setMs(n, s)
|
||||
result.sons[1] = semExpr(c, n.sons[1])
|
||||
|
||||
@@ -237,9 +237,10 @@ proc useVar(a: PEffects, n: PNode) =
|
||||
message(n.info, warnUninit, s.name.s)
|
||||
# prevent superfluous warnings about the same variable:
|
||||
a.init.add s.id
|
||||
if {sfGlobal, sfThread} * s.flags == {sfGlobal} and s.kind in {skVar, skLet}:
|
||||
if {sfGlobal, sfThread} * s.flags != {} and s.kind in {skVar, skLet}:
|
||||
if s.guard != nil: guardGlobal(a, n, s.guard)
|
||||
if (tfHasGCedMem in s.typ.flags or s.typ.isGCedMem):
|
||||
if {sfGlobal, sfThread} * s.flags == {sfGlobal} and
|
||||
(tfHasGCedMem in s.typ.flags or s.typ.isGCedMem):
|
||||
#if warnGcUnsafe in gNotes: warnAboutGcUnsafe(n)
|
||||
markGcUnsafe(a, s)
|
||||
|
||||
|
||||
@@ -267,6 +267,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
|
||||
const
|
||||
MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
|
||||
## should be good enough for anybody ;-)
|
||||
MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
|
||||
|
||||
type
|
||||
ThreadId* = range[0..MaxDistinguishedThread-1]
|
||||
|
||||
var
|
||||
currentPoolSize: int
|
||||
@@ -291,10 +295,24 @@ proc slave(w: ptr Worker) {.thread.} =
|
||||
w.shutdown = false
|
||||
atomicDec currentPoolSize
|
||||
|
||||
proc distinguishedSlave(w: ptr Worker) {.thread.} =
|
||||
while true:
|
||||
when declared(atomicStoreN):
|
||||
atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
|
||||
else:
|
||||
w.ready = true
|
||||
await(w.taskArrived)
|
||||
assert(not w.ready)
|
||||
w.f(w, w.data)
|
||||
if w.q.len != 0: w.cleanFlowVars
|
||||
|
||||
var
|
||||
workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
|
||||
workersData: array[MaxThreadPoolSize, Worker]
|
||||
|
||||
distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
|
||||
distinguishedData: array[MaxDistinguishedThread, Worker]
|
||||
|
||||
proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
|
||||
## sets the minimal thread pool size. The default value of this is 4.
|
||||
minPoolSize = size
|
||||
@@ -308,7 +326,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
|
||||
let w = addr(workersData[i])
|
||||
w.shutdown = true
|
||||
|
||||
proc activateThread(i: int) {.noinline.} =
|
||||
proc activateWorkerThread(i: int) {.noinline.} =
|
||||
workersData[i].taskArrived = createSemaphore()
|
||||
workersData[i].taskStarted = createSemaphore()
|
||||
workersData[i].initialized = true
|
||||
@@ -316,10 +334,18 @@ proc activateThread(i: int) {.noinline.} =
|
||||
initLock(workersData[i].q.lock)
|
||||
createThread(workers[i], slave, addr(workersData[i]))
|
||||
|
||||
proc activateDistinguishedThread(i: int) {.noinline.} =
|
||||
distinguishedData[i].taskArrived = createSemaphore()
|
||||
distinguishedData[i].taskStarted = createSemaphore()
|
||||
distinguishedData[i].initialized = true
|
||||
distinguishedData[i].q.empty = createSemaphore()
|
||||
initLock(distinguishedData[i].q.lock)
|
||||
createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
|
||||
|
||||
proc setup() =
|
||||
currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
|
||||
readyWorker = addr(workersData[0])
|
||||
for i in 0.. <currentPoolSize: activateThread(i)
|
||||
for i in 0.. <currentPoolSize: activateWorkerThread(i)
|
||||
|
||||
proc preferSpawn*(): bool =
|
||||
## Use this proc to determine quickly if a 'spawn' or a direct call is
|
||||
@@ -333,6 +359,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
|
||||
## is gcsafe and has a return type that is either 'void' or compatible
|
||||
## with ``FlowVar[T]``.
|
||||
|
||||
proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
|
||||
## always spawns a new task on the worker thread with ``id``, so that
|
||||
## the 'call' is **always** executed on
|
||||
## the this thread. 'call' has to be proc call 'p(...)' where 'p'
|
||||
## is gcsafe and has a return type that is either 'void' or compatible
|
||||
## with ``FlowVar[T]``.
|
||||
|
||||
template spawnX*(call: expr): expr =
|
||||
## spawns a new task if a CPU core is ready, otherwise executes the
|
||||
## call in the calling thread. Usually it is advised to
|
||||
@@ -353,7 +386,7 @@ var
|
||||
|
||||
initLock stateLock
|
||||
|
||||
proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
|
||||
proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
|
||||
# implementation of 'spawn' that is used by the code generator.
|
||||
while true:
|
||||
if selectWorker(readyWorker, fn, data): return
|
||||
@@ -370,7 +403,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
|
||||
of doCreateThread:
|
||||
if currentPoolSize < maxPoolSize:
|
||||
if not workersData[currentPoolSize].initialized:
|
||||
activateThread(currentPoolSize)
|
||||
activateWorkerThread(currentPoolSize)
|
||||
let w = addr(workersData[currentPoolSize])
|
||||
atomicInc currentPoolSize
|
||||
if selectWorker(w, fn, data):
|
||||
@@ -387,6 +420,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
|
||||
# other thread succeeded, so we don't need to do anything here.
|
||||
await(gSomeReady)
|
||||
|
||||
var
|
||||
distinguishedLock: TLock
|
||||
|
||||
initLock distinguishedLock
|
||||
|
||||
proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
|
||||
acquire(distinguishedLock)
|
||||
if not distinguishedData[id].initialized:
|
||||
activateDistinguishedThread(id)
|
||||
while true:
|
||||
if selectWorker(addr(distinguishedData[id]), fn, data): break
|
||||
cpuRelax()
|
||||
# XXX exponential backoff?
|
||||
release(distinguishedLock)
|
||||
|
||||
proc sync*() =
|
||||
## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
|
||||
## waiting, you have to use an explicit barrier.
|
||||
|
||||
Reference in New Issue
Block a user