mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-30 18:02:05 +00:00
235 lines
7.1 KiB
Nim
235 lines
7.1 KiB
Nim
#
|
|
#
|
|
# Nimrod's Runtime Library
|
|
# (c) Copyright 2012 Andreas Rumpf
|
|
#
|
|
# See the file "copying.txt", included in this
|
|
# distribution, for details about the copyright.
|
|
#
|
|
|
|
## `Actor`:idx: support for Nimrod. An actor is implemented as a thread with
|
|
## a channel as its inbox. This module requires the ``--threads:on``
|
|
## command line switch.
|
|
##
|
|
## Example:
|
|
##
|
|
## .. code-block:: nimrod
|
|
##
|
|
## var
|
|
## a: TActorPool[int, void]
|
|
## createActorPool(a)
|
|
## for i in 0 .. < 300:
|
|
## a.spawn(i, proc (x: int) {.thread.} = echo x)
|
|
## a.join()
|
|
|
|
from os import sleep
|
|
|
|
type
|
|
TTask*[TIn, TOut] = object{.pure, final.} ## a task
|
|
when TOut isnot void:
|
|
receiver*: ptr TChannel[TOut] ## the receiver channel of the response
|
|
action*: proc (x: TIn): TOut {.thread.} ## action to execute;
|
|
## sometimes useful
|
|
shutDown*: bool ## set to tell an actor to shut-down
|
|
data*: TIn ## the data to process
|
|
|
|
TActor[TIn, TOut] = object{.pure, final.}
|
|
i: TChannel[TTask[TIn, TOut]]
|
|
t: TThread[ptr TActor[TIn, TOut]]
|
|
|
|
PActor*[TIn, TOut] = ptr TActor[TIn, TOut] ## an actor
|
|
|
|
proc spawn*[TIn, TOut](action: proc(
|
|
self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
|
|
## creates an actor; that is a thread with an inbox. The caller MUST call
|
|
## ``join`` because that also frees the actor's associated resources.
|
|
result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[])))
|
|
open(result.i)
|
|
createThread(result.t, action, result)
|
|
|
|
proc inbox*[TIn, TOut](self: PActor[TIn, TOut]): ptr TChannel[TIn] =
|
|
## gets a pointer to the associated inbox of the actor `self`.
|
|
result = addr(self.i)
|
|
|
|
proc running*[TIn, TOut](a: PActor[TIn, TOut]): bool =
|
|
## returns true if the actor `a` is running.
|
|
result = running(a.t)
|
|
|
|
proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool =
|
|
## returns true if the actor `a` is ready to process new messages.
|
|
result = ready(a.i)
|
|
|
|
proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
|
|
## joins an actor.
|
|
joinThread(a.t)
|
|
close(a.i)
|
|
deallocShared(a)
|
|
|
|
proc recv*[TIn, TOut](a: PActor[TIn, TOut]): TTask[TIn, TOut] =
|
|
## receives a task from `a`'s inbox.
|
|
result = recv(a.i)
|
|
|
|
proc send*[TIn, TOut, X, Y](receiver: PActor[TIn, TOut], msg: TIn,
|
|
sender: PActor[X, Y]) =
|
|
## sends a message to `a`'s inbox.
|
|
var t: TTask[TIn, TOut]
|
|
t.receiver = addr(sender.i)
|
|
shallowCopy(t.data, msg)
|
|
send(receiver.i, t)
|
|
|
|
proc send*[TIn, TOut](receiver: PActor[TIn, TOut], msg: TIn,
|
|
sender: ptr TChannel[TOut] = nil) =
|
|
## sends a message to `receiver`'s inbox.
|
|
var t: TTask[TIn, TOut]
|
|
t.receiver = sender
|
|
shallowCopy(t.data, msg)
|
|
send(receiver.i, t)
|
|
|
|
proc sendShutdown*[TIn, TOut](receiver: PActor[TIn, TOut]) =
|
|
## send a shutdown message to `receiver`.
|
|
var t: TTask[TIn, TOut]
|
|
t.shutdown = true
|
|
send(receiver.i, t)
|
|
|
|
proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) =
|
|
## sends a message to io's output message box.
|
|
when TOut is void:
|
|
{.error: "you cannot reply to a void outbox".}
|
|
assert t.receiver != nil
|
|
send(t.receiver[], m)
|
|
|
|
|
|
# ----------------- actor pools ----------------------------------------------
|
|
|
|
type
|
|
TActorPool*[TIn, TOut] = object{.pure, final.} ## an actor pool
|
|
actors: seq[PActor[TIn, TOut]]
|
|
when TOut isnot void:
|
|
outputs: TChannel[TOut]
|
|
|
|
proc `^`*[T](f: ptr TChannel[T]): T =
|
|
## alias for 'recv'.
|
|
result = recv(f[])
|
|
|
|
proc poolWorker[TIn, TOut](self: PActor[TIn, TOut]) {.thread.} =
|
|
while true:
|
|
var m = self.recv
|
|
if m.shutDown: break
|
|
when TOut is void:
|
|
m.action(m.data)
|
|
else:
|
|
send(m.receiver[], m.action(m.data))
|
|
#self.reply()
|
|
|
|
proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
|
|
## creates an actor pool.
|
|
newSeq(a.actors, poolSize)
|
|
when TOut isnot void:
|
|
open(a.outputs)
|
|
for i in 0 .. < a.actors.len:
|
|
a.actors[i] = spawn(poolWorker[TIn, TOut])
|
|
|
|
proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
|
|
## waits for every actor of `a` to finish with its work. Currently this is
|
|
## implemented as polling every `polling` ms and has a slight chance
|
|
## of failing since we check for every actor to be in `ready` state and not
|
|
## for messages still in ether. This will change in a later
|
|
## version, however.
|
|
var allReadyCount = 0
|
|
while true:
|
|
var wait = false
|
|
for i in 0..high(a.actors):
|
|
if not a.actors[i].i.ready:
|
|
wait = true
|
|
allReadyCount = 0
|
|
break
|
|
if not wait:
|
|
# it's possible that some actor sent a message to some other actor but
|
|
# both appeared to be non-working as the message takes some time to
|
|
# arrive. We assume that this won't take longer than `polling` and
|
|
# simply attempt a second time and declare victory then. ;-)
|
|
inc allReadyCount
|
|
if allReadyCount > 1: break
|
|
sleep(polling)
|
|
|
|
proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
|
|
## terminates each actor in the actor pool `a` and frees the
|
|
## resources attached to `a`.
|
|
var t: TTask[TIn, TOut]
|
|
t.shutdown = true
|
|
for i in 0.. <a.actors.len: send(a.actors[i].i, t)
|
|
for i in 0.. <a.actors.len: join(a.actors[i])
|
|
when TOut isnot void:
|
|
close(a.outputs)
|
|
a.actors = nil
|
|
|
|
proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
|
|
## short-cut for `sync` and then `terminate`.
|
|
sync(a)
|
|
terminate(a)
|
|
|
|
template setupTask =
|
|
t.action = action
|
|
shallowCopy(t.data, input)
|
|
|
|
template schedule =
|
|
# extremely simple scheduler: We always try the first thread first, so that
|
|
# it remains 'hot' ;-). Round-robin hurts for keeping threads hot.
|
|
for i in 0..high(p.actors):
|
|
if p.actors[i].i.ready:
|
|
p.actors[i].i.send(t)
|
|
return
|
|
# no thread ready :-( --> send message to the thread which has the least
|
|
# messages pending:
|
|
var minIdx = -1
|
|
var minVal = high(int)
|
|
for i in 0..high(p.actors):
|
|
var curr = p.actors[i].i.peek
|
|
if curr == 0:
|
|
# ok, is ready now:
|
|
p.actors[i].i.send(t)
|
|
return
|
|
if curr < minVal and curr >= 0:
|
|
minVal = curr
|
|
minIdx = i
|
|
if minIdx >= 0:
|
|
p.actors[minIdx].i.send(t)
|
|
else:
|
|
raise newException(EDeadThread, "cannot send message; thread died")
|
|
|
|
proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn,
|
|
action: proc (input: TIn): TOut {.thread.}
|
|
): ptr TChannel[TOut] =
|
|
## uses the actor pool to run ``action(input)`` concurrently.
|
|
## `spawn` is guaranteed to not block.
|
|
var t: TTask[TIn, TOut]
|
|
setupTask()
|
|
result = addr(p.outputs)
|
|
t.receiver = result
|
|
schedule()
|
|
|
|
proc spawn*[TIn](p: var TActorPool[TIn, void], input: TIn,
|
|
action: proc (input: TIn) {.thread.}) =
|
|
## uses the actor pool to run ``action(input)`` concurrently.
|
|
## `spawn` is guaranteed to not block.
|
|
var t: TTask[TIn, void]
|
|
setupTask()
|
|
schedule()
|
|
|
|
when isMainModule:
|
|
var
|
|
a: TActorPool[int, void]
|
|
createActorPool(a)
|
|
for i in 0 .. < 300:
|
|
a.spawn(i, proc (x: int) {.thread.} = echo x)
|
|
|
|
when false:
|
|
proc treeDepth(n: PNode): int {.thread.} =
|
|
var x = a.spawn(treeDepth, n.le)
|
|
var y = a.spawn(treeDepth, n.ri)
|
|
result = max(^x, ^y) + 1
|
|
|
|
a.join()
|
|
|
|
|