Fixes #8994. FutureStream read procedure data loss no longer occurs. (#9183)

* Fixes #8994. FutureStream read procedure data loss no longer occurs.

* Optimises the fix for #8994.
This commit is contained in:
Dominik Picheta
2018-10-09 18:39:12 +01:00
committed by Andreas Rumpf
parent 5076fda2e2
commit 21ecf64d24
2 changed files with 24 additions and 2 deletions

View File

@@ -81,6 +81,9 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# Exit early if `resFut` is already complete. (See #8994).
if resFut.finished: return
# We don't want this callback called again.
future.cb = nil
@@ -93,8 +96,7 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
res[0] = true
res[1] = fs.queue.popFirst()
if not resFut.finished:
resFut.complete(res)
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()

View File

@@ -35,6 +35,26 @@ proc beta() {.async.} =
asyncCheck alpha()
waitFor beta()
template ensureCallbacksAreScheduled =
# callbacks are called directly if the dispatcher is not running
discard getGlobalDispatcher()
proc testCompletion() {.async.} =
ensureCallbacksAreScheduled
var stream = newFutureStream[string]()
for i in 1..5:
await stream.write($i)
var readFuture = stream.readAll()
stream.complete()
yield readFuture
let data = readFuture.read()
doAssert(data.len == 5, "actual data len = " & $data.len)
waitFor testCompletion()
# TODO: Something like this should work eventually.
# proc delta(): FutureStream[string] {.async.} =
# for i in 0 .. 5: