Improve implementation of takeAsync for FutureStreams.

This commit is contained in:
Dominik Picheta
2017-02-10 20:18:59 +01:00
parent d87fb236d1
commit ddd3d3f44a
2 changed files with 48 additions and 18 deletions

View File

@@ -129,11 +129,9 @@ proc complete*[T](future: FutureVar[T], val: T) =
if not fut.cb.isNil():
fut.cb()
proc complete*[T](future: FutureStream[T], value: T) =
## Completes a ``FutureStream`` with the last value, signifying the end of
## data.
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signifying the end of data.
future.finished = true
future.queue.enqueue(value)
if not future.cb.isNil():
future.cb()
@@ -274,22 +272,36 @@ proc peek*[T](future: FutureStream[T]): T =
## Returns the oldest value stored inside the specified future stream.
return future.queue.front()
proc takeAsync*[T](future: FutureStream[T]): Future[T] =
proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest value stored
## inside the stream.
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, ``false`` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[T]("FutureStream.takeAsync")
let cb = future.cb
var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# TODO: When finished(fs) should we "cancel" resFut? This assumes that we
# TODO: can `complete` with no value.
if not resFut.finished and (not finished(fs)):
resFut.complete(fs.take())
if not cb.isNil: cb()
# We don't want this callback called again.
future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.take()
if not resFut.finished:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
return resFut
proc asyncCheck*[T](future: Future[T]) =

View File

@@ -14,21 +14,39 @@ Finished
"""
import asyncdispatch
var fs = newFutureStream[string]()
var fs = newFutureStream[int]()
proc alpha() {.async.} =
for i in 0 .. 5:
await sleepAsync(1000)
fs.put($i)
fs.put(i)
fs.complete("Done")
fs.complete()
proc beta() {.async.} =
while not fs.finished:
echo(await fs.takeAsync())
let (hasValue, value) = await fs.takeAsync()
if hasValue:
echo(value)
echo("Finished")
asyncCheck alpha()
waitFor beta()
# TODO: Something like this should work eventually.
# proc delta(): FutureStream[string] {.async.} =
# for i in 0 .. 5:
# await sleepAsync(1000)
# result.put($i)
# return ""
# proc omega() {.async.} =
# let fut = delta()
# while not fut.finished():
# echo(await fs.takeAsync())
# echo("Finished")
# waitFor omega()