mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-28 17:04:41 +00:00
basic message passing working
This commit is contained in:
@@ -236,7 +236,7 @@ type
|
||||
|
||||
EInvalidObjectAssignment* =
|
||||
object of ESynch ## is raised if an object gets assigned to its
|
||||
## farther's object.
|
||||
## parent's object.
|
||||
|
||||
EInvalidObjectConversion* =
|
||||
object of ESynch ## is raised if an object is converted to an incompatible
|
||||
@@ -261,7 +261,10 @@ type
|
||||
## that cannot be represented with infinite
|
||||
## precision -- for example, 2.0 / 3.0, log(1.1)
|
||||
## NOTE: Nimrod currently does not detect these!
|
||||
|
||||
EDeadThread* =
|
||||
object of ESynch ## is raised if it is attempted to send a message to a
|
||||
## dead thread.
|
||||
|
||||
TResult* = enum Failure, Success
|
||||
|
||||
proc sizeof*[T](x: T): natural {.magic: "SizeOf", noSideEffect.}
|
||||
|
||||
@@ -22,6 +22,8 @@ type
|
||||
PInbox = ptr TInbox
|
||||
TLoadStoreMode = enum mStore, mLoad
|
||||
|
||||
const ThreadDeadMask = -2
|
||||
|
||||
proc initInbox(p: pointer) =
|
||||
var inbox = cast[PInbox](p)
|
||||
initSysLock(inbox.lock)
|
||||
@@ -30,6 +32,9 @@ proc initInbox(p: pointer) =
|
||||
|
||||
proc freeInbox(p: pointer) =
|
||||
var inbox = cast[PInbox](p)
|
||||
# we need to grab the lock to be save against sending threads!
|
||||
acquireSys(inbox.lock)
|
||||
inbox.mask = ThreadDeadMask
|
||||
deallocOsPages(inbox.region)
|
||||
deinitSys(inbox.lock)
|
||||
deinitSysCond(inbox.cond)
|
||||
@@ -158,7 +163,6 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) =
|
||||
q.mask = cap*2 - 1
|
||||
q.wr = q.count
|
||||
q.rd = 0
|
||||
#echo "came here"
|
||||
storeAux(addr(q.data[q.wr * typ.size]), data, typ, q, mStore)
|
||||
inc q.count
|
||||
q.wr = (q.wr + 1) and q.mask
|
||||
@@ -177,23 +181,34 @@ template lockInbox(q: expr, action: stmt) =
|
||||
proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) =
|
||||
## sends a message to a thread. `msg` is deeply copied.
|
||||
var q = cast[PInbox](getInBoxMem(receiver))
|
||||
if q.mask == ThreadDeadMask:
|
||||
raise newException(EDeadThread, "cannot send message; thread died")
|
||||
acquireSys(q.lock)
|
||||
var m: TMsg
|
||||
shallowCopy(m, msg)
|
||||
rawSend(q, addr(m), cast[PNimType](getTypeInfo(msg)))
|
||||
var typ = cast[PNimType](getTypeInfo(msg))
|
||||
rawSend(q, addr(m), typ)
|
||||
q.elemType = typ
|
||||
releaseSys(q.lock)
|
||||
SignalSysCond(q.cond)
|
||||
|
||||
proc recv*[TMsg](): TMsg =
|
||||
## receives a message from its internal message queue. This blocks until
|
||||
## a message has arrived! You may use ``peek`` to avoid the blocking.
|
||||
proc llRecv(res: pointer, typ: PNimType) =
|
||||
# to save space, the generic is as small as possible
|
||||
var q = cast[PInbox](getInBoxMem())
|
||||
acquireSys(q.lock)
|
||||
while q.count <= 0:
|
||||
WaitSysCond(q.cond, q.lock)
|
||||
rawRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))
|
||||
if typ != q.elemType:
|
||||
releaseSys(q.lock)
|
||||
raise newException(EInvalidValue, "cannot receive message of wrong type")
|
||||
rawRecv(q, res, typ)
|
||||
releaseSys(q.lock)
|
||||
|
||||
proc recv*[TMsg](): TMsg =
|
||||
## receives a message from its internal message queue. This blocks until
|
||||
## a message has arrived! You may use ``peek`` to avoid the blocking.
|
||||
llRecv(addr(result), cast[PNimType](getTypeInfo(result)))
|
||||
|
||||
proc peek*(): int =
|
||||
## returns the current number of messages in the inbox.
|
||||
var q = cast[PInbox](getInBoxMem())
|
||||
|
||||
@@ -249,7 +249,8 @@ when not defined(useNimRtl):
|
||||
|
||||
type
|
||||
TThread* {.pure, final.}[TParam] = object of TGcThread ## Nimrod thread.
|
||||
fn: proc (p: TParam)
|
||||
emptyFn: proc ()
|
||||
dataFn: proc (p: TParam)
|
||||
data: TParam
|
||||
|
||||
proc initInbox(p: pointer)
|
||||
@@ -268,14 +269,14 @@ template ThreadProcWrapperBody(closure: expr) =
|
||||
initGC()
|
||||
t.stackBottom = addr(t)
|
||||
registerThread(t)
|
||||
initInbox(addr(t.inbox))
|
||||
try:
|
||||
when false:
|
||||
var a = addr(tls)
|
||||
var b = MaskStackPointer(1293920-372736-303104-36864)
|
||||
c_fprintf(c_stdout, "TLS: %p\nmasked: %p\ndiff: %ld\n",
|
||||
a, b, cast[int](a) - cast[int](b))
|
||||
t.fn(t.data)
|
||||
if t.emptyFn == nil: t.dataFn(t.data)
|
||||
else: t.emptyFn()
|
||||
finally:
|
||||
# XXX shut-down is not executed when the thread is forced down!
|
||||
freeInbox(addr(t.inbox))
|
||||
@@ -326,8 +327,28 @@ proc createThread*[TParam](t: var TThread[TParam],
|
||||
## creates a new thread `t` and starts its execution. Entry point is the
|
||||
## proc `tp`. `param` is passed to `tp`.
|
||||
t.data = param
|
||||
t.fn = tp
|
||||
t.dataFn = tp
|
||||
t.stackSize = ThreadStackSize
|
||||
initInbox(addr(t.inbox))
|
||||
when hostOS == "windows":
|
||||
var dummyThreadId: int32
|
||||
t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam],
|
||||
addr(t), 0'i32, dummyThreadId)
|
||||
if t.sys <= 0:
|
||||
raise newException(EResourceExhausted, "cannot create thread")
|
||||
else:
|
||||
var a: Tpthread_attr
|
||||
pthread_attr_init(a)
|
||||
pthread_attr_setstacksize(a, ThreadStackSize)
|
||||
if pthread_create(t.sys, a, threadProcWrapper[TParam], addr(t)) != 0:
|
||||
raise newException(EResourceExhausted, "cannot create thread")
|
||||
|
||||
proc createThread*[TParam](t: var TThread[TParam], tp: proc () {.thread.}) =
|
||||
## creates a new thread `t` and starts its execution. Entry point is the
|
||||
## proc `tp`.
|
||||
t.emptyFn = tp
|
||||
t.stackSize = ThreadStackSize
|
||||
initInbox(addr(t.inbox))
|
||||
when hostOS == "windows":
|
||||
var dummyThreadId: int32
|
||||
t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam],
|
||||
@@ -342,9 +363,9 @@ proc createThread*[TParam](t: var TThread[TParam],
|
||||
raise newException(EResourceExhausted, "cannot create thread")
|
||||
|
||||
when useStackMaskHack:
|
||||
proc runMain(tp: proc (dummy: pointer) {.thread.}) {.compilerproc.} =
|
||||
proc runMain(tp: proc () {.thread.}) {.compilerproc.} =
|
||||
var mainThread: TThread[pointer]
|
||||
createThread(mainThread, tp, nil)
|
||||
createThread(mainThread, tp)
|
||||
joinThread(mainThread)
|
||||
|
||||
# --------------------------- lock handling ----------------------------------
|
||||
@@ -462,9 +483,9 @@ proc Release*(lock: var TLock) =
|
||||
|
||||
# ------------------------ message passing support ---------------------------
|
||||
|
||||
proc getInBoxMem*[TMsg](t: var TThread[TMsg]): pointer {.inline.} =
|
||||
proc getInBoxMem[TMsg](t: var TThread[TMsg]): pointer {.inline.} =
|
||||
result = addr(t.inbox)
|
||||
|
||||
proc getInBoxMem*(): pointer {.inline.} =
|
||||
proc getInBoxMem(): pointer {.inline.} =
|
||||
result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).inbox)
|
||||
|
||||
|
||||
2
todo.txt
2
todo.txt
@@ -1,12 +1,12 @@
|
||||
High priority (version 0.8.12)
|
||||
==============================
|
||||
* ``force_nosideEffect`` or some similar pragma is needed (``loophole``?)
|
||||
* test threads on windows
|
||||
* test thread analysis:
|
||||
var x = globalString # ok, copied; `x` is mine!
|
||||
vs
|
||||
var x = globalRef # read access, `x` is theirs!
|
||||
|
||||
* test message passing built-ins
|
||||
* make threadvar efficient again on linux after testing
|
||||
* document Nimrod's threads
|
||||
* document Nimrod's two phase symbol lookup for generics
|
||||
|
||||
Reference in New Issue
Block a user