further progress for multi-threading

Araq
2011-05-19 23:23:10 +02:00
parent 9b460a71ce
commit c70fa87471
10 changed files with 207 additions and 181 deletions

View File

@@ -8,8 +8,9 @@
#
## Basic thread support for Nimrod. Note that Nimrod's default GC is still
## single-threaded. This means that either your threads should not allocate
## GC'ed memory, or you should compile with ``--gc:none`` or ``--gc:boehm``.
## single-threaded. This means that you MUST turn off the GC while multiple
## threads that allocate GC'ed memory are executing. The alternative is to
## compile with ``--gc:none`` or ``--gc:boehm``.
##
## Example:
##
@@ -27,16 +28,20 @@
##
## InitLock(L)
##
## GC_disable() # native GC does not support multiple threads yet :-(
## for i in 0..high(thr):
## createThread(thr[i], threadFunc, (i*10, i*10+5))
## for i in 0..high(thr):
## joinThread(thr[i])
## GC_enable()
when not compileOption("threads"):
{.error: "Thread support requires ``--threads:on`` commandline switch".}
when not defined(boehmgc) and not defined(nogc) and false:
{.error: "Thread support requires --gc:boehm or --gc:none".}
include "lib/system/systhread"
# We jump through some hoops here to ensure that Nimrod thread procs can have
# the Nimrod calling convention. This is needed because thread procs are
@@ -93,6 +98,10 @@ when defined(windows):
else:
type
TSysThread {.importc: "pthread_t", header: "<sys/types.h>".} = int
Ttimespec {.importc: "struct timespec",
header: "<time.h>", final, pure.} = object
tv_sec: int
tv_nsec: int
proc pthread_create(a1: var TSysThread, a2: ptr int,
a3: proc (x: pointer) {.noconv.},
@@ -104,6 +113,17 @@ else:
proc pthread_cancel(a1: TSysThread): cint {.
importc: "pthread_cancel", header: "<pthread.h>".}
proc AquireSysTimeoutAux(L: var TSysLock, timeout: var Ttimespec): cint {.
importc: "pthread_mutex_timedlock", header: "<time.h>".}
proc AquireSysTimeout(L: var TSysLock, msTimeout: int) {.inline.} =
var a: Ttimespec
a.tv_sec = msTimeout div 1000
a.tv_nsec = (msTimeout mod 1000) * 1_000_000 # milliseconds -> nanoseconds
var res = AquireSysTimeoutAux(L, a)
if res != 0'i32:
raise newException(EResourceExhausted, $strerror(res))
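# A hedged sketch, not part of this commit: pthread_mutex_timedlock expects
# an *absolute* CLOCK_REALTIME deadline, so a fully correct timeout adds the
# interval to the current time first. clock_gettime and CLOCK_REALTIME are
# assumed POSIX imports; AquireSysDeadline is a hypothetical name.
proc clock_gettime(clkId: cint, tp: var Ttimespec): cint {.
  importc: "clock_gettime", header: "<time.h>".}
var CLOCK_REALTIME {.importc, header: "<time.h>".}: cint

proc AquireSysDeadline(L: var TSysLock, msTimeout: int) =
  var a: Ttimespec
  discard clock_gettime(CLOCK_REALTIME, a)     # now ...
  a.tv_sec = a.tv_sec + msTimeout div 1000     # ... plus the interval
  a.tv_nsec = a.tv_nsec + (msTimeout mod 1000) * 1_000_000 # ms -> ns
  if a.tv_nsec >= 1_000_000_000:               # carry overflow into seconds
    inc(a.tv_sec)
    a.tv_nsec = a.tv_nsec - 1_000_000_000
  var res = AquireSysTimeoutAux(L, a)
  if res != 0'i32:
    raise newException(EResourceExhausted, $strerror(res))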
{.push stack_trace:off.}
proc threadProcWrapper[TParam](closure: pointer) {.noconv.} =
var c = cast[ptr TThreadProcClosure[TParam]](closure)
@@ -114,121 +134,119 @@ else:
const
noDeadlocks = false # compileOption("deadlockPrevention")
include "lib/system/systhread"
when noDeadLocks:
type
TLock* {.pure, final.} = object ## Standard Nimrod Lock type.
key: int # used for identity and global order!
sys: TSysLock
next: ptr TLock
else:
type
TLock* = TSysLock
type
TLock* = TSysLock
TThread* {.pure, final.}[TParam] = object ## Nimrod thread.
sys: TSysThread
c: TThreadProcClosure[TParam]
when nodeadlocks:
var
lockList {.threadvar.}: ptr TLock
var
deadlocksPrevented* = 0 ## counts the number of times a
## deadlock has been prevented
proc InitLock*(L: var TLock) {.inline.} =
## Initializes the lock `L`.
when noDeadlocks:
InitSysLock(L.sys)
L.key = cast[int](addr(L))
else:
InitSysLock(L)
proc InitLock*(lock: var TLock) {.inline.} =
## Initializes the lock `lock`.
InitSysLock(lock)
proc TryAquire*(L: var TLock): bool {.inline.} =
## Tries to acquire the lock `L`. Returns `true` on success.
when noDeadlocks:
result = TryAquireSys(L.sys)
else:
result = TryAquireSys(L)
proc OrderedLocks(g: PGlobals): bool =
for i in 0 .. g.locksLen-2:
if g.locks[i] >= g.locks[i+1]: return false
result = true
proc Aquire*(L: var TLock) =
## Acquires the lock `L`.
when nodeadlocks:
# Note: we MUST NOT change the linked list of locks before we have acquired
# the proper locks! This is because the pointer to the next lock is part
# of the lock itself!
assert L.key != 0
var p = lockList
if p == nil:
# simple case: no lock acquired yet:
AquireSys(L.sys)
lockList = addr(L)
L.next = nil
else:
# check where to put L into the list:
var r = p
var last: ptr TLock = nil
while L.key < r.key:
if r.next == nil:
# best case: L needs to be acquired as last lock, so we can
# skip a good amount of work:
AquireSys(L.sys)
r.next = addr(L)
L.next = nil
return
last = r
r = r.next
# special case: thread already holds L!
if L.key == r.key: return
# bad case: L needs to be somewhere in between
# release all locks after L:
var rollback = r
while r != nil:
ReleaseSys(r.sys)
r = r.next
# and acquire them in the correct order again:
AquireSys(L.sys)
r = rollback
while r != nil:
assert r.key < L.key
AquireSys(r.sys)
r = r.next
# now that we have all the locks we need, we can insert L
# into our list:
if last != nil:
L.next = last.next
last.next = addr(L)
proc TryAquire*(lock: var TLock): bool {.inline.} =
## Tries to acquire the lock `lock`. Returns `true` on success.
when noDeadlocks:
result = TryAquireSys(lock)
if not result: return
# we have to add it to the ordered list. Oh, and we might fail if
# there is no space in the array left ...
var g = GetGlobals()
if g.locksLen >= len(g.locks):
ReleaseSys(lock)
raise newException(EResourceExhausted, "cannot aquire additional lock")
# find the position to add:
var p = addr(lock)
var L = g.locksLen-1
var i = 0
while i <= L:
assert g.locks[i] != nil
if g.locks[i] < p: inc(i) # in correct order
elif g.locks[i] == p: return # thread already holds lock
else:
L.next = lockList
lockList = addr(L)
inc(deadlocksPrevented)
# do the crazy stuff here:
while L >= i:
g.locks[L+1] = g.locks[L]
dec L
g.locks[i] = p
inc(g.locksLen)
assert OrderedLocks(g)
return
# simply add to the end:
g.locks[g.locksLen] = p
inc(g.locksLen)
assert OrderedLocks(g)
else:
AquireSys(L)
proc Release*(L: var TLock) =
## Releases the lock `L`.
result = TryAquireSys(lock)
proc Aquire*(lock: var TLock) =
## Acquires the lock `lock`.
when nodeadlocks:
assert L.key != 0
var p = lockList
var last: ptr TLock = nil
while true:
# if we don't find the lock, die by reading from nil!
if p.key == L.key:
if last != nil:
last.next = p.next
else:
assert p == lockList
lockList = lockList.next
L.next = nil
break
last = p
p = p.next
ReleaseSys(L.sys)
var g = GetGlobals()
var p = addr(lock)
var L = g.locksLen-1
var i = 0
while i <= L:
assert g.locks[i] != nil
if g.locks[i] < p: inc(i) # in correct order
elif g.locks[i] == p: return # thread already holds lock
else:
# do the crazy stuff here:
if g.locksLen >= len(g.locks):
raise newException(EResourceExhausted, "cannot aquire additional lock")
while L >= i:
ReleaseSys(cast[ptr TSysLock](g.locks[L])[])
g.locks[L+1] = g.locks[L]
dec L
# acquire the current lock:
AquireSys(lock)
g.locks[i] = p
inc(g.locksLen)
# acquire old locks in proper order again:
L = g.locksLen-1
inc i
while i <= L:
AquireSys(cast[ptr TSysLock](g.locks[i])[])
inc(i)
# DANGER: We can only modify this global var if we gained every lock!
# NO! We need an atomic increment. Crap.
discard system.atomicInc(deadlocksPrevented, 1)
assert OrderedLocks(g)
return
# simply add to the end:
if g.locksLen >= len(g.locks):
raise newException(EResourceExhausted, "cannot aquire additional lock")
AquireSys(lock)
g.locks[g.locksLen] = p
inc(g.locksLen)
assert OrderedLocks(g)
else:
ReleaseSys(L)
AquireSys(lock)
proc Release*(lock: var TLock) =
## Releases the lock `lock`.
when nodeadlocks:
var g = GetGlobals()
var p = addr(lock)
var L = g.locksLen
for i in countdown(L-1, 0):
if g.locks[i] == p:
for j in i..L-2: g.locks[j] = g.locks[j+1]
dec g.locksLen
break
ReleaseSys(lock)
proc joinThread*[TParam](t: TThread[TParam]) {.inline.} =
## waits for the thread `t` until it has terminated.
@@ -257,7 +275,7 @@ proc createThread*[TParam](t: var TThread[TParam],
var dummyThreadId: int32
t.sys = CreateThread(nil, 0'i32, threadProcWrapper[TParam],
addr(t.c), 0'i32, dummyThreadId)
else:
else:
if pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c)) != 0:
raise newException(EIO, "cannot create thread")
@@ -265,12 +283,12 @@ when isMainModule:
import os
var
thr: array [0..1, TThread[tuple[a,b: int]]]
thr: array [0..5, TThread[tuple[a, b: int]]]
L, M, N: TLock
proc doNothing() = nil
proc threadFunc(interval: tuple[a,b: int]) {.procvar.} =
proc threadFunc(interval: tuple[a, b: int]) {.procvar.} =
doNothing()
for i in interval.a..interval.b:
when nodeadlocks:
@@ -302,16 +320,15 @@ when isMainModule:
else: assert false
else:
Aquire(L) # lock stdout
Aquire(M)
Aquire(N)
echo i
os.sleep(10)
when nodeadlocks:
echo "deadlocks prevented: ", deadlocksPrevented
when nodeadlocks:
Release(N)
Release(M)
Release(L)
Release(M)
Release(N)
InitLock(L)
InitLock(M)
@@ -323,5 +340,7 @@ when isMainModule:
for i in 0..high(thr):
joinThread(thr[i])
GC_disable()
main()
GC_enable()
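
A hedged aside on the noDeadlocks machinery above: every branch implements the
classic "single global lock order" discipline, with the order given by the
locks' addresses and recorded per thread in the g.locks array. A minimal
stand-alone sketch of the same idea (aquireBoth and its parameters are
hypothetical, not part of this commit):

  proc aquireBoth(a, b: var TLock) =
    # Taking the pair in address order means every thread agrees on the
    # order, so no two threads can each hold one lock while waiting
    # forever on the other.
    if cast[int](addr(a)) < cast[int](addr(b)):
      Aquire(a)
      Aquire(b)
    else:
      Aquire(b)
      Aquire(a)

The deadlocksPrevented counter exists because Aquire cannot always take locks
in order up front: when a thread requests a lock that sorts below locks it
already holds, the code releases and re-acquires the tail of its lock set in
sorted order, and counts that rollback.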

View File

@@ -1461,7 +1461,8 @@ when not defined(EcmaScript) and not defined(NimrodVM):
# Linux 64bit system. Very strange, but we are at the mercy of GCC's
# optimizer...
var locals {.volatile.}: pointer
setStackBottom(addr(locals))
locals = addr(locals)
setStackBottom(locals)
var
strDesc: TNimType

View File

@@ -525,21 +525,38 @@ proc isAllocatedPtr(a: TAllocator, p: pointer): bool =
# ---------------------- interface to programs -------------------------------
when not defined(useNimRtl):
proc alloc(size: int): pointer =
var heapLock: TSysLock
InitSysLock(HeapLock)
proc unlockedAlloc(size: int): pointer {.inline.} =
result = rawAlloc(allocator, size+sizeof(TFreeCell))
cast[ptr TFreeCell](result).zeroField = 1 # mark it as used
assert(not isAllocatedPtr(allocator, result))
result = cast[pointer](cast[TAddress](result) +% sizeof(TFreeCell))
proc unlockedAlloc0(size: int): pointer {.inline.} =
result = unlockedAlloc(size)
zeroMem(result, size)
proc unlockedDealloc(p: pointer) {.inline.} =
var x = cast[pointer](cast[TAddress](p) -% sizeof(TFreeCell))
assert(cast[ptr TFreeCell](x).zeroField == 1)
rawDealloc(allocator, x)
assert(not isAllocatedPtr(allocator, x))
proc alloc(size: int): pointer =
when hasThreadSupport: AquireSys(HeapLock)
result = unlockedAlloc(size)
when hasThreadSupport: ReleaseSys(HeapLock)
proc alloc0(size: int): pointer =
result = alloc(size)
zeroMem(result, size)
proc dealloc(p: pointer) =
var x = cast[pointer](cast[TAddress](p) -% sizeof(TFreeCell))
assert(cast[ptr TFreeCell](x).zeroField == 1)
rawDealloc(allocator, x)
assert(not isAllocatedPtr(allocator, x))
when hasThreadSupport: AquireSys(HeapLock)
unlockedDealloc(p)
when hasThreadSupport: ReleaseSys(HeapLock)
proc ptrSize(p: pointer): int =
var x = cast[pointer](cast[TAddress](p) -% sizeof(TFreeCell))
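
A hedged sketch of the convention this hunk establishes (not code from the
commit): the public alloc/alloc0/dealloc guard themselves with HeapLock, while
the unlocked* variants are for callers that already hold the lock -- notably
the GC -- or that want to batch work under a single lock round-trip. allocPair
is a hypothetical helper:

  proc allocPair(size: int): tuple[a, b: pointer] =
    when hasThreadSupport: AquireSys(HeapLock)
    result.a = unlockedAlloc(size)  # no per-call locking;
    result.b = unlockedAlloc(size)  # both done under one hold of HeapLock
    when hasThreadSupport: ReleaseSys(HeapLock)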

View File

@@ -1,7 +1,7 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2009 Andreas Rumpf
# (c) Copyright 2011 Andreas Rumpf
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
@@ -47,9 +47,9 @@ proc contains(s: TCellSeq, c: PCell): bool {.inline.} =
proc add(s: var TCellSeq, c: PCell) {.inline.} =
if s.len >= s.cap:
s.cap = s.cap * 3 div 2
var d = cast[PCellArray](alloc(s.cap * sizeof(PCell)))
var d = cast[PCellArray](unlockedAlloc(s.cap * sizeof(PCell)))
copyMem(d, s.d, s.len * sizeof(PCell))
dealloc(s.d)
unlockedDealloc(s.d)
s.d = d
# XXX: realloc?
s.d[s.len] = c
@@ -58,10 +58,10 @@ proc add(s: var TCellSeq, c: PCell) {.inline.} =
proc init(s: var TCellSeq, cap: int = 1024) =
s.len = 0
s.cap = cap
s.d = cast[PCellArray](alloc0(cap * sizeof(PCell)))
s.d = cast[PCellArray](unlockedAlloc0(cap * sizeof(PCell)))
proc deinit(s: var TCellSeq) =
dealloc(s.d)
unlockedDealloc(s.d)
s.d = nil
s.len = 0
s.cap = 0
@@ -70,7 +70,7 @@ const
InitCellSetSize = 1024 # must be a power of two!
proc Init(s: var TCellSet) =
s.data = cast[PPageDescArray](alloc0(InitCellSetSize * sizeof(PPageDesc)))
s.data = cast[PPageDescArray](unlockedAlloc0(InitCellSetSize * sizeof(PPageDesc)))
s.max = InitCellSetSize-1
s.counter = 0
s.head = nil
@@ -79,10 +79,10 @@ proc Deinit(s: var TCellSet) =
var it = s.head
while it != nil:
var n = it.next
dealloc(it)
unlockedDealloc(it)
it = n
s.head = nil # play it safe here
dealloc(s.data)
unlockedDealloc(s.data)
s.data = nil
s.counter = 0
@@ -110,11 +110,11 @@ proc CellSetRawInsert(t: TCellSet, data: PPageDescArray, desc: PPageDesc) =
proc CellSetEnlarge(t: var TCellSet) =
var oldMax = t.max
t.max = ((t.max+1)*2)-1
var n = cast[PPageDescArray](alloc0((t.max + 1) * sizeof(PPageDesc)))
var n = cast[PPageDescArray](unlockedAlloc0((t.max + 1) * sizeof(PPageDesc)))
for i in 0 .. oldmax:
if t.data[i] != nil:
CellSetRawInsert(t, n, t.data[i])
dealloc(t.data)
unlockedDealloc(t.data)
t.data = n
proc CellSetPut(t: var TCellSet, key: TAddress): PPageDesc =
@@ -132,7 +132,7 @@ proc CellSetPut(t: var TCellSet, key: TAddress): PPageDesc =
while t.data[h] != nil: h = nextTry(h, t.max)
assert(t.data[h] == nil)
# the new page descriptor goes into result
result = cast[PPageDesc](alloc0(sizeof(TPageDesc)))
result = cast[PPageDesc](unlockedAlloc0(sizeof(TPageDesc)))
result.next = t.head
result.key = key
t.head = result
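
Why this file swaps every alloc/dealloc for its unlocked* twin, with a hedged
illustration: the cellset routines run while the GC already holds HeapLock,
and a default pthread mutex is not recursive, so calling the locking alloc
from here would make the thread block on a lock it itself owns.
wouldSelfDeadlock is hypothetical, not code from this commit:

  proc wouldSelfDeadlock(): pointer =
    AquireSys(HeapLock)
    result = alloc(64)  # alloc tries to take HeapLock again: self-deadlock
    ReleaseSys(HeapLock)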

View File

@@ -10,6 +10,9 @@
# Exception handling code. This is difficult because it has
# to work if there is no more memory (but it doesn't yet!).
const
MaxLocksPerThread = 10
var
stackTraceNewLine* = "\n" ## undocumented feature; it is replaced by ``<br>``
## for CGI applications
@@ -81,11 +84,9 @@ when hasThreadSupport:
proc pthread_setspecific(a1: Tpthread_key, a2: pointer): int32 {.
importc: "pthread_setspecific", header: "<pthread.h>".}
proc specificDestroy(mem: pointer) {.noconv.} =
#aquireSys(heapLock)
#dealloc(mem)
#releaseSys(heapLock)
#c_free(mem)
proc specificDestroy(mem: pointer) {.noconv.} =
# we really need a thread-safe 'dealloc' here:
dealloc(mem)
proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} =
discard pthread_key_create(addr(result), specificDestroy)
@@ -96,10 +97,12 @@ when hasThreadSupport:
result = pthread_getspecific(s)
type
TGlobals {.final, pure.} = object
TGlobals* {.final, pure.} = object
excHandler: PSafePoint
currException: ref E_Base
framePtr: PFrame
locksLen*: int
locks*: array [0..MaxLocksPerThread-1, pointer]
buf: string # cannot be allocated on the stack!
assertBuf: string # we need a different buffer for
# assert, as it raises an exception and
@@ -107,22 +110,16 @@ when hasThreadSupport:
gAssertionFailed: ref EAssertionFailed
tempFrames: array [0..127, PFrame] # cannot be allocated on the stack!
data: float # compiler should add thread local variables here!
PGlobals = ptr TGlobals
PGlobals* = ptr TGlobals
# it's more efficient to not use a global variable for the thread storage
# slot, but to rely on the implementation to assign slot 0 for us... ;-)
var globalsSlot = ThreadVarAlloc()
#const globalsSlot = TThreadVarSlot(0)
#assert checkSlot.int == globalsSlot.int
proc AtomicAlloc0(size: int): pointer =
#AquireSys(heapLock)
result = c_malloc(size)
zeroMem(result, size)
#ReleaseSys(heapLock)
var checkSlot = ThreadVarAlloc()
const globalsSlot = TThreadVarSlot(0)
assert checkSlot.int == globalsSlot.int
proc NewGlobals(): PGlobals =
result = cast[PGlobals](AtomicAlloc0(sizeof(TGlobals)))
result = cast[PGlobals](alloc0(sizeof(TGlobals)))
new(result.gAssertionFailed)
result.buf = newStringOfCap(2000)
result.assertBuf = newStringOfCap(2000)
@@ -134,7 +131,7 @@ when hasThreadSupport:
proc SetThreadLocalStorage*(p: pointer) {.inl.} =
ThreadVarSetValue(globalsSlot, p)
proc GetGlobals(): PGlobals {.compilerRtl, inl.} =
proc GetGlobals*(): PGlobals {.compilerRtl, inl.} =
result = cast[PGlobals](ThreadVarGetValue(globalsSlot))
# create for the main thread:
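
How the new locksLen/locks fields are consumed, as a sketch: the threads
module keeps each thread's held locks sorted by address in this table, which
is what its OrderedLocks assertion verifies. A hypothetical restatement of
the invariant, for a thread that already holds some locks:

  var g = GetGlobals()
  for i in 1 .. g.locksLen-1:
    assert g.locks[i-1] < g.locks[i]  # strictly ordered by address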

View File

@@ -61,9 +61,6 @@ type
decStack: TCellSeq # cells in the stack that are to decref again
cycleRoots: TCellSet
tempStack: TCellSeq # temporary stack for recursion elimination
when hasThreadSupport:
cycleRootsLock: TSysLock
zctLock: TSysLock
stat: TGcStat
var
@@ -80,13 +77,11 @@ var
proc aquire(gch: var TGcHeap) {.inline.} =
when hasThreadSupport:
aquireSys(gch.zctLock)
aquireSys(gch.cycleRootsLock)
AquireSys(HeapLock)
proc release(gch: var TGcHeap) {.inline.} =
when hasThreadSupport:
releaseSys(gch.cycleRootsLock)
releaseSys(gch.zctLock)
releaseSys(HeapLock)
proc addZCT(s: var TCellSeq, c: PCell) {.noinline.} =
if (c.refcount and rcZct) == 0:
@@ -205,18 +200,18 @@ proc prepareDealloc(cell: PCell) =
proc rtlAddCycleRoot(c: PCell) {.rtl, inl.} =
# we MUST access gch as a global here, because this crosses DLL boundaries!
when hasThreadSupport:
AquireSys(gch.cycleRootsLock)
AquireSys(HeapLock)
incl(gch.cycleRoots, c)
when hasThreadSupport:
ReleaseSys(gch.cycleRootsLock)
ReleaseSys(HeapLock)
proc rtlAddZCT(c: PCell) {.rtl, inl.} =
# we MUST access gch as a global here, because this crosses DLL boundaries!
when hasThreadSupport:
AquireSys(gch.zctLock)
AquireSys(HeapLock)
addZCT(gch.zct, c)
when hasThreadSupport:
ReleaseSys(gch.zctLock)
ReleaseSys(HeapLock)
proc decRef(c: PCell) {.inline.} =
when stressGC:
@@ -284,11 +279,7 @@ proc initGC() =
init(gch.tempStack)
Init(gch.cycleRoots)
Init(gch.decStack)
when hasThreadSupport:
InitSysLock(gch.cycleRootsLock)
InitSysLock(gch.zctLock)
new(gOutOfMem) # reserve space for the EOutOfMemory exception here!
proc forAllSlotsAux(dest: pointer, n: ptr TNimNode, op: TWalkOp) =
var d = cast[TAddress](dest)
@@ -690,10 +681,11 @@ proc unmarkStackAndRegisters(gch: var TGcHeap) =
var d = gch.decStack.d
for i in 0..gch.decStack.len-1:
assert isAllocatedPtr(allocator, d[i])
# decRef(d[i]) inlined: cannot create a cycle
# decRef(d[i]) inlined: cannot create a cycle and must not acquire the lock
var c = d[i]
# XXX no need for an atomic dec here:
if atomicDec(c.refcount, rcIncrement) <% rcIncrement:
rtlAddZCT(c)
addZCT(gch.zct, c)
assert c.typ != nil
gch.decStack.len = 0

View File

@@ -97,6 +97,10 @@ when defined(boehmgc):
proc dealloc(p: Pointer) =
boehmDealloc(p)
proc unlockedAlloc(size: int): pointer {.inline.} = result = alloc(size)
proc unlockedAlloc0(size: int): pointer {.inline.} = result = alloc0(size)
proc unlockedDealloc(p: pointer) {.inline.} = dealloc(p)
proc initGC() =
when defined(macosx): boehmGCinit()
@@ -148,21 +152,6 @@ elif defined(nogc):
include "system/alloc"
when false:
proc alloc(size: int): pointer =
result = c_malloc(size)
if result == nil: raiseOutOfMem()
proc alloc0(size: int): pointer =
result = alloc(size)
zeroMem(result, size)
proc realloc(p: Pointer, newsize: int): pointer =
result = c_realloc(p, newsize)
if result == nil: raiseOutOfMem()
proc dealloc(p: Pointer) = c_free(p)
proc getOccupiedMem(): int = return -1
proc getFreeMem(): int = return -1
proc getTotalMem(): int = return -1
proc initGC() = nil
proc GC_disable() = nil
proc GC_enable() = nil

View File

@@ -116,12 +116,16 @@ type
when not defined(useNimRtl):
proc initReprClosure(cl: var TReprClosure) =
# Important: cellsets does not lock the heap when doing allocations! We
# have to do it here ...
when hasThreadSupport and defined(heapLock): AquireSys(HeapLock)
Init(cl.marked)
cl.recdepth = -1 # default is to display everything!
cl.indent = 0
proc deinitReprClosure(cl: var TReprClosure) =
Deinit(cl.marked)
when hasThreadSupport and defined(heapLock): ReleaseSys(HeapLock)
proc reprBreak(result: var string, cl: TReprClosure) =
add result, "\n"

View File

@@ -15,6 +15,8 @@ when not SystemInclude:
# ugly hack: this file is then included from core/threads, so we have
# thread support:
const hasThreadSupport = true
include "lib/system/ansi_c"
when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
proc sync_add_and_fetch(p: var int, val: int): int {.
@@ -94,7 +96,3 @@ else:
proc ReleaseSys(L: var TSysLock) {.
importc: "pthread_mutex_unlock", header: "<pthread.h>".}
when SystemInclude:
var heapLock: TSysLock
InitSysLock(HeapLock)
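
A hedged sketch of how the sync_add_and_fetch import above is used: it backs
atomic read-modify-write updates such as system.atomicInc, which the threads
module calls for deadlocksPrevented. atomicIncSketch is a hypothetical name;
the real atomicInc lives in the system module:

  proc atomicIncSketch(memLoc: var int, x: int): int =
    when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
      result = sync_add_and_fetch(memLoc, x)  # lock-free add, new value
    else:
      AquireSys(heapLock)  # fallback: serialize through a lock
      inc(memLoc, x)
      result = memLoc
      ReleaseSys(heapLock)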

View File

@@ -1,5 +1,14 @@
* add --deadlock_prevention:on|off switch; timeout for locks
* implicit ref/ptr->var conversion
* add --deadlock_prevention:on|off switch? timeout for locks?
* make GC fully thread-safe; needs:
- global list of threads
- thread must store its stack boundaries
- GC must traverse these stacks
- isOnStack() needs to take them into account (SLOW?)
- GC must stop the world
* implicit ref/ptr->var conversion; the compiler may store an object
implicitly on the heap for write barrier efficiency! (Especially
important for multi-threading!)
High priority (version 0.9.0)