FutureStream's cb call behaviour fixed + other fixes.

This commit is contained in:
Dominik Picheta
2017-02-11 12:42:30 +01:00
parent 4a7ea8f865
commit 77071eb767

View File

@@ -182,7 +182,7 @@ proc `callback=`*[T](future: FutureStream[T],
## If the future stream already has data then ``cb`` will be called
## immediately.
future.cb = proc () = cb(future)
if future.queue.len > 0:
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc injectStacktrace[T](future: Future[T]) =
@@ -257,6 +257,7 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] =
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Buffering.
future.queue.enqueue(value)
if not future.cb.isNil: future.cb()
@@ -294,6 +295,10 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
if not savedCb.isNil: savedCb()
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len
proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.