mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-31 10:22:15 +00:00
thread support: next iteration
This commit is contained in:
@@ -32,7 +32,10 @@
|
||||
## for i in 0..high(thr):
|
||||
## joinThread(thr[i])
|
||||
|
||||
when not defined(boehmgc) and not defined(nogc):
|
||||
when not compileOption("threads"):
|
||||
{.error: "Thread support requires ``--threads:on`` commandline switch".}
|
||||
|
||||
when not defined(boehmgc) and not defined(nogc) and false:
|
||||
{.error: "Thread support requires --gc:boehm or --gc:none".}
|
||||
|
||||
# We jump through some hoops here to ensure that Nimrod thread procs can have
|
||||
@@ -46,39 +49,13 @@ type
|
||||
TThreadProcClosure {.pure, final.}[TParam] = object
|
||||
fn: proc (p: TParam)
|
||||
data: TParam
|
||||
threadLocalStorage: pointer
|
||||
|
||||
when defined(Windows):
|
||||
type
|
||||
when defined(windows):
|
||||
type
|
||||
THandle = int
|
||||
TSysThread = THandle
|
||||
TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi
|
||||
DebugInfo: pointer
|
||||
LockCount: int32
|
||||
RecursionCount: int32
|
||||
OwningThread: int
|
||||
LockSemaphore: int
|
||||
Reserved: int32
|
||||
|
||||
TWinThreadProc = proc (x: pointer): int32 {.stdcall.}
|
||||
|
||||
proc InitSysLock(L: var TSysLock) {.stdcall,
|
||||
dynlib: "kernel32", importc: "InitializeCriticalSection".}
|
||||
## Initializes the lock `L`.
|
||||
|
||||
proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall,
|
||||
dynlib: "kernel32", importc: "TryEnterCriticalSection".}
|
||||
## Tries to acquire the lock `L`.
|
||||
|
||||
proc TryAquireSys(L: var TSysLock): bool {.inline.} =
|
||||
result = TryAquireSysAux(L) != 0'i32
|
||||
|
||||
proc AquireSys(L: var TSysLock) {.stdcall,
|
||||
dynlib: "kernel32", importc: "EnterCriticalSection".}
|
||||
## Acquires the lock `L`.
|
||||
|
||||
proc ReleaseSys(L: var TSysLock) {.stdcall,
|
||||
dynlib: "kernel32", importc: "LeaveCriticalSection".}
|
||||
## Releases the lock `L`.
|
||||
|
||||
proc CreateThread(lpThreadAttributes: Pointer, dwStackSize: int32,
|
||||
lpStartAddress: TWinThreadProc,
|
||||
@@ -105,29 +82,17 @@ when defined(Windows):
|
||||
proc TerminateThread(hThread: THandle, dwExitCode: int32): int32 {.
|
||||
stdcall, dynlib: "kernel32", importc: "TerminateThread".}
|
||||
|
||||
{.push stack_trace:off.}
|
||||
proc threadProcWrapper[TParam](closure: pointer): int32 {.stdcall.} =
|
||||
var c = cast[ptr TThreadProcClosure[TParam]](closure)
|
||||
SetThreadLocalStorage(c.threadLocalStorage)
|
||||
c.fn(c.data)
|
||||
# implicitly return 0
|
||||
{.pop.}
|
||||
|
||||
else:
|
||||
type
|
||||
TSysLock {.importc: "pthread_mutex_t", header: "<sys/types.h>".} = int
|
||||
TSysThread {.importc: "pthread_t", header: "<sys/types.h>".} = int
|
||||
|
||||
proc InitSysLock(L: var TSysLock, attr: pointer = nil) {.
|
||||
importc: "pthread_mutex_init", header: "<pthread.h>".}
|
||||
|
||||
proc AquireSys(L: var TSysLock) {.
|
||||
importc: "pthread_mutex_lock", header: "<pthread.h>".}
|
||||
proc TryAquireSysAux(L: var TSysLock): cint {.
|
||||
importc: "pthread_mutex_trylock", header: "<pthread.h>".}
|
||||
|
||||
proc TryAquireSys(L: var TSysLock): bool {.inline.} =
|
||||
result = TryAquireSysAux(L) == 0'i32
|
||||
|
||||
proc ReleaseSys(L: var TSysLock) {.
|
||||
importc: "pthread_mutex_unlock", header: "<pthread.h>".}
|
||||
|
||||
proc pthread_create(a1: var TSysThread, a2: ptr int,
|
||||
a3: proc (x: pointer) {.noconv.},
|
||||
@@ -139,14 +104,19 @@ else:
|
||||
proc pthread_cancel(a1: TSysThread): cint {.
|
||||
importc: "pthread_cancel", header: "<pthread.h>".}
|
||||
|
||||
{.push stack_trace:off.}
|
||||
proc threadProcWrapper[TParam](closure: pointer) {.noconv.} =
|
||||
var c = cast[ptr TThreadProcClosure[TParam]](closure)
|
||||
SetThreadLocalStorage(c.threadLocalStorage)
|
||||
c.fn(c.data)
|
||||
{.pop.}
|
||||
|
||||
|
||||
const
|
||||
noDeadlocks = true # compileOption("deadlockPrevention")
|
||||
noDeadlocks = false # compileOption("deadlockPrevention")
|
||||
|
||||
include "lib/system/systhread"
|
||||
|
||||
when noDeadLocks:
|
||||
type
|
||||
TLock* {.pure, final.} = object ## Standard Nimrod Lock type.
|
||||
@@ -281,62 +251,81 @@ proc createThread*[TParam](t: var TThread[TParam],
|
||||
param: TParam) =
|
||||
## creates a new thread `t` and starts its execution. Entry point is the
|
||||
## proc `tp`. `param` is passed to `tp`.
|
||||
t.globals = AllocThreadLocalStorage()
|
||||
t.c.data = param
|
||||
t.c.fn = tp
|
||||
t.globals = CreateThreadLocalStorage()
|
||||
when hostOS == "windows":
|
||||
var dummyThreadId: int32
|
||||
t.sys = CreateThread(nil, 0'i32, threadProcWrapper[TParam],
|
||||
addr(t.c), 0'i32, dummyThreadId)
|
||||
else:
|
||||
discard pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c))
|
||||
if pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c)) != 0:
|
||||
raise newException(EIO, "cannot create thread")
|
||||
|
||||
when isMainModule:
|
||||
import os
|
||||
|
||||
var
|
||||
thr: array [0..4, TThread[tuple[a,b: int]]]
|
||||
thr: array [0..1, TThread[tuple[a,b: int]]]
|
||||
L, M, N: TLock
|
||||
|
||||
proc doNothing() = nil
|
||||
|
||||
{.push stack_trace:off.}
|
||||
proc threadFunc(interval: tuple[a,b: int]) {.procvar.} =
|
||||
for i in interval.a..interval.b:
|
||||
case i mod 6
|
||||
of 0:
|
||||
Aquire(L) # lock stdout
|
||||
Aquire(M)
|
||||
Aquire(N)
|
||||
of 1:
|
||||
Aquire(L)
|
||||
Aquire(N) # lock stdout
|
||||
Aquire(M)
|
||||
of 2:
|
||||
Aquire(M)
|
||||
Aquire(L)
|
||||
Aquire(N)
|
||||
of 3:
|
||||
Aquire(M)
|
||||
Aquire(N)
|
||||
Aquire(L)
|
||||
of 4:
|
||||
Aquire(N)
|
||||
Aquire(M)
|
||||
Aquire(L)
|
||||
of 5:
|
||||
Aquire(N)
|
||||
Aquire(L)
|
||||
Aquire(M)
|
||||
else: assert false
|
||||
echo i
|
||||
echo "deadlocks prevented: ", deadlocksPrevented
|
||||
Release(L)
|
||||
Release(M)
|
||||
Release(N)
|
||||
doNothing()
|
||||
when false:
|
||||
for i in interval.a..interval.b:
|
||||
when nodeadlocks:
|
||||
case i mod 6
|
||||
of 0:
|
||||
Aquire(L) # lock stdout
|
||||
Aquire(M)
|
||||
Aquire(N)
|
||||
of 1:
|
||||
Aquire(L)
|
||||
Aquire(N) # lock stdout
|
||||
Aquire(M)
|
||||
of 2:
|
||||
Aquire(M)
|
||||
Aquire(L)
|
||||
Aquire(N)
|
||||
of 3:
|
||||
Aquire(M)
|
||||
Aquire(N)
|
||||
Aquire(L)
|
||||
of 4:
|
||||
Aquire(N)
|
||||
Aquire(M)
|
||||
Aquire(L)
|
||||
of 5:
|
||||
Aquire(N)
|
||||
Aquire(L)
|
||||
Aquire(M)
|
||||
else: assert false
|
||||
else:
|
||||
Aquire(L) # lock stdout
|
||||
Aquire(M)
|
||||
Aquire(N)
|
||||
|
||||
#echo i
|
||||
os.sleep(10)
|
||||
stdout.write(i)
|
||||
when nodeadlocks:
|
||||
echo "deadlocks prevented: ", deadlocksPrevented
|
||||
Release(L)
|
||||
Release(M)
|
||||
Release(N)
|
||||
{.pop.}
|
||||
#InitLock(L)
|
||||
#InitLock(M)
|
||||
#InitLock(N)
|
||||
|
||||
InitLock(L)
|
||||
InitLock(M)
|
||||
InitLock(N)
|
||||
|
||||
for i in 0..high(thr):
|
||||
createThread(thr[i], threadFunc, (i*100, i*100+50))
|
||||
for i in 0..high(thr):
|
||||
joinThread(thr[i])
|
||||
proc main =
|
||||
for i in 0..high(thr):
|
||||
createThread(thr[i], threadFunc, (i*100, i*100+50))
|
||||
for i in 0..high(thr):
|
||||
joinThread(thr[i])
|
||||
|
||||
main()
|
||||
|
||||
|
||||
@@ -1369,7 +1369,7 @@ template accumulateResult*(iter: expr) =
|
||||
# we have to compute this here before turning it off in except.nim anyway ...
|
||||
const nimrodStackTrace = compileOption("stacktrace")
|
||||
|
||||
{.push checks: off, line_dir: off, debugger: off.}
|
||||
{.push checks: off, debugger: off.}
|
||||
# obviously we cannot generate checking operations here :-)
|
||||
# because it would result in endless recursion
|
||||
# however, stack-traces are available for most parts
|
||||
@@ -1665,6 +1665,7 @@ when not defined(EcmaScript) and not defined(NimrodVM):
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
include "system/systhread"
|
||||
include "system/excpt"
|
||||
# we cannot compile this with stack tracing on
|
||||
# as it would recurse endlessly!
|
||||
@@ -1718,12 +1719,11 @@ when not defined(EcmaScript) and not defined(NimrodVM):
|
||||
else:
|
||||
result = n.sons[n.len]
|
||||
|
||||
include "system/systhread"
|
||||
{.push stack_trace: off.}
|
||||
include "system/mmdisp"
|
||||
include "system/sysstr"
|
||||
{.pop.}
|
||||
|
||||
include "system/sysstr"
|
||||
include "system/assign"
|
||||
include "system/repr"
|
||||
|
||||
|
||||
@@ -81,8 +81,12 @@ when hasThreadSupport:
|
||||
proc pthread_setspecific(a1: Tpthread_key, a2: pointer): int32 {.
|
||||
importc: "pthread_setspecific", header: "<pthread.h>".}
|
||||
|
||||
proc specificDestroy(mem: pointer) {.noconv.} = dealloc(mem)
|
||||
|
||||
proc specificDestroy(mem: pointer) {.noconv.} =
|
||||
#aquireSys(heapLock)
|
||||
#dealloc(mem)
|
||||
#releaseSys(heapLock)
|
||||
#c_free(mem)
|
||||
|
||||
proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} =
|
||||
discard pthread_key_create(addr(result), specificDestroy)
|
||||
proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {.
|
||||
@@ -104,31 +108,50 @@ when hasThreadSupport:
|
||||
tempFrames: array [0..127, PFrame] # cannot be allocated on the stack!
|
||||
data: float # compiler should add thread local variables here!
|
||||
PGlobals = ptr TGlobals
|
||||
|
||||
# it's more efficient to not use a global variable for the thread storage
|
||||
# slot, but to rely on the implementation to assign slot 0 for us... ;-)
|
||||
var checkSlot = ThreadVarAlloc()
|
||||
const globalsSlot = TThreadVarSlot(0)
|
||||
assert checkSlot.int == globalsSlot.int
|
||||
|
||||
var globalsSlot = ThreadVarAlloc()
|
||||
proc CreateThreadLocalStorage*(): pointer {.inl.} =
|
||||
proc AtomicAlloc0(size: int): pointer =
|
||||
#AquireSys(heapLock)
|
||||
result = c_malloc(size)
|
||||
zeroMem(result, size)
|
||||
#ReleaseSys(heapLock)
|
||||
|
||||
proc NewGlobals(): PGlobals =
|
||||
result = cast[PGlobals](AtomicAlloc0(sizeof(TGlobals)))
|
||||
new(result.gAssertionFailed)
|
||||
result.buf = newStringOfCap(2000)
|
||||
result.assertBuf = newStringOfCap(2000)
|
||||
|
||||
proc AllocThreadLocalStorage*(): pointer {.inl.} =
|
||||
isMultiThreaded = true
|
||||
result = alloc0(sizeof(TGlobals))
|
||||
ThreadVarSetValue(globalsSlot, result)
|
||||
result = NewGlobals()
|
||||
|
||||
proc SetThreadLocalStorage*(p: pointer) {.inl.} =
|
||||
ThreadVarSetValue(globalsSlot, p)
|
||||
|
||||
proc GetGlobals(): PGlobals {.compilerRtl, inl.} =
|
||||
result = cast[PGlobals](ThreadVarGetValue(globalsSlot))
|
||||
|
||||
# create for the main thread:
|
||||
ThreadVarSetValue(globalsSlot, alloc0(sizeof(TGlobals)))
|
||||
ThreadVarSetValue(globalsSlot, NewGlobals())
|
||||
|
||||
when hasThreadSupport:
|
||||
template ThreadGlobals =
|
||||
template ThreadGlobals =
|
||||
var globals = GetGlobals()
|
||||
template `||`(varname: expr): expr = globals.varname
|
||||
|
||||
ThreadGlobals()
|
||||
#ThreadGlobals()
|
||||
else:
|
||||
template ThreadGlobals = nil # nothing
|
||||
template `||`(varname: expr): expr = varname
|
||||
|
||||
var
|
||||
framePtr {.compilerproc.}: PFrame # XXX only temporarily a compilerproc
|
||||
framePtr: PFrame
|
||||
excHandler: PSafePoint = nil
|
||||
# list of exception handlers
|
||||
# a global variable for the root of all try blocks
|
||||
@@ -141,6 +164,11 @@ else:
|
||||
tempFrames: array [0..127, PFrame] # cannot be allocated on the stack!
|
||||
gAssertionFailed: ref EAssertionFailed
|
||||
|
||||
new(||gAssertionFailed)
|
||||
||buf = newStringOfCap(2000)
|
||||
||assertBuf = newStringOfCap(2000)
|
||||
|
||||
|
||||
proc pushFrame(s: PFrame) {.compilerRtl, inl.} =
|
||||
ThreadGlobals()
|
||||
s.prev = ||framePtr
|
||||
@@ -388,11 +416,6 @@ proc registerSignalHandler() =
|
||||
|
||||
when not defined(noSignalHandler):
|
||||
registerSignalHandler() # call it in initialization section
|
||||
# for easier debugging of the GC, this memory is only allocated after the
|
||||
# signal handlers have been registered
|
||||
new(||gAssertionFailed)
|
||||
||buf = newStringOfCap(2000)
|
||||
||assertBuf = newStringOfCap(2000)
|
||||
|
||||
proc raiseRangeError(val: biggestInt) {.compilerproc, noreturn, noinline.} =
|
||||
raise newException(EOutOfRange, "value " & $val & " out of range")
|
||||
|
||||
@@ -81,14 +81,14 @@ var
|
||||
proc aquire(gch: var TGcHeap) {.inline.} =
|
||||
when hasThreadSupport:
|
||||
if isMultiThreaded:
|
||||
aquire(gch.zctLock)
|
||||
aquire(gch.cycleRootsLock)
|
||||
aquireSys(gch.zctLock)
|
||||
aquireSys(gch.cycleRootsLock)
|
||||
|
||||
proc release(gch: var TGcHeap) {.inline.} =
|
||||
when hasThreadSupport:
|
||||
if isMultiThreaded:
|
||||
release(gch.zctLock)
|
||||
release(gch.cycleRootsLock)
|
||||
releaseSys(gch.zctLock)
|
||||
releaseSys(gch.cycleRootsLock)
|
||||
|
||||
proc addZCT(s: var TCellSeq, c: PCell) {.noinline.} =
|
||||
if (c.refcount and rcZct) == 0:
|
||||
@@ -207,18 +207,18 @@ proc prepareDealloc(cell: PCell) =
|
||||
proc rtlAddCycleRoot(c: PCell) {.rtl, inl.} =
|
||||
# we MUST access gch as a global here, because this crosses DLL boundaries!
|
||||
when hasThreadSupport:
|
||||
if isMultiThreaded: Aquire(gch.cycleRootsLock)
|
||||
if isMultiThreaded: AquireSys(gch.cycleRootsLock)
|
||||
incl(gch.cycleRoots, c)
|
||||
when hasThreadSupport:
|
||||
if isMultiThreaded: Release(gch.cycleRootsLock)
|
||||
if isMultiThreaded: ReleaseSys(gch.cycleRootsLock)
|
||||
|
||||
proc rtlAddZCT(c: PCell) {.rtl, inl.} =
|
||||
# we MUST access gch as a global here, because this crosses DLL boundaries!
|
||||
when hasThreadSupport:
|
||||
if isMultiThreaded: Aquire(gch.zctLock)
|
||||
if isMultiThreaded: AquireSys(gch.zctLock)
|
||||
addZCT(gch.zct, c)
|
||||
when hasThreadSupport:
|
||||
if isMultiThreaded: Release(gch.zctLock)
|
||||
if isMultiThreaded: ReleaseSys(gch.zctLock)
|
||||
|
||||
proc decRef(c: PCell) {.inline.} =
|
||||
when stressGC:
|
||||
@@ -287,9 +287,10 @@ proc initGC() =
|
||||
Init(gch.cycleRoots)
|
||||
Init(gch.decStack)
|
||||
when hasThreadSupport:
|
||||
InitLock(gch.cycleRootsLock)
|
||||
InitLock(gch.zctLock)
|
||||
InitSysLock(gch.cycleRootsLock)
|
||||
InitSysLock(gch.zctLock)
|
||||
new(gOutOfMem) # reserve space for the EOutOfMemory exception here!
|
||||
|
||||
|
||||
proc forAllSlotsAux(dest: pointer, n: ptr TNimNode, op: TWalkOp) =
|
||||
var d = cast[TAddress](dest)
|
||||
|
||||
@@ -9,6 +9,12 @@
|
||||
|
||||
const
|
||||
maxThreads = 256
|
||||
SystemInclude = defined(hasThreadSupport)
|
||||
|
||||
when not SystemInclude:
|
||||
# ugly hack: this file is then included from core/threads, so we have
|
||||
# thread support:
|
||||
const hasThreadSupport = true
|
||||
|
||||
when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
|
||||
proc sync_add_and_fetch(p: var int, val: int): int {.
|
||||
@@ -40,3 +46,54 @@ proc atomicDec(memLoc: var int, x: int): int =
|
||||
dec(memLoc, x)
|
||||
result = memLoc
|
||||
|
||||
when defined(Windows):
|
||||
type
|
||||
TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi
|
||||
DebugInfo: pointer
|
||||
LockCount: int32
|
||||
RecursionCount: int32
|
||||
OwningThread: int
|
||||
LockSemaphore: int
|
||||
Reserved: int32
|
||||
|
||||
proc InitSysLock(L: var TSysLock) {.stdcall,
|
||||
dynlib: "kernel32", importc: "InitializeCriticalSection".}
|
||||
## Initializes the lock `L`.
|
||||
|
||||
proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall,
|
||||
dynlib: "kernel32", importc: "TryEnterCriticalSection".}
|
||||
## Tries to acquire the lock `L`.
|
||||
|
||||
proc TryAquireSys(L: var TSysLock): bool {.inline.} =
|
||||
result = TryAquireSysAux(L) != 0'i32
|
||||
|
||||
proc AquireSys(L: var TSysLock) {.stdcall,
|
||||
dynlib: "kernel32", importc: "EnterCriticalSection".}
|
||||
## Acquires the lock `L`.
|
||||
|
||||
proc ReleaseSys(L: var TSysLock) {.stdcall,
|
||||
dynlib: "kernel32", importc: "LeaveCriticalSection".}
|
||||
## Releases the lock `L`.
|
||||
|
||||
else:
|
||||
type
|
||||
TSysLock {.importc: "pthread_mutex_t", header: "<sys/types.h>".} = int
|
||||
|
||||
proc InitSysLock(L: var TSysLock, attr: pointer = nil) {.
|
||||
importc: "pthread_mutex_init", header: "<pthread.h>".}
|
||||
|
||||
proc AquireSys(L: var TSysLock) {.
|
||||
importc: "pthread_mutex_lock", header: "<pthread.h>".}
|
||||
proc TryAquireSysAux(L: var TSysLock): cint {.
|
||||
importc: "pthread_mutex_trylock", header: "<pthread.h>".}
|
||||
|
||||
proc TryAquireSys(L: var TSysLock): bool {.inline.} =
|
||||
result = TryAquireSysAux(L) == 0'i32
|
||||
|
||||
proc ReleaseSys(L: var TSysLock) {.
|
||||
importc: "pthread_mutex_unlock", header: "<pthread.h>".}
|
||||
|
||||
when SystemInclude:
|
||||
var heapLock: TSysLock
|
||||
InitSysLock(HeapLock)
|
||||
|
||||
|
||||
@@ -11,4 +11,8 @@ var val = {a, b}
|
||||
stdout.write(repr(val))
|
||||
stdout.writeln(repr({'a'..'z', 'A'..'Z'}))
|
||||
|
||||
#var testseq: seq[string] = @[
|
||||
# "a", "b", "c", "d", "e"
|
||||
#]
|
||||
#echo(repr(testseq))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user