mirror of
https://github.com/nim-lang/Nim.git
synced 2026-02-23 11:26:52 +00:00
added actors.nim file; compiler not up for this task
This commit is contained in:
184
lib/pure/actors.nim
Normal file
184
lib/pure/actors.nim
Normal file
@@ -0,0 +1,184 @@
|
||||
#
|
||||
#
|
||||
# Nimrod's Runtime Library
|
||||
# (c) Copyright 2011 Andreas Rumpf
|
||||
#
|
||||
# See the file "copying.txt", included in this
|
||||
# distribution, for details about the copyright.
|
||||
#
|
||||
|
||||
## `Actor`:idx: support for Nimrod. An actor is implemented as a thread with
|
||||
## a channel as its inbox. This module requires the ``--threads:on``
|
||||
## command line switch.
|
||||
|
||||
type
|
||||
TTask*[TIn, TOut] = object{.pure, final.}
|
||||
when TOut isnot void:
|
||||
receiver*: ptr TChannel[TOut] ## the receiver channel of the response
|
||||
action*: proc (x: TIn): TOut {.thread.} ## action to execute;
|
||||
## sometimes useful
|
||||
shutDown*: bool ## set to tell an actor to shut-down
|
||||
data*: TIn ## the data to process
|
||||
|
||||
TActor[TIn, TOut] = object{.pure, final.}
|
||||
i: TChannel[TTask[TIn, TOut]]
|
||||
t: TThread[ptr TActor[TIn, TOut]]
|
||||
|
||||
PActor*[TIn, TOut] = ptr TActor[TIn, TOut] ## an actor
|
||||
|
||||
proc spawn*[TIn, TOut](action: proc(
|
||||
self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
|
||||
## creates an actor; that is a thread with an inbox. The caller MUST call
|
||||
## ``join`` because that also frees the associated resources with the actor.
|
||||
result = allocShared0(sizeof(result[]))
|
||||
open(result.i)
|
||||
createThread(result.t, action, result)
|
||||
|
||||
proc inbox*[TIn, TOut](self: PActor[TIn, TOut]): ptr TChannel[TIn] =
|
||||
## gets a pointer to the associated inbox of the actor `self`.
|
||||
result = addr(self.i)
|
||||
|
||||
proc running*[TIn, TOut](a: PActor[TIn, TOut]) =
|
||||
## returns true if the actor `a` is running.
|
||||
result = running(a.t)
|
||||
|
||||
proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
|
||||
## joins an actor.
|
||||
joinThread(a.t)
|
||||
close(a.i)
|
||||
deallocShared(a)
|
||||
|
||||
proc recv*[TIn, TOut](a: PActor[TIn, TOut]): TTask[TIn, TOut] =
|
||||
## receives a task from `a`'s inbox.
|
||||
result = recv(a.i)
|
||||
|
||||
proc send*[TIn, TOut, X, Y](sender: PActor[X, Z],
|
||||
receiver: PActor[TIn, TOut], msg: TIn) =
|
||||
## sends a message to `a`'s inbox.
|
||||
var t: TTask[TIn, TOut]
|
||||
t.receiver = addr(sender.i)
|
||||
shallowCopy(t.data, msg)
|
||||
send(receiver.i, t)
|
||||
|
||||
proc send*[TIn, TOut](receiver: PActor[TIn, TOut], msg: TIn,
|
||||
sender: ptr TChannel[TOut] = nil) =
|
||||
## sends a message to `receiver`'s inbox.
|
||||
var t: TTask[TIn, TOut]
|
||||
t.receiver = sender
|
||||
shallowCopy(t.data, msg)
|
||||
send(receiver.i, t)
|
||||
|
||||
proc sendShutdown*[TIn, TOut](receiver: PActor[TIn, TOut]) =
|
||||
## send a shutdown message to `receiver`.
|
||||
var t: TTask[TIn, TOut]
|
||||
t.shutdown = true
|
||||
send(receiver.i, t)
|
||||
|
||||
proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) =
|
||||
## sends a message to io's output message box.
|
||||
when TOut is void:
|
||||
{.error: "you cannot reply to a void outbox".}
|
||||
assert t.receiver != nil
|
||||
send(t.receiver[], m)
|
||||
|
||||
|
||||
# ----------------- actor pools ----------------------------------------------
|
||||
|
||||
type
|
||||
TActorPool*[TIn, TOut] = object{.pure, final.} ## an actor pool
|
||||
actors: seq[PActor[TIn, TOut]]
|
||||
when TOut isnot void:
|
||||
outputs: TChannel[TOut]
|
||||
|
||||
proc `^`*[T](f: ptr TChannel[T]): T =
|
||||
## alias for 'recv'.
|
||||
result = recv(f[])
|
||||
|
||||
proc poolWorker[TIn, TOut](self: PActor[TIn, TOut]) {.thread.} =
|
||||
while true:
|
||||
var m = self.recv
|
||||
if m.shutDown: break
|
||||
when TOut is void:
|
||||
action(m.data)
|
||||
else:
|
||||
self.repy(action(m.data))
|
||||
|
||||
proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
|
||||
## creates an actor pool.
|
||||
newSeq(a.actors, poolSize)
|
||||
when TOut isnot void:
|
||||
open(a.outputs)
|
||||
for i in 0 .. < a.actors.len:
|
||||
a.actors[i] = spawn(poolWorker)
|
||||
|
||||
proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
|
||||
## waits for each actor in the actor pool `a` to finish and frees the
|
||||
## resources attached to `a`.
|
||||
var t: TTask[TIn, TOut]
|
||||
t.shutdown = true
|
||||
for i in 0 .. < a.actors.len: send(a.actors[i], t)
|
||||
for i in 0 .. < a.actors.len: join(a.actors[i])
|
||||
when TOut isnot void:
|
||||
close(a.outputs)
|
||||
a.actors = nil
|
||||
|
||||
template setupTask =
|
||||
var t: TTask[TIn, TOut]
|
||||
t.action = action
|
||||
shallowCopy(t.data, input)
|
||||
|
||||
template schedule =
|
||||
# extremely simple scheduler: We always try the first thread first, so that
|
||||
# it remains 'hot' ;-). Round-robin hurts for keeping threads hot.
|
||||
for i in 0..high(a.actors):
|
||||
if a.actors[i].i.ready:
|
||||
a.actors[i].send(t)
|
||||
return
|
||||
# no thread ready :-( --> send message to the thread which has the least
|
||||
# messages pending:
|
||||
var minIdx = 0
|
||||
var minVal = high(int)
|
||||
for i in 0..high(a.actors):
|
||||
var curr = a.actors[i].i.peek
|
||||
if curr == 0:
|
||||
# ok, is ready now:
|
||||
a.actors[i].send(t)
|
||||
return
|
||||
if curr < minVal:
|
||||
minVal = curr
|
||||
minIdx = i
|
||||
a.actors[minIdx].send(t)
|
||||
|
||||
proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut],
|
||||
action: proc (input: TIn): TOut {.thread.},
|
||||
input: TIn): ptr TChannel[TOut] =
|
||||
## uses the actor pool to run `action` concurrently. `spawn` is guaranteed
|
||||
## to not block.
|
||||
setupTask()
|
||||
result = addr(p.outputs)
|
||||
schedule()
|
||||
|
||||
proc spawn*[TIn](p: var TActorPool[TIn, void],
|
||||
action: proc (input: TIn) {.thread.},
|
||||
input: TIn) =
|
||||
## uses the actor pool to run `action` concurrently. `spawn` is guaranteed
|
||||
## to not block.
|
||||
setupTask()
|
||||
schedule()
|
||||
|
||||
when isMainModule:
|
||||
var
|
||||
a: TActorPool[int, void]
|
||||
createActorPool(a)
|
||||
for i in 0 .. < 300:
|
||||
a.spawn(proc (x: int) {.thread.} = echo x)
|
||||
|
||||
when false:
|
||||
proc treeDepth(n: PNode): int {.thread.} =
|
||||
var x = a.spawn(treeDepth, n.le)
|
||||
var y = a.spawn(treeDepth, n.ri)
|
||||
result = max(^x, ^y) + 1
|
||||
|
||||
a.join()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user