Merge pull request #5938 from zielmicha/futures

[WIP] Better Future
This commit is contained in:
Dominik Picheta
2017-07-14 20:18:06 +01:00
committed by GitHub
6 changed files with 259 additions and 145 deletions

View File

@@ -9,11 +9,12 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, options
import os, tables, strutils, times, heapqueue, options, asyncstreams
import asyncfutures except callSoon
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures, asyncstreams
#{.injectStmt: newGcInvariant().}
@@ -159,8 +160,6 @@ export Port, SocketFlag
# TODO: Check if yielded future is nil and throw a more meaningful exception
include includes/asyncfutures
type
PDispatcherBase = ref object of RootRef
timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
@@ -190,6 +189,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc callSoon(cbproc: proc ()) {.gcsafe.}
proc initCallSoonProc =
if asyncfutures.getCallSoonProc().isNil:
asyncfutures.setCallSoonProc(callSoon)
when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
@@ -237,15 +242,17 @@ when defined(windows) or defined(nimdoc):
result.callbacks = initDeque[proc ()](64)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
## Retrieves the global thread-local dispatcher.
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
@@ -932,14 +939,17 @@ else:
result.callbacks = initDeque[proc ()](64)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc update(fd: AsyncFD, events: set[Event]) =
let p = getGlobalDispatcher()
@@ -1327,7 +1337,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
return
add(result, c)
proc callSoon*(cbproc: proc ()) =
proc callSoon(cbproc: proc ()) =
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
getGlobalDispatcher().callbacks.addLast(cbproc)

View File

@@ -1,8 +1,16 @@
import os, tables, strutils, times, heapqueue, options, deques
# TODO: This shouldn't need to be included, but should ideally be exported.
type
CallbackFunc = proc () {.closure, gcsafe.}
CallbackList = object
function: CallbackFunc
next: ref CallbackList
FutureBase* = ref object of RootObj ## Untyped future.
cb: proc () {.closure,gcsafe.}
callbacks: CallbackList
finished: bool
error*: ref Exception ## Stored exception
errorStackTrace*: string
@@ -16,12 +24,6 @@ type
FutureVar*[T] = distinct Future[T]
FutureStream*[T] = ref object of FutureBase ## Special future that acts as
## a queue. Its API is still
## experimental and so is
## subject to change.
queue: Deque[T]
FutureError* = object of Exception
cause*: FutureBase
@@ -30,7 +32,27 @@ type
when not defined(release):
var currentID = 0
proc callSoon*(cbproc: proc ()) {.gcsafe.}
var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
## Get current implementation of ``callSoon``.
return callSoonProc
proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
callSoonProc = p
proc callSoon*(cbproc: proc ()) =
## Call ``cbproc`` "soon".
##
## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
##
## If async dispatcher is not running, ``cbproc`` will be executed immediately.
if callSoonProc.isNil:
# Loop not initialized yet. Call the function directly to allow setup code to use futures.
cbproc()
else:
callSoonProc(cbproc)
template setupFutureBase(fromProc: string) =
new(result)
@@ -56,22 +78,6 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new ``FutureStream``. This future's callback is activated when
## two events occur:
##
## * New data is written into the future stream.
## * The future stream is completed (this means that no more data will be
## written).
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
##
## **Note:** The API of FutureStream is still new and so has a higher
## likelihood of changing in the future.
setupFutureBase(fromProc)
result.queue = initDeque[T]()
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false
@@ -98,6 +104,33 @@ proc checkFinished[T](future: Future[T]) =
err.cause = future
raise err
proc call(callbacks: var CallbackList) =
var current = callbacks
while true:
if not current.function.isNil:
callSoon(current.function)
if current.next.isNil:
break
else:
current = current.next[]
# callback will be called only once, let GC collect them now
callbacks.next = nil
callbacks.function = nil
proc add(callbacks: var CallbackList, function: CallbackFunc) =
if callbacks.function.isNil:
callbacks.function = function
assert callbacks.next == nil
else:
let newNext = new(ref CallbackList)
newNext.function = callbacks.function
newNext.next = callbacks.next
callbacks.next = newNext
callbacks.function = function
proc complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -105,8 +138,7 @@ proc complete*[T](future: Future[T], val: T) =
assert(future.error == nil)
future.value = val
future.finished = true
if future.cb != nil:
future.cb()
future.callbacks.call()
proc complete*(future: Future[void]) =
## Completes a void ``future``.
@@ -114,8 +146,7 @@ proc complete*(future: Future[void]) =
checkFinished(future)
assert(future.error == nil)
future.finished = true
if future.cb != nil:
future.cb()
future.callbacks.call()
proc complete*[T](future: FutureVar[T]) =
## Completes a ``FutureVar``.
@@ -123,8 +154,7 @@ proc complete*[T](future: FutureVar[T]) =
checkFinished(fut)
assert(fut.error == nil)
fut.finished = true
if fut.cb != nil:
fut.cb()
fut.callbacks.call()
proc complete*[T](future: FutureVar[T], val: T) =
## Completes a ``FutureVar`` with value ``val``.
@@ -138,12 +168,6 @@ proc complete*[T](future: FutureVar[T], val: T) =
if not fut.cb.isNil():
fut.cb()
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signalling the end of data.
future.finished = true
if not future.cb.isNil():
future.cb()
proc fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -152,26 +176,40 @@ proc fail*[T](future: Future[T], error: ref Exception) =
future.error = error
future.errorStackTrace =
if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
if future.cb != nil:
future.cb()
future.callbacks.call()
proc clearCallbacks(future: FutureBase) =
future.callbacks.function = nil
future.callbacks.next = nil
proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
assert cb != nil
if future.finished:
callSoon(cb)
else:
# This is to prevent exceptions from being silently ignored when a future
# is discarded.
# TODO: This may turn out to be a bad idea.
# Turns out this is a bad idea.
#raise error
discard
future.callbacks.add cb
proc addCallback*[T](future: Future[T],
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
future.addCallback(
proc() =
cb(future)
)
proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
## Sets the callback proc to be called when the future completes.
## Clears the list of callbacks and sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
##
## **Note**: You most likely want the other ``callback`` setter which
## passes ``future`` as a param to the callback.
future.cb = cb
if future.finished:
callSoon(future.cb)
## It's recommended to use ``addCallback`` or ``then`` instead.
future.clearCallbacks
future.addCallback cb
proc `callback=`*[T](future: Future[T],
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
@@ -180,20 +218,6 @@ proc `callback=`*[T](future: Future[T],
## If future has already completed then ``cb`` will be called immediately.
future.callback = proc () = cb(future)
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
## future stream.
##
## The callback is also called when the future is completed. So you should
## use ``finished`` to check whether data is available.
##
## If the future stream already has data or is finished then ``cb`` will be
## called immediately.
future.cb = proc () = cb(future)
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc injectStacktrace[T](future: Future[T]) =
# TODO: Come up with something better.
when not defined(release):
@@ -240,18 +264,12 @@ proc mget*[T](future: FutureVar[T]): var T =
## Future has not been finished.
result = Future[T](future).value
proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
proc finished*[T](future: Future[T] | FutureVar[T]): bool =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
##
## For a ``FutureStream`` a ``true`` value means that no more data will be
## placed inside the stream _and_ that there is no data waiting to be
## retrieved.
when future is FutureVar[T]:
result = (Future[T](future)).finished
elif future is FutureStream[T]:
result = future.finished and future.queue.len == 0
else:
result = future.finished
@@ -259,57 +277,6 @@ proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise ``ValueError`` if ``future`` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Implement limiting of the streams storage to prevent it growing
# infinitely when no reads are occuring.
future.queue.addLast(value)
if not future.cb.isNil: future.cb()
result.complete()
proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, ``false`` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# We don't want this callback called again.
future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()
if not resFut.finished:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len
proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.

105
lib/pure/asyncstreams.nim Normal file
View File

@@ -0,0 +1,105 @@
import asyncfutures
import deques
type
FutureStream*[T] = ref object ## Special future that acts as
## a queue. Its API is still
## experimental and so is
## subject to change.
queue: Deque[T]
finished: bool
cb: proc () {.closure, gcsafe.}
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new ``FutureStream``. This future's callback is activated when
## two events occur:
##
## * New data is written into the future stream.
## * The future stream is completed (this means that no more data will be
## written).
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
##
## **Note:** The API of FutureStream is still new and so has a higher
## likelihood of changing in the future.
result = FutureStream[T](finished: false, cb: nil)
result.queue = initDeque[T]()
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signalling the end of data.
future.finished = true
if not future.cb.isNil:
future.cb()
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
## future stream.
##
## The callback is also called when the future is completed. So you should
## use ``finished`` to check whether data is available.
##
## If the future stream already has data or is finished then ``cb`` will be
## called immediately.
future.cb = proc () = cb(future)
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc finished*[T](future: FutureStream[T]): bool =
## Check if a ``FutureStream`` is finished. ``true`` value means that
## no more data will be placed inside the stream _and_ that there is
## no data waiting to be retrieved.
result = future.finished and future.queue.len == 0
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise ``ValueError`` if ``future`` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Implement limiting of the streams storage to prevent it growing
# infinitely when no reads are occuring.
future.queue.addLast(value)
if not future.cb.isNil: future.cb()
result.complete()
proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, ``false`` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# We don't want this callback called again.
future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()
if not resFut.finished:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len

View File

@@ -9,11 +9,13 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, lists, options
import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
import asyncfutures except callSoon
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures, asyncstreams
#{.injectStmt: newGcInvariant().}
@@ -130,8 +132,6 @@ export Port, SocketFlag
# TODO: Check if yielded future is nil and throw a more meaningful exception
include "../includes/asyncfutures"
type
PDispatcherBase = ref object of RootRef
timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
@@ -161,6 +161,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc callSoon(cbproc: proc ()) {.gcsafe.}
proc initCallSoonProc =
if asyncfutures.getCallSoonProc().isNil:
asyncfutures.setCallSoonProc(callSoon)
when defined(windows) or defined(nimdoc):
import winlean, sets, hashes
type
@@ -214,15 +220,17 @@ when defined(windows) or defined(nimdoc):
result.callbacks = initDeque[proc ()](64)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
## Retrieves the global thread-local dispatcher.
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
@@ -1081,14 +1089,17 @@ else:
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc setGlobalDispatcher*(disp: PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
gDisp = disp
initCallSoonProc()
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()
@@ -1601,7 +1612,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
return
add(result, c)
proc callSoon*(cbproc: proc ()) =
proc callSoon(cbproc: proc ()) =
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
getGlobalDispatcher().callbacks.addLast(cbproc)

View File

@@ -17,5 +17,6 @@ proc asyncRecursionTest*(): Future[int] {.async.} =
inc(i)
when isMainModule:
setGlobalDispatcher(newDispatcher())
var i = waitFor asyncRecursionTest()
echo i

View File

@@ -0,0 +1,20 @@
discard """
exitcode: 0
output: '''3
2
1
5
'''
"""
import asyncfutures
let f1: Future[int] = newFuture[int]()
f1.addCallback(proc() = echo 1)
f1.addCallback(proc() = echo 2)
f1.addCallback(proc() = echo 3)
f1.complete(10)
let f2: Future[int] = newFuture[int]()
f2.addCallback(proc() = echo 4)
f2.callback = proc() = echo 5
f2.complete(10)