split FutureStream from asyncfutures

This commit is contained in:
Michał Zieliński
2017-06-05 14:44:12 +02:00
parent e86863e8f5
commit 93827e6ab8
4 changed files with 117 additions and 109 deletions

View File

@@ -9,11 +9,12 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, options, asyncfutures
import os, tables, strutils, times, heapqueue, options, asyncstreams
import asyncfutures except callSoon
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures
export asyncfutures, asyncstreams
#{.injectStmt: newGcInvariant().}
@@ -188,7 +189,7 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc callSoon*(cbproc: proc ()) {.gcsafe.}
proc callSoon(cbproc: proc ()) {.gcsafe.}
proc initGlobalDispatcher =
if asyncfutures.callSoonProc == nil:
@@ -1336,7 +1337,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
return
add(result, c)
proc callSoon*(cbproc: proc ()) =
proc callSoon(cbproc: proc ()) =
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
getGlobalDispatcher().callbacks.addLast(cbproc)

View File

@@ -19,12 +19,6 @@ type
FutureVar*[T] = distinct Future[T]
FutureStream*[T] = ref object of FutureBase ## Special future that acts as
## a queue. Its API is still
## experimental and so is
## subject to change.
queue: Deque[T]
FutureError* = object of Exception
cause*: FutureBase
@@ -35,7 +29,7 @@ when not defined(release):
var callSoonProc* {.threadvar.}: (proc(cbproc: proc ()) {.gcsafe.})
proc callSoon(cbproc: proc ()) =
proc callSoon*(cbproc: proc ()) =
if callSoonProc == nil:
# Loop not initialized yet. Call the function directly to allow setup code to use futures.
cbproc()
@@ -66,22 +60,6 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new ``FutureStream``. This future's callback is activated when
## two events occur:
##
## * New data is written into the future stream.
## * The future stream is completed (this means that no more data will be
## written).
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
##
## **Note:** The API of FutureStream is still new and so has a higher
## likelihood of changing in the future.
setupFutureBase(fromProc)
result.queue = initDeque[T]()
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false
@@ -148,12 +126,6 @@ proc complete*[T](future: FutureVar[T], val: T) =
if not fut.cb.isNil():
fut.cb()
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signalling the end of data.
future.finished = true
if not future.cb.isNil():
future.cb()
proc fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -190,20 +162,6 @@ proc `callback=`*[T](future: Future[T],
## If future has already completed then ``cb`` will be called immediately.
future.callback = proc () = cb(future)
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
## future stream.
##
## The callback is also called when the future is completed. So you should
## use ``finished`` to check whether data is available.
##
## If the future stream already has data or is finished then ``cb`` will be
## called immediately.
future.cb = proc () = cb(future)
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc injectStacktrace[T](future: Future[T]) =
# TODO: Come up with something better.
when not defined(release):
@@ -250,18 +208,12 @@ proc mget*[T](future: FutureVar[T]): var T =
## Future has not been finished.
result = Future[T](future).value
proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
proc finished*[T](future: Future[T] | FutureVar[T]): bool =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
##
## For a ``FutureStream`` a ``true`` value means that no more data will be
## placed inside the stream _and_ that there is no data waiting to be
## retrieved.
when future is FutureVar[T]:
result = (Future[T](future)).finished
elif future is FutureStream[T]:
result = future.finished and future.queue.len == 0
else:
result = future.finished
@@ -269,57 +221,6 @@ proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise ``ValueError`` if ``future`` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Implement limiting of the streams storage to prevent it growing
# infinitely when no reads are occuring.
future.queue.addLast(value)
if not future.cb.isNil: future.cb()
result.complete()
proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, ``false`` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# We don't want this callback called again.
future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()
if not resFut.finished:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len
proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.

105
lib/pure/asyncstreams.nim Normal file
View File

@@ -0,0 +1,105 @@
import asyncfutures
import deques
type
FutureStream*[T] = ref object ## Special future that acts as
## a queue. Its API is still
## experimental and so is
## subject to change.
queue: Deque[T]
finished: bool
cb: proc () {.closure, gcsafe.}
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new ``FutureStream``. This future's callback is activated when
## two events occur:
##
## * New data is written into the future stream.
## * The future stream is completed (this means that no more data will be
## written).
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
##
## **Note:** The API of FutureStream is still new and so has a higher
## likelihood of changing in the future.
result = FutureStream[T](finished: false, cb: nil)
result.queue = initDeque[T]()
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signalling the end of data.
future.finished = true
if not future.cb.isNil:
future.cb()
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
## future stream.
##
## The callback is also called when the future is completed. So you should
## use ``finished`` to check whether data is available.
##
## If the future stream already has data or is finished then ``cb`` will be
## called immediately.
future.cb = proc () = cb(future)
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc finished*[T](future: FutureStream[T]): bool =
## Check if a ``FutureStream`` is finished. ``true`` value means that
## no more data will be placed inside the stream _and_ that there is
## no data waiting to be retrieved.
result = future.finished and future.queue.len == 0
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise ``ValueError`` if ``future`` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Implement limiting of the streams storage to prevent it growing
# infinitely when no reads are occuring.
future.queue.addLast(value)
if not future.cb.isNil: future.cb()
result.complete()
proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, ``false`` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# We don't want this callback called again.
future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()
if not resFut.finished:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len

View File

@@ -9,12 +9,13 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, lists, options, asyncfutures
import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
import asyncfutures except callSoon
import nativesockets, net, deques
export Port, SocketFlag
export asyncfutures
export asyncfutures, asyncstreams
#{.injectStmt: newGcInvariant().}
@@ -160,7 +161,7 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
result = int((timerTimeout - curTime) * 1000)
if result < 0: result = 0
proc callSoon*(cbproc: proc ()) {.gcsafe.}
proc callSoon(cbproc: proc ()) {.gcsafe.}
proc initGlobalDispatcher =
if asyncfutures.callSoonProc == nil:
@@ -1611,7 +1612,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
return
add(result, c)
proc callSoon*(cbproc: proc ()) =
proc callSoon(cbproc: proc ()) =
## Schedule `cbproc` to be called as soon as possible.
## The callback is called when control returns to the event loop.
getGlobalDispatcher().callbacks.addLast(cbproc)