mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-28 17:04:41 +00:00
Resolve bugs with deep recursion of asyncdispatch.
Introduce callSoon() implementation. Patch tests to use waitFor() instead of asyncCheck()
This commit is contained in:
@@ -11,7 +11,7 @@ include "system/inclrtl"
|
||||
|
||||
import os, oids, tables, strutils, macros, times, heapqueue
|
||||
|
||||
import nativesockets, net
|
||||
import nativesockets, net, queues
|
||||
|
||||
export Port, SocketFlag
|
||||
|
||||
@@ -155,6 +155,9 @@ type
|
||||
|
||||
when not defined(release):
|
||||
var currentID = 0
|
||||
|
||||
proc callSoon*(cbproc: proc ()) {.gcsafe.}
|
||||
|
||||
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
|
||||
## Creates a new future.
|
||||
##
|
||||
@@ -257,7 +260,7 @@ proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
|
||||
## passes ``future`` as a param to the callback.
|
||||
future.cb = cb
|
||||
if future.finished:
|
||||
future.cb()
|
||||
callSoon(future.cb)
|
||||
|
||||
proc `callback=`*[T](future: Future[T],
|
||||
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
|
||||
@@ -355,11 +358,17 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
type
|
||||
PDispatcherBase = ref object of RootRef
|
||||
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
|
||||
callbacks: Queue[proc ()]
|
||||
|
||||
proc processTimers(p: PDispatcherBase) {.inline.} =
|
||||
while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
|
||||
p.timers.pop().fut.complete()
|
||||
|
||||
proc processPendingCallbacks(p: PDispatcherBase) =
|
||||
while p.callbacks.len > 0:
|
||||
var cb = p.callbacks.dequeue()
|
||||
cb()
|
||||
|
||||
proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
|
||||
# If dispatcher has active timers this proc returns the timeout
|
||||
# of the nearest timer. Returns `timeout` otherwise.
|
||||
@@ -403,6 +412,7 @@ when defined(windows) or defined(nimdoc):
|
||||
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
|
||||
result.handles = initSet[AsyncFD]()
|
||||
result.timers.newHeapQueue()
|
||||
result.callbacks = initQueue[proc ()](64)
|
||||
|
||||
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
|
||||
proc getGlobalDispatcher*(): PDispatcher =
|
||||
@@ -429,7 +439,7 @@ when defined(windows) or defined(nimdoc):
|
||||
proc poll*(timeout = 500) =
|
||||
## Waits for completion events and processes them.
|
||||
let p = getGlobalDispatcher()
|
||||
if p.handles.len == 0 and p.timers.len == 0:
|
||||
if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
|
||||
raise newException(ValueError,
|
||||
"No handles or timers registered in dispatcher.")
|
||||
|
||||
@@ -469,6 +479,8 @@ when defined(windows) or defined(nimdoc):
|
||||
|
||||
# Timer processing.
|
||||
processTimers(p)
|
||||
# Callback queue processing
|
||||
processPendingCallbacks(p)
|
||||
|
||||
var connectExPtr: pointer = nil
|
||||
var acceptExPtr: pointer = nil
|
||||
@@ -930,6 +942,7 @@ else:
|
||||
new result
|
||||
result.selector = newSelector()
|
||||
result.timers.newHeapQueue()
|
||||
result.callbacks = initQueue[proc ()](64)
|
||||
|
||||
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
|
||||
proc getGlobalDispatcher*(): PDispatcher =
|
||||
@@ -1025,7 +1038,10 @@ else:
|
||||
# (e.g. socket disconnected).
|
||||
discard
|
||||
|
||||
# Timer processing.
|
||||
processTimers(p)
|
||||
# Callback queue processing
|
||||
processPendingCallbacks(p)
|
||||
|
||||
proc connect*(socket: AsyncFD, address: string, port: Port,
|
||||
domain = AF_INET): Future[void] =
|
||||
@@ -1604,6 +1620,11 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
|
||||
return
|
||||
add(result, c)
|
||||
|
||||
proc callSoon*(cbproc: proc ()) =
|
||||
## Schedule `cbproc` to be called as soon as possible.
|
||||
## The callback is called when control returns to the event loop.
|
||||
getGlobalDispatcher().callbacks.enqueue(cbproc)
|
||||
|
||||
proc runForever*() =
|
||||
## Begins a never ending global dispatcher poll loop.
|
||||
while true:
|
||||
|
||||
@@ -36,4 +36,4 @@ proc main {.async.} =
|
||||
discard await g()
|
||||
echo 6
|
||||
|
||||
asyncCheck main()
|
||||
waitFor(main())
|
||||
|
||||
@@ -48,7 +48,7 @@ proc catch() {.async.} =
|
||||
except OSError, EInvalidField:
|
||||
assert false
|
||||
|
||||
asyncCheck catch()
|
||||
waitFor catch()
|
||||
|
||||
proc test(): Future[bool] {.async.} =
|
||||
result = false
|
||||
@@ -92,13 +92,13 @@ proc test4(): Future[int] {.async.} =
|
||||
result = 2
|
||||
|
||||
var x = test()
|
||||
assert x.read
|
||||
assert x.waitFor()
|
||||
|
||||
x = test2()
|
||||
assert x.read
|
||||
assert x.waitFor()
|
||||
|
||||
var y = test3()
|
||||
assert y.read == 2
|
||||
assert y.waitFor() == 2
|
||||
|
||||
y = test4()
|
||||
assert y.read == 2
|
||||
assert y.waitFor() == 2
|
||||
|
||||
@@ -16,4 +16,4 @@ x.callback =
|
||||
proc () =
|
||||
finished = true
|
||||
|
||||
while not finished: discard
|
||||
while not finished: poll()
|
||||
|
||||
Reference in New Issue
Block a user