mirror of
https://github.com/nim-lang/Nim.git
synced 2026-06-04 02:44:44 +00:00
Implemented a first working version of FutureStreams.
This commit is contained in:
@@ -9,7 +9,7 @@
|
||||
|
||||
include "system/inclrtl"
|
||||
|
||||
import os, oids, tables, strutils, times, heapqueue
|
||||
import os, oids, tables, strutils, times, heapqueue, queues
|
||||
|
||||
import nativesockets, net, deques
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import queues
|
||||
|
||||
# TODO: This shouldn't need to be included, but should ideally be exported.
|
||||
type
|
||||
@@ -130,9 +129,11 @@ proc complete*[T](future: FutureVar[T], val: T) =
|
||||
if not fut.cb.isNil():
|
||||
fut.cb()
|
||||
|
||||
proc complete*[T](future: FutureStream[T]) =
|
||||
## Completes a ``FutureStream`` to signify the end of data.
|
||||
proc complete*[T](future: FutureStream[T], value: T) =
|
||||
## Completes a ``FutureStream`` with the last value, signifying the end of
|
||||
## data.
|
||||
future.finished = true
|
||||
future.queue.enqueue(value)
|
||||
if not future.cb.isNil():
|
||||
future.cb()
|
||||
|
||||
@@ -216,42 +217,6 @@ proc read*[T](future: Future[T] | FutureVar[T]): T =
|
||||
# TODO: Make a custom exception type for this?
|
||||
raise newException(ValueError, "Future still in progress.")
|
||||
|
||||
proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} =
|
||||
## Retrieves the oldest value stored inside the stream. If the stream
|
||||
## contains no data then this function will fail with a ``ValueError``
|
||||
## exception.
|
||||
##
|
||||
## This function will remove the data that was returned from the underlying
|
||||
## ``FutureStream``.
|
||||
return future.queue.dequeue()
|
||||
|
||||
proc put*[T](future: FutureStream[T], value: T): T =
|
||||
## Writes the specified value inside the specified future stream.
|
||||
##
|
||||
## This will raise ``ValueError`` if ``future`` is finished.
|
||||
if future.finished:
|
||||
let msg = "FutureStream is finished and so no longer accepts new data."
|
||||
raise newException(ValueError, msg)
|
||||
future.queue.enqueue(value)
|
||||
|
||||
proc peek*[T](future: FutureStream[T]): T =
|
||||
## Returns the oldest value stored inside the specified future stream.
|
||||
return future.queue.front()
|
||||
|
||||
proc takeAsync*[T](future: FutureStream[T]): Future[T] =
|
||||
## Returns a future that will complete when the ``FutureStream`` has data
|
||||
## placed into it. The future will be completed with the oldest value stored
|
||||
## inside the stream.
|
||||
##
|
||||
## This function will remove the data that was returned from the underlying
|
||||
## ``FutureStream``.
|
||||
var resFut = newFuture[T]("FutureStream.wait")
|
||||
let cb = future.cb
|
||||
future.callback =
|
||||
proc (fs: FutureStream[T]) =
|
||||
resFut.complete(fs.take())
|
||||
if not cb.isNil: cb()
|
||||
|
||||
proc readError*[T](future: Future[T]): ref Exception =
|
||||
## Retrieves the exception stored in ``future``.
|
||||
##
|
||||
@@ -274,9 +239,11 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
|
||||
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
|
||||
##
|
||||
## For a ``FutureStream`` this signifies that no more data will be placed
|
||||
## inside it.
|
||||
## inside it and that there is no data waiting to be retrieved.
|
||||
when future is FutureVar[T]:
|
||||
result = (Future[T](future)).finished
|
||||
elif future is FutureStream[T]:
|
||||
result = future.finished and future.queue.len == 0
|
||||
else:
|
||||
result = future.finished
|
||||
|
||||
@@ -284,6 +251,47 @@ proc failed*(future: FutureBase): bool =
|
||||
## Determines whether ``future`` completed with an error.
|
||||
return future.error != nil
|
||||
|
||||
proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} =
|
||||
## Retrieves the oldest value stored inside the stream. If the stream
|
||||
## contains no data then this function will fail with a ``IndexError``
|
||||
## exception.
|
||||
##
|
||||
## This function will remove the data that was returned from the underlying
|
||||
## ``FutureStream``.
|
||||
return future.queue.dequeue()
|
||||
|
||||
proc put*[T](future: FutureStream[T], value: T) =
|
||||
## Writes the specified value inside the specified future stream.
|
||||
##
|
||||
## This will raise ``ValueError`` if ``future`` is finished.
|
||||
if future.finished:
|
||||
let msg = "FutureStream is finished and so no longer accepts new data."
|
||||
raise newException(ValueError, msg)
|
||||
future.queue.enqueue(value)
|
||||
if not future.cb.isNil: future.cb()
|
||||
|
||||
proc peek*[T](future: FutureStream[T]): T =
|
||||
## Returns the oldest value stored inside the specified future stream.
|
||||
return future.queue.front()
|
||||
|
||||
proc takeAsync*[T](future: FutureStream[T]): Future[T] =
|
||||
## Returns a future that will complete when the ``FutureStream`` has data
|
||||
## placed into it. The future will be completed with the oldest value stored
|
||||
## inside the stream.
|
||||
##
|
||||
## This function will remove the data that was returned from the underlying
|
||||
## ``FutureStream``.
|
||||
var resFut = newFuture[T]("FutureStream.takeAsync")
|
||||
let cb = future.cb
|
||||
future.callback =
|
||||
proc (fs: FutureStream[T]) =
|
||||
# TODO: When finished(fs) should we "cancel" resFut? This assumes that we
|
||||
# TODO: can `complete` with no value.
|
||||
if not resFut.finished and (not finished(fs)):
|
||||
resFut.complete(fs.take())
|
||||
if not cb.isNil: cb()
|
||||
return resFut
|
||||
|
||||
proc asyncCheck*[T](future: Future[T]) =
|
||||
## Sets a callback on ``future`` which raises an exception if the future
|
||||
## finished with an error.
|
||||
|
||||
@@ -4,16 +4,17 @@ var fs = newFutureStream[string]()
|
||||
|
||||
proc alpha() {.async.} =
|
||||
for i in 0 .. 5:
|
||||
fs.put($i)
|
||||
await sleepAsync(1000)
|
||||
fs.put($i)
|
||||
|
||||
fs.complete()
|
||||
fs.complete("Done")
|
||||
|
||||
proc beta() {.async.} =
|
||||
while not fs.finished():
|
||||
while not fs.finished:
|
||||
echo(await fs.takeAsync())
|
||||
|
||||
echo("Finished")
|
||||
|
||||
asyncCheck alpha()
|
||||
asyncCheck beta()
|
||||
waitFor beta()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user