Remove immediate FutureStream procs and make 'put' awaitable.

This commit is contained in:
Dominik Picheta
2017-02-10 20:40:32 +01:00
parent ddd3d3f44a
commit 2f502e2a9e
2 changed files with 10 additions and 20 deletions

View File

@@ -249,30 +249,20 @@ proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} =
## Retrieves the oldest value stored inside the stream. If the stream
## contains no data then this function will fail with a ``IndexError``
## exception.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
return future.queue.dequeue()
proc put*[T](future: FutureStream[T], value: T) =
proc put*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise ``ValueError`` if ``future`` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
raise newException(ValueError, msg)
result.fail(newException(ValueError, msg))
# TODO: Buffering.
future.queue.enqueue(value)
if not future.cb.isNil: future.cb()
result.complete()
proc peek*[T](future: FutureStream[T]): T =
## Returns the oldest value stored inside the specified future stream.
return future.queue.front()
proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
@@ -281,7 +271,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
@@ -295,7 +285,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
res[0] = false
else:
res[0] = true
res[1] = fs.take()
res[1] = fs.queue.dequeue()
if not resFut.finished:
resFut.complete(res)

View File

@@ -19,13 +19,13 @@ var fs = newFutureStream[int]()
proc alpha() {.async.} =
for i in 0 .. 5:
await sleepAsync(1000)
fs.put(i)
await fs.put(i)
fs.complete()
proc beta() {.async.} =
while not fs.finished:
let (hasValue, value) = await fs.takeAsync()
let (hasValue, value) = await fs.take()
if hasValue:
echo(value)