fixed the deadlock that happens when stress testing ToFreeQueue

This commit is contained in:
Araq
2014-11-08 11:18:25 +01:00
parent 06e9932e8a
commit 943d4ee714
2 changed files with 31 additions and 26 deletions

View File

@@ -112,8 +112,8 @@ type
ToFreeQueue = object
len: int
lock: TLock
empty: TCond
data: array[2, pointer]
empty: CondVar
data: array[128, pointer]
WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
Worker = object
@@ -143,13 +143,27 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
await(w.taskStarted)
result = true
proc cleanFlowVars(w: ptr Worker) =
let q = addr(w.q)
acquire(q.lock)
for i in 0 .. <q.len:
GC_unref(cast[RootRef](q.data[i]))
#echo "GC_unref"
q.len = 0
release(q.lock)
proc wakeupWorkerToProcessQueue(w: ptr Worker) =
# Note that if this fails somebody else already woke up the thread so it's
# perfectly fine to do nothing:
if cas(addr w.ready, true, false):
w.data = nil
w.f = proc (t, a: pointer) {.nimcall.} = discard
signal(w.taskArrived)
# we have to ensure it's us who wakes up the owning thread.
# This is quite horrible code, but it runs so rarely that it doesn't matter:
while not cas(addr w.ready, true, false):
cpuRelax()
discard
w.data = nil
w.f = proc (w, a: pointer) {.nimcall.} =
let w = cast[ptr Worker](w)
cleanFlowVars(w)
signal(w.q.empty)
signal(w.taskArrived)
proc finished(fv: FlowVarBase) =
doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
@@ -160,29 +174,17 @@ proc finished(fv: FlowVarBase) =
if fv.data.isNil: return
let owner = cast[ptr Worker](fv.owner)
let q = addr(owner.q)
var waited = false
acquire(q.lock)
while not (q.len < q.data.len):
#echo "EXHAUSTED!"
release(q.lock)
wakeupWorkerToProcessQueue(owner)
wait(q.empty, q.lock)
waited = true
await(q.empty)
acquire(q.lock)
q.data[q.len] = cast[pointer](fv.data)
inc q.len
release(q.lock)
fv.data = nil
# wakeup other potentially waiting threads:
if waited: signal(q.empty)
proc cleanFlowVars(w: ptr Worker) =
let q = addr(w.q)
acquire(q.lock)
for i in 0 .. <q.len:
GC_unref(cast[RootRef](q.data[i]))
#echo "GC_unref"
q.len = 0
release(q.lock)
signal(q.empty)
proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
@@ -273,7 +275,10 @@ var
proc slave(w: ptr Worker) {.thread.} =
while true:
w.ready = true
when declared(atomicStoreN):
atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
else:
w.ready = true
readyWorker = w
signal(gSomeReady)
await(w.taskArrived)
@@ -305,7 +310,7 @@ proc activateThread(i: int) {.noinline.} =
workersData[i].taskArrived = createCondVar()
workersData[i].taskStarted = createCondVar()
workersData[i].initialized = true
initCond(workersData[i].q.empty)
workersData[i].q.empty = createCondVar()
initLock(workersData[i].q.lock)
createThread(workers[i], slave, addr(workersData[i]))

View File

@@ -1,7 +1,7 @@
discard """
output: '''called deepCopy for int
called deepCopy for int
done999 999
done999 999'''
"""
import threadpool