mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-19 05:50:30 +00:00
Fix waiting on coroutines (#5463)
Public coroutine API returns a safe reference to specific running coroutine. Fixes bug where multiple coroutines executing same procedure would identify as same coroutine. Greatly optimizes `alive()` (and as a result of that `wait()`) calls. Coroutine struct is allocated together with stack as memory unmanaged by GC.
This commit is contained in:
committed by
Andreas Rumpf
parent
34a3d40d18
commit
cd2721242a
@@ -23,7 +23,6 @@ when not nimCoroutines and not defined(nimdoc):
|
||||
{.error: "Coroutines require -d:nimCoroutines".}
|
||||
|
||||
import os
|
||||
import macros
|
||||
import lists
|
||||
include system/timers
|
||||
|
||||
@@ -120,27 +119,38 @@ const
|
||||
CORO_FINISHED = 2
|
||||
|
||||
type
|
||||
Stack = object
|
||||
Stack {.pure.} = object
|
||||
top: pointer # Top of the stack. Pointer used for deallocating stack if we own it.
|
||||
bottom: pointer # Very bottom of the stack, acts as unique stack identifier.
|
||||
size: int
|
||||
|
||||
Coroutine = ref object
|
||||
Coroutine {.pure.} = object
|
||||
execContext: Context
|
||||
fn: proc()
|
||||
state: int
|
||||
lastRun: Ticks
|
||||
sleepTime: float
|
||||
stack: Stack
|
||||
reference: CoroutineRef
|
||||
|
||||
CoroutinePtr = ptr Coroutine
|
||||
|
||||
CoroutineRef* = ref object
|
||||
## CoroutineRef holds a pointer to actual coroutine object. Public API always returns
|
||||
## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine
|
||||
## object while it can be safely deallocated by coroutine scheduler loop. In this case
|
||||
## Coroutine.reference.coro is set to nil. Public API checks for for it being nil and
|
||||
## gracefully fails if it is nil.
|
||||
coro: CoroutinePtr
|
||||
|
||||
CoroutineLoopContext = ref object
|
||||
coroutines: DoublyLinkedList[Coroutine]
|
||||
current: DoublyLinkedNode[Coroutine]
|
||||
coroutines: DoublyLinkedList[CoroutinePtr]
|
||||
current: DoublyLinkedNode[CoroutinePtr]
|
||||
loop: Coroutine
|
||||
|
||||
var ctx {.threadvar.}: CoroutineLoopContext
|
||||
|
||||
proc getCurrent(): Coroutine =
|
||||
proc getCurrent(): CoroutinePtr =
|
||||
## Returns current executing coroutine object.
|
||||
var node = ctx.current
|
||||
if node != nil:
|
||||
@@ -151,7 +161,7 @@ proc initialize() =
|
||||
## Initializes coroutine state of current thread.
|
||||
if ctx == nil:
|
||||
ctx = CoroutineLoopContext()
|
||||
ctx.coroutines = initDoublyLinkedList[Coroutine]()
|
||||
ctx.coroutines = initDoublyLinkedList[CoroutinePtr]()
|
||||
ctx.loop = Coroutine()
|
||||
ctx.loop.state = CORO_EXECUTING
|
||||
when coroBackend == CORO_BACKEND_FIBERS:
|
||||
@@ -159,7 +169,7 @@ proc initialize() =
|
||||
|
||||
proc runCurrentTask()
|
||||
|
||||
proc switchTo(current, to: Coroutine) =
|
||||
proc switchTo(current, to: CoroutinePtr) =
|
||||
## Switches execution from `current` into `to` context.
|
||||
to.lastRun = getTicks()
|
||||
# Update position of current stack so gc invoked from another stack knows how much to scan.
|
||||
@@ -192,7 +202,7 @@ proc suspend*(sleepTime: float=0) =
|
||||
## Until then other coroutines are executed.
|
||||
var current = getCurrent()
|
||||
current.sleepTime = sleepTime
|
||||
switchTo(current, ctx.loop)
|
||||
switchTo(current, addr(ctx.loop))
|
||||
|
||||
proc runCurrentTask() =
|
||||
## Starts execution of current coroutine and updates it's state through coroutine's life.
|
||||
@@ -218,31 +228,33 @@ proc runCurrentTask() =
|
||||
suspend(0) # Exit coroutine without returning from coroExecWithStack()
|
||||
doAssert false
|
||||
|
||||
proc start*(c: proc(), stacksize: int=defaultStackSize) =
|
||||
proc start*(c: proc(), stacksize: int=defaultStackSize): CoroutineRef {.discardable.} =
|
||||
## Schedule coroutine for execution. It does not run immediately.
|
||||
if ctx == nil:
|
||||
initialize()
|
||||
|
||||
var coro = Coroutine()
|
||||
coro.fn = c
|
||||
var coro: CoroutinePtr
|
||||
when coroBackend == CORO_BACKEND_FIBERS:
|
||||
coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine)))
|
||||
coro.execContext = CreateFiberEx(stacksize, stacksize,
|
||||
FIBER_FLAG_FLOAT_SWITCH, (proc(p: pointer): void {.stdcall.} = runCurrentTask()), nil)
|
||||
coro.stack.size = stacksize
|
||||
else:
|
||||
var stack: pointer
|
||||
while stack == nil:
|
||||
stack = alloc0(stacksize)
|
||||
coro.stack.top = stack
|
||||
coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize))
|
||||
coro.stack.top = cast[pointer](cast[ByteAddress](coro) + sizeof(Coroutine))
|
||||
coro.stack.bottom = cast[pointer](cast[ByteAddress](coro.stack.top) + stacksize)
|
||||
when coroBackend == CORO_BACKEND_UCONTEXT:
|
||||
discard getcontext(coro.execContext)
|
||||
coro.execContext.uc_stack.ss_sp = cast[pointer](cast[ByteAddress](stack) + stacksize)
|
||||
coro.execContext.uc_stack.ss_size = coro.stack.size
|
||||
coro.execContext.uc_link = addr ctx.loop.execContext
|
||||
coro.execContext.uc_stack.ss_sp = coro.stack.top
|
||||
coro.execContext.uc_stack.ss_size = stacksize
|
||||
coro.execContext.uc_link = addr(ctx.loop.execContext)
|
||||
makecontext(coro.execContext, runCurrentTask, 0)
|
||||
coro.fn = c
|
||||
coro.stack.size = stacksize
|
||||
coro.state = CORO_CREATED
|
||||
coro.reference = CoroutineRef(coro: coro)
|
||||
ctx.coroutines.append(coro)
|
||||
return coro.reference
|
||||
|
||||
proc run*() =
|
||||
initialize()
|
||||
@@ -256,7 +268,7 @@ proc run*() =
|
||||
var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000)
|
||||
if remaining <= 0:
|
||||
# Save main loop context. Suspending coroutine will resume after this statement with
|
||||
switchTo(ctx.loop, current)
|
||||
switchTo(addr(ctx.loop), current)
|
||||
else:
|
||||
if minDelay > 0 and remaining > 0:
|
||||
minDelay = min(remaining, minDelay)
|
||||
@@ -269,14 +281,14 @@ proc run*() =
|
||||
# If first coroutine ends then `prev` is nil even if more coroutines
|
||||
# are to be scheduled.
|
||||
next = ctx.current.next
|
||||
current.reference.coro = nil
|
||||
ctx.coroutines.remove(ctx.current)
|
||||
GC_removeStack(current.stack.bottom)
|
||||
when coroBackend == CORO_BACKEND_FIBERS:
|
||||
DeleteFiber(current.execContext)
|
||||
else:
|
||||
dealloc(current.stack.top)
|
||||
current.stack.top = nil
|
||||
current.stack.bottom = nil
|
||||
dealloc(current)
|
||||
ctx.current = next
|
||||
elif ctx.current == nil or ctx.current.next == nil:
|
||||
ctx.current = ctx.coroutines.head
|
||||
@@ -284,13 +296,10 @@ proc run*() =
|
||||
else:
|
||||
ctx.current = ctx.current.next
|
||||
|
||||
proc alive*(c: proc()): bool =
|
||||
proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED
|
||||
## Returns ``true`` if coroutine has not returned, ``false`` otherwise.
|
||||
for coro in items(ctx.coroutines):
|
||||
if coro.fn == c:
|
||||
return coro.state != CORO_FINISHED
|
||||
|
||||
proc wait*(c: proc(), interval=0.01) =
|
||||
proc wait*(c: CoroutineRef, interval=0.01) =
|
||||
## Returns only after coroutine ``c`` has returned. ``interval`` is time in seconds how often.
|
||||
while alive(c):
|
||||
suspend(interval)
|
||||
|
||||
19
tests/coroutines/twait.nim
Normal file
19
tests/coroutines/twait.nim
Normal file
@@ -0,0 +1,19 @@
|
||||
discard """
|
||||
output: "Exit 1\nExit 2"
|
||||
"""
|
||||
import coro
|
||||
|
||||
var coro1: CoroutineRef
|
||||
|
||||
proc testCoroutine1() =
|
||||
for i in 0..<10:
|
||||
suspend(0)
|
||||
echo "Exit 1"
|
||||
|
||||
proc testCoroutine2() =
|
||||
coro1.wait()
|
||||
echo "Exit 2"
|
||||
|
||||
coro1 = coro.start(testCoroutine1)
|
||||
coro.start(testCoroutine2)
|
||||
run()
|
||||
1
tests/coroutines/twait.nim.cfg
Normal file
1
tests/coroutines/twait.nim.cfg
Normal file
@@ -0,0 +1 @@
|
||||
-d:nimCoroutines
|
||||
Reference in New Issue
Block a user