asyncdispatch: split asyncfutures into its own module

This slightly changes behaviour of callSoon - before loop is initialized, callSoon will call the function immediately.
This commit is contained in:
Michał Zieliński
2017-06-05 00:12:44 +02:00
parent 9e12db4459
commit e86863e8f5
4 changed files with 52 additions and 22 deletions

View File

@@ -9,11 +9,11 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, options
import os, tables, strutils, times, heapqueue, options, asyncfutures
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures
#{.injectStmt: newGcInvariant().}
@@ -159,8 +159,6 @@ export Port, SocketFlag
# TODO: Check if yielded future is nil and throw a more meaningful exception
include includes/asyncfutures
type
PDispatcherBase = ref object of RootRef
timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
@@ -190,6 +188,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc callSoon*(cbproc: proc ()) {.gcsafe.}
proc initGlobalDispatcher =
if asyncfutures.callSoonProc == nil:
asyncfutures.callSoonProc = callSoon
when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
@@ -237,15 +241,17 @@ when defined(windows) or defined(nimdoc):
result.callbacks = initDeque[proc ()](64)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
## Retrieves the global thread-local dispatcher.
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initGlobalDispatcher()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
@@ -932,14 +938,17 @@ else:
result.callbacks = initDeque[proc ()](64)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initGlobalDispatcher()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc update(fd: AsyncFD, events: set[Event]) =
let p = getGlobalDispatcher()

View File

@@ -1,3 +1,6 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, options, deques
# TODO: This shouldn't need to be included, but should ideally be exported.
type
@@ -30,7 +33,14 @@ type
when not defined(release):
var currentID = 0
proc callSoon*(cbproc: proc ()) {.gcsafe.}
var callSoonProc* {.threadvar.}: (proc(cbproc: proc ()) {.gcsafe.})
proc callSoon(cbproc: proc ()) =
if callSoonProc == nil:
# Loop not initialized yet. Call the function directly to allow setup code to use futures.
cbproc()
else:
callSoonProc(cbproc)
template setupFutureBase(fromProc: string) =
new(result)

View File

@@ -9,11 +9,12 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, lists, options
import os, tables, strutils, times, heapqueue, lists, options, asyncfutures
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures
#{.injectStmt: newGcInvariant().}
@@ -130,8 +131,6 @@ export Port, SocketFlag
# TODO: Check if yielded future is nil and throw a more meaningful exception
include "../includes/asyncfutures"
type
PDispatcherBase = ref object of RootRef
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
@@ -161,6 +160,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc callSoon*(cbproc: proc ()) {.gcsafe.}
proc initGlobalDispatcher =
if asyncfutures.callSoonProc == nil:
asyncfutures.callSoonProc = callSoon
when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
@@ -214,15 +219,17 @@ when defined(windows) or defined(nimdoc):
result.callbacks = initDeque[proc ()](64)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
## Retrieves the global thread-local dispatcher.
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initGlobalDispatcher()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
@@ -1081,14 +1088,17 @@ else:
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initGlobalDispatcher()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()

View File

@@ -17,5 +17,6 @@ proc asyncRecursionTest*(): Future[int] {.async.} =
inc(i)
when isMainModule:
setGlobalDispatcher(newDispatcher())
var i = waitFor asyncRecursionTest()
echo i