mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-21 06:45:27 +00:00
WIP implementation of FutureStream.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import queues
|
||||
|
||||
# TODO: This shouldn't need to be included, but should ideally be exported.
|
||||
type
|
||||
@@ -16,6 +17,10 @@ type
|
||||
|
||||
FutureVar*[T] = distinct Future[T]
|
||||
|
||||
FutureStream*[T] = ref object of FutureBase ## Special future that acts as
|
||||
## a queue.
|
||||
queue: Queue[T]
|
||||
|
||||
FutureError* = object of Exception
|
||||
cause*: FutureBase
|
||||
|
||||
@@ -26,11 +31,7 @@ when not defined(release):
|
||||
|
||||
proc callSoon*(cbproc: proc ()) {.gcsafe.}
|
||||
|
||||
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
|
||||
## Creates a new future.
|
||||
##
|
||||
## Specifying ``fromProc``, which is a string specifying the name of the proc
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
template setupFutureBase(fromProc: string): stmt =
|
||||
new(result)
|
||||
result.finished = false
|
||||
when not defined(release):
|
||||
@@ -39,6 +40,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
|
||||
result.fromProc = fromProc
|
||||
currentID.inc()
|
||||
|
||||
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
|
||||
## Creates a new future.
|
||||
##
|
||||
## Specifying ``fromProc``, which is a string specifying the name of the proc
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
setupFutureBase(fromProc)
|
||||
|
||||
proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
|
||||
## Create a new ``FutureVar``. This Future type is ideally suited for
|
||||
## situations where you want to avoid unnecessary allocations of Futures.
|
||||
@@ -47,6 +55,15 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
result = FutureVar[T](newFuture[T](fromProc))
|
||||
|
||||
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
|
||||
## Create a new ``FutureStream``. This Future type's callback can be activated
|
||||
## multiple times when new data is written to it.
|
||||
##
|
||||
## Specifying ``fromProc``, which is a string specifying the name of the proc
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
setupFutureBase(fromProc)
|
||||
result.queue = initQueue[T]()
|
||||
|
||||
proc clean*[T](future: FutureVar[T]) =
|
||||
## Resets the ``finished`` status of ``future``.
|
||||
Future[T](future).finished = false
|
||||
@@ -107,12 +124,18 @@ proc complete*[T](future: FutureVar[T], val: T) =
|
||||
## Any previously stored value will be overwritten.
|
||||
template fut: untyped = Future[T](future)
|
||||
checkFinished(fut)
|
||||
assert(fut.error == nil)
|
||||
assert(fut.error.isNil())
|
||||
fut.finished = true
|
||||
fut.value = val
|
||||
if fut.cb != nil:
|
||||
if not fut.cb.isNil():
|
||||
fut.cb()
|
||||
|
||||
proc complete*[T](future: FutureStream[T]) =
|
||||
## Completes a ``FutureStream`` to signify the end of data.
|
||||
future.finished = true
|
||||
if not future.cb.isNil():
|
||||
future.cb()
|
||||
|
||||
proc fail*[T](future: Future[T], error: ref Exception) =
|
||||
## Completes ``future`` with ``error``.
|
||||
#assert(not future.finished, "Future already finished, cannot finish twice.")
|
||||
@@ -149,6 +172,20 @@ proc `callback=`*[T](future: Future[T],
|
||||
## If future has already completed then ``cb`` will be called immediately.
|
||||
future.callback = proc () = cb(future)
|
||||
|
||||
proc `callback=`*[T](future: FutureStream[T],
|
||||
cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
|
||||
## Sets the callback proc to be called when data was placed inside the
|
||||
## future stream.
|
||||
##
|
||||
## The callback is also called when the future is completed. So you should
|
||||
## use ``finished`` to check whether data is available.
|
||||
##
|
||||
## If the future stream already has data then ``cb`` will be called
|
||||
## immediately.
|
||||
future.cb = proc () = cb(future)
|
||||
if future.queue.len > 0:
|
||||
callSoon(future.cb)
|
||||
|
||||
proc injectStacktrace[T](future: Future[T]) =
|
||||
# TODO: Come up with something better.
|
||||
when not defined(release):
|
||||
@@ -179,6 +216,42 @@ proc read*[T](future: Future[T] | FutureVar[T]): T =
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} =
|
||||
## Retrieves the oldest value stored inside the stream. If the stream
|
||||
## contains no data then this function will fail with a ``ValueError``
|
||||
## exception.
|
||||
##
|
||||
## This function will remove the data that was returned from the underlying
|
||||
## ``FutureStream``.
|
||||
return future.queue.dequeue()
|
||||
|
||||
proc put*[T](future: FutureStream[T], value: T): T =
|
||||
## Writes the specified value inside the specified future stream.
|
||||
##
|
||||
## This will raise ``ValueError`` if ``future`` is finished.
|
||||
if future.finished:
|
||||
let msg = "FutureStream is finished and so no longer accepts new data."
|
||||
raise newException(ValueError, msg)
|
||||
future.queue.enqueue(value)
|
||||
|
||||
proc peek*[T](future: FutureStream[T]): T =
|
||||
## Returns the oldest value stored inside the specified future stream.
|
||||
return future.queue.front()
|
||||
|
||||
proc takeAsync*[T](future: FutureStream[T]): Future[T] =
|
||||
## Returns a future that will complete when the ``FutureStream`` has data
|
||||
## placed into it. The future will be completed with the oldest value stored
|
||||
## inside the stream.
|
||||
##
|
||||
## This function will remove the data that was returned from the underlying
|
||||
## ``FutureStream``.
|
||||
var resFut = newFuture[T]("FutureStream.wait")
|
||||
let cb = future.cb
|
||||
future.callback =
|
||||
proc (fs: FutureStream[T]) =
|
||||
resFut.complete(fs.take())
|
||||
if not cb.isNil: cb()
|
||||
|
||||
proc readError*[T](future: Future[T]): ref Exception =
|
||||
## Retrieves the exception stored in ``future``.
|
||||
##
|
||||
@@ -195,10 +268,13 @@ proc mget*[T](future: FutureVar[T]): var T =
|
||||
## Future has not been finished.
|
||||
result = Future[T](future).value
|
||||
|
||||
proc finished*[T](future: Future[T] | FutureVar[T]): bool =
|
||||
proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
|
||||
## Determines whether ``future`` has completed.
|
||||
##
|
||||
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
|
||||
##
|
||||
## For a ``FutureStream`` this signifies that no more data will be placed
|
||||
## inside it.
|
||||
when future is FutureVar[T]:
|
||||
result = (Future[T](future)).finished
|
||||
else:
|
||||
|
||||
19
tests/async/tfuturestream.nim
Normal file
19
tests/async/tfuturestream.nim
Normal file
@@ -0,0 +1,19 @@
|
||||
import asyncdispatch
|
||||
|
||||
var fs = newFutureStream[string]()
|
||||
|
||||
proc alpha() {.async.} =
|
||||
for i in 0 .. 5:
|
||||
fs.put($i)
|
||||
await sleepAsync(1000)
|
||||
|
||||
fs.complete()
|
||||
|
||||
proc beta() {.async.} =
|
||||
while not fs.finished():
|
||||
echo(await fs.takeAsync())
|
||||
|
||||
echo("Finished")
|
||||
|
||||
asyncCheck alpha()
|
||||
asyncCheck beta()
|
||||
Reference in New Issue
Block a user