From e189004a03fc10a5330caf853f7a287636f679fd Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Thu, 9 Feb 2017 22:46:20 +0100 Subject: [PATCH 01/19] WIP implementation of FutureStream. --- lib/pure/includes/asyncfutures.nim | 92 +++++++++++++++++++++++++++--- tests/async/tfuturestream.nim | 19 ++++++ 2 files changed, 103 insertions(+), 8 deletions(-) create mode 100644 tests/async/tfuturestream.nim diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index c83228014f..a41f03a00b 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -1,3 +1,4 @@ +import queues # TODO: This shouldn't need to be included, but should ideally be exported. type @@ -16,6 +17,10 @@ type FutureVar*[T] = distinct Future[T] + FutureStream*[T] = ref object of FutureBase ## Special future that acts as + ## a queue. + queue: Queue[T] + FutureError* = object of Exception cause*: FutureBase @@ -26,11 +31,7 @@ when not defined(release): proc callSoon*(cbproc: proc ()) {.gcsafe.} -proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = - ## Creates a new future. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. +template setupFutureBase(fromProc: string): stmt = new(result) result.finished = false when not defined(release): @@ -39,6 +40,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = result.fromProc = fromProc currentID.inc() +proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = + ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## Create a new ``FutureVar``. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. @@ -47,6 +55,15 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) +proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = + ## Create a new ``FutureStream``. This Future type's callback can be activated + ## multiple times when new data is written to it. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + result.queue = initQueue[T]() + proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false @@ -107,12 +124,18 @@ proc complete*[T](future: FutureVar[T], val: T) = ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) checkFinished(fut) - assert(fut.error == nil) + assert(fut.error.isNil()) fut.finished = true fut.value = val - if fut.cb != nil: + if not fut.cb.isNil(): fut.cb() +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` to signify the end of data. + future.finished = true + if not future.cb.isNil(): + future.cb() + proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -149,6 +172,20 @@ proc `callback=`*[T](future: Future[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) +proc `callback=`*[T](future: FutureStream[T], + cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = + ## Sets the callback proc to be called when data was placed inside the + ## future stream. + ## + ## The callback is also called when the future is completed. So you should + ## use ``finished`` to check whether data is available. + ## + ## If the future stream already has data then ``cb`` will be called + ## immediately. + future.cb = proc () = cb(future) + if future.queue.len > 0: + callSoon(future.cb) + proc injectStacktrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): @@ -179,6 +216,42 @@ proc read*[T](future: Future[T] | FutureVar[T]): T = # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") +proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} = + ## Retrieves the oldest value stored inside the stream. If the stream + ## contains no data then this function will fail with a ``ValueError`` + ## exception. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + return future.queue.dequeue() + +proc put*[T](future: FutureStream[T], value: T): T = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + raise newException(ValueError, msg) + future.queue.enqueue(value) + +proc peek*[T](future: FutureStream[T]): T = + ## Returns the oldest value stored inside the specified future stream. + return future.queue.front() + +proc takeAsync*[T](future: FutureStream[T]): Future[T] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest value stored + ## inside the stream. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[T]("FutureStream.wait") + let cb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + resFut.complete(fs.take()) + if not cb.isNil: cb() + proc readError*[T](future: Future[T]): ref Exception = ## Retrieves the exception stored in ``future``. ## @@ -195,10 +268,13 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc finished*[T](future: Future[T] | FutureVar[T]): bool = +proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## + ## For a ``FutureStream`` this signifies that no more data will be placed + ## inside it. when future is FutureVar[T]: result = (Future[T](future)).finished else: diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim new file mode 100644 index 0000000000..ed5ac57859 --- /dev/null +++ b/tests/async/tfuturestream.nim @@ -0,0 +1,19 @@ +import asyncdispatch + +var fs = newFutureStream[string]() + +proc alpha() {.async.} = + for i in 0 .. 5: + fs.put($i) + await sleepAsync(1000) + + fs.complete() + +proc beta() {.async.} = + while not fs.finished(): + echo(await fs.takeAsync()) + + echo("Finished") + +asyncCheck alpha() +asyncCheck beta() \ No newline at end of file From 32864809856352ad9b537fdb7bc8e936cfdc99ee Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Thu, 9 Feb 2017 23:07:10 +0100 Subject: [PATCH 02/19] Improve error message when a `nil` future is await'ed. --- lib/pure/asyncmacro.nim | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/pure/asyncmacro.nim b/lib/pure/asyncmacro.nim index f74881c6d0..ba9993b2c2 100644 --- a/lib/pure/asyncmacro.nim +++ b/lib/pure/asyncmacro.nim @@ -33,8 +33,10 @@ template createCb(retFutureSym, iteratorNameSym, if not nameIterVar.finished: var next = nameIterVar() if next == nil: - assert retFutureSym.finished, "Async procedure's (" & - name & ") return Future was not finished." + if not retFutureSym.finished: + let msg = "Async procedure ($1) yielded `nil`, are you await'ing a " & + "`nil` Future?" + raise newException(AssertionError, msg % name) else: next.callback = cb except: @@ -390,7 +392,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = if procBody.kind != nnkEmpty: result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "testInfix": + #if prc[0].getName == "beta": # echo(toStrLit(result)) macro async*(prc: untyped): untyped = From c4d5cc652f9207d3c30f0746babaa5186e845ebb Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 10 Feb 2017 00:05:42 +0100 Subject: [PATCH 03/19] Work around issue with queues. Refs #4773. --- lib/pure/collections/queues.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pure/collections/queues.nim b/lib/pure/collections/queues.nim index 0490ae4943..4014221622 100644 --- a/lib/pure/collections/queues.nim +++ b/lib/pure/collections/queues.nim @@ -144,7 +144,7 @@ proc add*[T](q: var Queue[T], item: T) = var cap = q.mask+1 if unlikely(q.count >= cap): var n = newSeq[T](cap*2) - for i, x in q: # don't use copyMem because the GC and because it's slower. + for i, x in pairs(q): # don't use copyMem because the GC and because it's slower. shallowCopy(n[i], x) shallowCopy(q.data, n) q.mask = cap*2 - 1 From 7766fdfec1993129da4a84a93c1c09aadfc9a6d6 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 10 Feb 2017 00:06:18 +0100 Subject: [PATCH 04/19] Implemented a first working version of FutureStreams. --- lib/pure/asyncdispatch.nim | 2 +- lib/pure/includes/asyncfutures.nim | 88 ++++++++++++++++-------------- tests/async/tfuturestream.nim | 9 +-- 3 files changed, 54 insertions(+), 45 deletions(-) diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d97214d151..58113ae699 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue +import os, oids, tables, strutils, times, heapqueue, queues import nativesockets, net, deques diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index a41f03a00b..8d7ace7a17 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -1,4 +1,3 @@ -import queues # TODO: This shouldn't need to be included, but should ideally be exported. type @@ -130,9 +129,11 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` to signify the end of data. +proc complete*[T](future: FutureStream[T], value: T) = + ## Completes a ``FutureStream`` with the last value, signifying the end of + ## data. future.finished = true + future.queue.enqueue(value) if not future.cb.isNil(): future.cb() @@ -216,42 +217,6 @@ proc read*[T](future: Future[T] | FutureVar[T]): T = # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") -proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} = - ## Retrieves the oldest value stored inside the stream. If the stream - ## contains no data then this function will fail with a ``ValueError`` - ## exception. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - return future.queue.dequeue() - -proc put*[T](future: FutureStream[T], value: T): T = - ## Writes the specified value inside the specified future stream. - ## - ## This will raise ``ValueError`` if ``future`` is finished. - if future.finished: - let msg = "FutureStream is finished and so no longer accepts new data." - raise newException(ValueError, msg) - future.queue.enqueue(value) - -proc peek*[T](future: FutureStream[T]): T = - ## Returns the oldest value stored inside the specified future stream. - return future.queue.front() - -proc takeAsync*[T](future: FutureStream[T]): Future[T] = - ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest value stored - ## inside the stream. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - var resFut = newFuture[T]("FutureStream.wait") - let cb = future.cb - future.callback = - proc (fs: FutureStream[T]) = - resFut.complete(fs.take()) - if not cb.isNil: cb() - proc readError*[T](future: Future[T]): ref Exception = ## Retrieves the exception stored in ``future``. ## @@ -274,9 +239,11 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. ## ## For a ``FutureStream`` this signifies that no more data will be placed - ## inside it. + ## inside it and that there is no data waiting to be retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished + elif future is FutureStream[T]: + result = future.finished and future.queue.len == 0 else: result = future.finished @@ -284,6 +251,47 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil +proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} = + ## Retrieves the oldest value stored inside the stream. If the stream + ## contains no data then this function will fail with a ``IndexError`` + ## exception. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + return future.queue.dequeue() + +proc put*[T](future: FutureStream[T], value: T) = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + raise newException(ValueError, msg) + future.queue.enqueue(value) + if not future.cb.isNil: future.cb() + +proc peek*[T](future: FutureStream[T]): T = + ## Returns the oldest value stored inside the specified future stream. + return future.queue.front() + +proc takeAsync*[T](future: FutureStream[T]): Future[T] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest value stored + ## inside the stream. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[T]("FutureStream.takeAsync") + let cb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + # TODO: When finished(fs) should we "cancel" resFut? This assumes that we + # TODO: can `complete` with no value. + if not resFut.finished and (not finished(fs)): + resFut.complete(fs.take()) + if not cb.isNil: cb() + return resFut + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index ed5ac57859..35dde2f9de 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -4,16 +4,17 @@ var fs = newFutureStream[string]() proc alpha() {.async.} = for i in 0 .. 5: - fs.put($i) await sleepAsync(1000) + fs.put($i) - fs.complete() + fs.complete("Done") proc beta() {.async.} = - while not fs.finished(): + while not fs.finished: echo(await fs.takeAsync()) echo("Finished") asyncCheck alpha() -asyncCheck beta() \ No newline at end of file +waitFor beta() + From d87fb236d101b9c1bc14f18fe17798cc214a9620 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 10 Feb 2017 18:57:43 +0100 Subject: [PATCH 05/19] Add test spec to tfuturestream --- tests/async/tfuturestream.nim | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index 35dde2f9de..e3480126f6 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -1,3 +1,17 @@ +discard """ + file: "tfuturestream.nim" + exitcode: 0 + output: ''' +0 +1 +2 +3 +4 +5 +Done +Finished +''' +""" import asyncdispatch var fs = newFutureStream[string]() From ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 10 Feb 2017 20:18:59 +0100 Subject: [PATCH 06/19] Improve implementation of takeAsync for FutureStreams. --- lib/pure/includes/asyncfutures.nim | 40 +++++++++++++++++++----------- tests/async/tfuturestream.nim | 26 ++++++++++++++++--- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 8d7ace7a17..13d927e92b 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -129,11 +129,9 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T], value: T) = - ## Completes a ``FutureStream`` with the last value, signifying the end of - ## data. +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signifying the end of data. future.finished = true - future.queue.enqueue(value) if not future.cb.isNil(): future.cb() @@ -274,22 +272,36 @@ proc peek*[T](future: FutureStream[T]): T = ## Returns the oldest value stored inside the specified future stream. return future.queue.front() -proc takeAsync*[T](future: FutureStream[T]): Future[T] = +proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest value stored - ## inside the stream. + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. ## ## This function will remove the data that was returned from the underlying ## ``FutureStream``. - var resFut = newFuture[T]("FutureStream.takeAsync") - let cb = future.cb + var resFut = newFuture[(bool, T)]("FutureStream.takeAsync") + let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = - # TODO: When finished(fs) should we "cancel" resFut? This assumes that we - # TODO: can `complete` with no value. - if not resFut.finished and (not finished(fs)): - resFut.complete(fs.take()) - if not cb.isNil: cb() + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.take() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() return resFut proc asyncCheck*[T](future: Future[T]) = diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index e3480126f6..61b3863ac3 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -14,21 +14,39 @@ Finished """ import asyncdispatch -var fs = newFutureStream[string]() +var fs = newFutureStream[int]() proc alpha() {.async.} = for i in 0 .. 5: await sleepAsync(1000) - fs.put($i) + fs.put(i) - fs.complete("Done") + fs.complete() proc beta() {.async.} = while not fs.finished: - echo(await fs.takeAsync()) + let (hasValue, value) = await fs.takeAsync() + if hasValue: + echo(value) echo("Finished") asyncCheck alpha() waitFor beta() +# TODO: Something like this should work eventually. +# proc delta(): FutureStream[string] {.async.} = +# for i in 0 .. 5: +# await sleepAsync(1000) +# result.put($i) + +# return "" + +# proc omega() {.async.} = +# let fut = delta() +# while not fut.finished(): +# echo(await fs.takeAsync()) + +# echo("Finished") + +# waitFor omega() \ No newline at end of file From 2f502e2a9ede75be3f56a0206e1314c758e1ad90 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 10 Feb 2017 20:40:32 +0100 Subject: [PATCH 07/19] Remove immediate FutureStream procs and make 'put' awaitable. --- lib/pure/includes/asyncfutures.nim | 26 ++++++++------------------ tests/async/tfuturestream.nim | 4 ++-- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 13d927e92b..eb0c6bbf2e 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -249,30 +249,20 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil -proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} = - ## Retrieves the oldest value stored inside the stream. If the stream - ## contains no data then this function will fail with a ``IndexError`` - ## exception. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - return future.queue.dequeue() - -proc put*[T](future: FutureStream[T], value: T) = +proc put*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## ## This will raise ``ValueError`` if ``future`` is finished. + result = newFuture[void]("FutureStream.put") if future.finished: let msg = "FutureStream is finished and so no longer accepts new data." - raise newException(ValueError, msg) + result.fail(newException(ValueError, msg)) + # TODO: Buffering. future.queue.enqueue(value) if not future.cb.isNil: future.cb() + result.complete() -proc peek*[T](future: FutureStream[T]): T = - ## Returns the oldest value stored inside the specified future stream. - return future.queue.front() - -proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = +proc take*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data ## placed into it. The future will be completed with the oldest ## value stored inside the stream. The return value will also determine @@ -281,7 +271,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = ## ## This function will remove the data that was returned from the underlying ## ``FutureStream``. - var resFut = newFuture[(bool, T)]("FutureStream.takeAsync") + var resFut = newFuture[(bool, T)]("FutureStream.take") let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = @@ -295,7 +285,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = false else: res[0] = true - res[1] = fs.take() + res[1] = fs.queue.dequeue() if not resFut.finished: resFut.complete(res) diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index 61b3863ac3..bf8c9b4c43 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -19,13 +19,13 @@ var fs = newFutureStream[int]() proc alpha() {.async.} = for i in 0 .. 5: await sleepAsync(1000) - fs.put(i) + await fs.put(i) fs.complete() proc beta() {.async.} = while not fs.finished: - let (hasValue, value) = await fs.takeAsync() + let (hasValue, value) = await fs.take() if hasValue: echo(value) From 4a7ea8f8650d7168c7bfaed725125d4a9a3920a0 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 11 Feb 2017 12:39:37 +0100 Subject: [PATCH 08/19] Add support for `Async | Sync` return types in {.multisync.} macro. --- lib/pure/asyncmacro.nim | 55 +++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/lib/pure/asyncmacro.nim b/lib/pure/asyncmacro.nim index ba9993b2c2..f0837d67d5 100644 --- a/lib/pure/asyncmacro.nim +++ b/lib/pure/asyncmacro.nim @@ -283,6 +283,14 @@ proc getFutureVarIdents(params: NimNode): seq[NimNode] {.compileTime.} = ($params[i][1][0].ident).normalize == "futurevar": result.add(params[i][0]) +proc isInvalidReturnType(typeName: string): bool = + return typeName notin ["Future"] #, "FutureStream"] + +proc verifyReturnType(typeName: string) {.compileTime.} = + if typeName.isInvalidReturnType: + error("Expected return type of 'Future' got '$1'" % + typeName) + proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = ## This macro transforms a single procedure into a closure iterator. ## The ``async`` macro supports a stmtList holding multiple async procedures. @@ -297,18 +305,16 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = # Verify that the return type is a Future[T] if returnType.kind == nnkBracketExpr: let fut = repr(returnType[0]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") + verifyReturnType(fut) baseType = returnType[1] elif returnType.kind in nnkCallKinds and $returnType[0] == "[]": let fut = repr(returnType[1]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") + verifyReturnType(fut) baseType = returnType[2] elif returnType.kind == nnkEmpty: baseType = returnType else: - error("Expected return type of 'Future' got '" & repr(returnType) & "'") + verifyReturnType(repr(returnType)) let subtypeIsVoid = returnType.kind == nnkEmpty or (baseType.kind == nnkIdent and returnType[1].ident == !"void") @@ -453,13 +459,12 @@ proc stripAwait(node: NimNode): NimNode = for i in 0 .. FormalParams (3) -> IdentDefs, the parameter (i) -> + # parameter type (1). + result[0][3][i][1] = splitParamType(result[0][3][i][1], async=false) result[0][6] = stripAwait(result[0][6]) result[1] = prc.copyNimTree() + if result[1][3][0].kind == nnkBracketExpr: + result[1][3][0][1] = splitParamType(result[1][3][0][1], async=true) for i in 1 .. FormalParams (3) -> IdentDefs, the parameter (i) -> + # parameter type (1). + result[1][3][i][1] = splitParamType(result[1][3][i][1], async=true) macro multisync*(prc: untyped): untyped = ## Macro which processes async procedures into both asynchronous and @@ -514,4 +527,4 @@ macro multisync*(prc: untyped): untyped = let (sync, asyncPrc) = splitProc(prc) result = newStmtList() result.add(asyncSingleProc(asyncPrc)) - result.add(sync) + result.add(sync) \ No newline at end of file From 77071eb767dabc78ea23c0ea623331acac640694 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 11 Feb 2017 12:42:30 +0100 Subject: [PATCH 09/19] FutureStream's cb call behaviour fixed + other fixes. --- lib/pure/includes/asyncfutures.nim | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index eb0c6bbf2e..6f6693605e 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -182,7 +182,7 @@ proc `callback=`*[T](future: FutureStream[T], ## If the future stream already has data then ``cb`` will be called ## immediately. future.cb = proc () = cb(future) - if future.queue.len > 0: + if future.queue.len > 0 or future.finished: callSoon(future.cb) proc injectStacktrace[T](future: Future[T]) = @@ -257,6 +257,7 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] = if future.finished: let msg = "FutureStream is finished and so no longer accepts new data." result.fail(newException(ValueError, msg)) + return # TODO: Buffering. future.queue.enqueue(value) if not future.cb.isNil: future.cb() @@ -294,6 +295,10 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] = if not savedCb.isNil: savedCb() return resFut +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. From 1b4067a81b627ee7f1aeec4d29cd70756be57a5f Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 11 Feb 2017 12:43:16 +0100 Subject: [PATCH 10/19] Implement streamed body reading in httpclient. --- lib/pure/httpclient.nim | 150 +++++++++++++++++++++++++---------- tests/stdlib/thttpclient.nim | 4 +- 2 files changed, 111 insertions(+), 43 deletions(-) diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 1ded540ecf..024643384c 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -117,20 +117,28 @@ ## only basic authentication is supported at the moment. import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes, - math, random, httpcore, times, tables + math, random, httpcore, times, tables, streams import asyncnet, asyncdispatch import nativesockets export httpcore except parseHeader # TODO: The ``except`` doesn't work type - Response* = object + Response* = ref object version*: string status*: string headers*: HttpHeaders - body*: string + body: string # TODO: here for compatibility with old httpclient procs. + bodyStream*: StringStream -proc code*(response: Response): HttpCode + AsyncResponse* = ref object + version*: string + status*: string + headers*: HttpHeaders + body: string + bodyStream*: FutureStream[string] + +proc code*(response: Response | AsyncResponse): HttpCode {.raises: [ValueError, OverflowError].} = ## Retrieves the specified response's ``HttpCode``. ## @@ -138,6 +146,40 @@ proc code*(response: Response): HttpCode ## corresponding ``HttpCode``. return response.status[0 .. 2].parseInt.HttpCode +proc body*(response: Response): string = + ## Retrieves the specified response's body. + ## + ## The response's body stream is read synchronously. + if response.body.isNil(): + response.body = response.bodyStream.readAll() + return response.body + +proc `body=`*(response: Response, value: string) {.deprecated.} = + ## Setter for backward compatibility. + ## + ## **This is deprecated and should not be used**. + response.body = value + +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + + # TODO: Move this to asyncfutures. + result = "" + while true: + let (hasValue, value) = await future.take() + if hasValue: + result.add(value) + else: + break + +proc body*(response: AsyncResponse): Future[string] {.async.} = + ## Reads the response's body and caches it. The read is performed only + ## once. + if response.body.isNil: + response.body = await readAll(response.bodyStream) + return response.body + type Proxy* = ref object url*: Uri @@ -249,6 +291,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in result.add(buf) proc parseResponse(s: Socket, getBody: bool, timeout: int): Response = + new result var parsedStatus = false var linei = 0 var fullyRead = false @@ -735,6 +778,10 @@ type contentProgress: BiggestInt oneSecondProgress: BiggestInt lastProgressReport: float + when SocketType is AsyncSocket: + bodyStream: FutureStream[string] + else: + bodyStream: StringStream type HttpClient* = HttpClientBase[Socket] @@ -764,6 +811,7 @@ proc newHttpClient*(userAgent = defUserAgent, result.proxy = proxy result.timeout = timeout result.onProgressChanged = nil + result.bodyStream = newStringStream() when defined(ssl): result.sslContext = sslContext @@ -794,6 +842,7 @@ proc newAsyncHttpClient*(userAgent = defUserAgent, result.proxy = proxy result.timeout = -1 # TODO result.onProgressChanged = nil + result.bodyStream = newFutureStream[string]("newAsyncHttpClient") when defined(ssl): result.sslContext = sslContext @@ -815,14 +864,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient, client.oneSecondProgress = 0 client.lastProgressReport = epochTime() -proc recvFull(client: HttpClient | AsyncHttpClient, - size: int, timeout: int): Future[string] {.multisync.} = +proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int, + keep: bool): Future[int] {.multisync.} = ## Ensures that all the data requested is read and returned. - result = "" + var readLen = 0 while true: - if size == result.len: break + if size == readLen: break - let remainingSize = size - result.len + let remainingSize = size - readLen let sizeToRecv = min(remainingSize, net.BufferSize) when client.socket is Socket: @@ -830,13 +879,20 @@ proc recvFull(client: HttpClient | AsyncHttpClient, else: let data = await client.socket.recv(sizeToRecv) if data == "": break # We've been disconnected. - result.add data + + readLen.inc(data.len) + if keep: + when client.socket is Socket: + client.bodyStream.write(data) + else: + await client.bodyStream.put(data) await reportProgress(client, data.len) -proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string] + return readLen + +proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void] {.multisync.} = - result = "" while true: var chunkSize = 0 var chunkSizeStr = await client.socket.recvLine() @@ -861,25 +917,29 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string] httpError("Invalid chunk size: " & chunkSizeStr) inc(i) if chunkSize <= 0: - discard await recvFull(client, 2, client.timeout) # Skip \c\L + discard await recvFull(client, 2, client.timeout, false) # Skip \c\L break - result.add await recvFull(client, chunkSize, client.timeout) - discard await recvFull(client, 2, client.timeout) # Skip \c\L + discard await recvFull(client, chunkSize, client.timeout, true) + discard await recvFull(client, 2, client.timeout, false) # Skip \c\L # Trailer headers will only be sent if the request specifies that we want # them: http://tools.ietf.org/html/rfc2616#section-3.6.1 proc parseBody(client: HttpClient | AsyncHttpClient, headers: HttpHeaders, - httpVersion: string): Future[string] {.multisync.} = - result = "" + httpVersion: string): Future[void] {.multisync.} = # Reset progress from previous requests. client.contentTotal = 0 client.contentProgress = 0 client.oneSecondProgress = 0 client.lastProgressReport = 0 + when client is HttpClient: + client.bodyStream = newStringStream() + else: + client.bodyStream = newFutureStream[string]("parseResponse") + if headers.getOrDefault"Transfer-Encoding" == "chunked": - result = await parseChunks(client) + await parseChunks(client) else: # -REGION- Content-Length # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3 @@ -888,26 +948,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient, var length = contentLengthHeader.parseint() client.contentTotal = length if length > 0: - result = await client.recvFull(length, client.timeout) - if result == "": + let recvLen = await client.recvFull(length, client.timeout, true) + if recvLen == 0: httpError("Got disconnected while trying to read body.") - if result.len != length: + if recvLen != length: httpError("Received length doesn't match expected length. Wanted " & - $length & " got " & $result.len) + $length & " got " & $recvLen) else: # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO # -REGION- Connection: Close # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5 if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0": - var buf = "" while true: - buf = await client.recvFull(4000, client.timeout) - if buf == "": break - result.add(buf) + let recvLen = await client.recvFull(4000, client.timeout, true) + if recvLen == 0: break + + when client is AsyncHttpClient: + client.bodyStream.complete() + else: + client.bodyStream.setPosition(0) proc parseResponse(client: HttpClient | AsyncHttpClient, - getBody: bool): Future[Response] {.multisync.} = + getBody: bool): Future[Response | AsyncResponse] + {.multisync.} = + new result var parsedStatus = false var linei = 0 var fullyRead = false @@ -956,9 +1021,8 @@ proc parseResponse(client: HttpClient | AsyncHttpClient, if not fullyRead: httpError("Connection was closed before full request has been made") if getBody: - result.body = await parseBody(client, result.headers, result.version) - else: - result.body = "" + await parseBody(client, result.headers, result.version) + result.bodyStream = client.bodyStream proc newConnection(client: HttpClient | AsyncHttpClient, url: Uri) {.multisync.} = @@ -1006,8 +1070,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders = result[k] = vs proc requestAux(client: HttpClient | AsyncHttpClient, url: string, - httpMethod: string, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + httpMethod: string, body = "", + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = # Helper that actually makes the request. Does not handle redirects. let connectionUrl = if client.proxy.isNil: parseUri(url) else: client.proxy.url @@ -1053,10 +1118,10 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string, # Restore the clients proxy in case it was overwritten. client.proxy = savedProxy - proc request*(client: HttpClient | AsyncHttpClient, url: string, httpMethod: string, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a request ## using the custom method string specified by ``httpMethod``. ## @@ -1078,7 +1143,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string, proc request*(client: HttpClient | AsyncHttpClient, url: string, httpMethod = HttpGET, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a request ## using the method specified. ## @@ -1088,11 +1154,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string, ## ## When a request is made to a different hostname, the current connection will ## be closed. - result = await request(client, url, $httpMethod, body, - headers = headers) + result = await request(client, url, $httpMethod, body, headers) proc get*(client: HttpClient | AsyncHttpClient, - url: string): Future[Response] {.multisync.} = + url: string): Future[Response | AsyncResponse] {.multisync.} = ## Connects to the hostname specified by the URL and performs a GET request. ## ## This procedure will follow redirects up to a maximum number of redirects @@ -1112,16 +1177,17 @@ proc getContent*(client: HttpClient | AsyncHttpClient, if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) else: - return resp.body + return await resp.bodyStream.readAll() proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "", - multipart: MultipartData = nil): Future[Response] {.multisync.} = + multipart: MultipartData = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a POST request. ## ## This procedure will follow redirects up to a maximum number of redirects ## specified in ``client.maxRedirects``. let (mpHeader, mpBody) = format(multipart) - + # TODO: Support FutureStream for `body` parameter. template withNewLine(x): expr = if x.len > 0 and not x.endsWith("\c\L"): x & "\c\L" @@ -1161,4 +1227,4 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string, if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) else: - return resp.body + return await resp.bodyStream.readAll() diff --git a/tests/stdlib/thttpclient.nim b/tests/stdlib/thttpclient.nim index 7b1111f9b0..c5739f0e18 100644 --- a/tests/stdlib/thttpclient.nim +++ b/tests/stdlib/thttpclient.nim @@ -13,7 +13,9 @@ proc asyncTest() {.async.} = var client = newAsyncHttpClient() var resp = await client.request("http://example.com/") doAssert(resp.code.is2xx) - doAssert("Example Domain" in resp.body) + var body = await resp.body + body = await resp.body # Test caching + doAssert("Example Domain" in body) resp = await client.request("http://example.com/404") doAssert(resp.code.is4xx) From f9f86899b5062e1f3db5ffbc156782b688d18ea2 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 11 Feb 2017 14:00:53 +0100 Subject: [PATCH 11/19] Implement streamed async/sync downloadFile and deprecate old one. --- lib/pure/asyncfile.nim | 12 ++++++++ lib/pure/httpclient.nim | 60 +++++++++++++++++++++++++++--------- tests/stdlib/thttpclient.nim | 6 ++-- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index 0241e47960..5a23f3ba21 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -476,3 +476,15 @@ proc close*(f: AsyncFile) = if close(f.fd.cint) == -1: raiseOSError(osLastError()) +proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} = + while true: + let (hasValue, value) = await fut.take() + if hasValue: + await f.write(value) + else: + break + +proc getWriteStream*(f: AsyncFile): FutureStream[string] = + ## Returns a new stream that can be used for writing to the file. + result = newFutureStream[string]() + asyncCheck writeFromStream(f, result) diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 024643384c..4f26c078ae 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -118,7 +118,7 @@ import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes, math, random, httpcore, times, tables, streams -import asyncnet, asyncdispatch +import asyncnet, asyncdispatch, asyncfile import nativesockets export httpcore except parseHeader # TODO: The ``except`` doesn't work @@ -129,7 +129,7 @@ type status*: string headers*: HttpHeaders body: string # TODO: here for compatibility with old httpclient procs. - bodyStream*: StringStream + bodyStream*: Stream AsyncResponse* = ref object version*: string @@ -696,10 +696,13 @@ proc postContent*(url: string, extraHeaders = "", body = "", proc downloadFile*(url: string, outputFilename: string, sslContext: SSLContext = defaultSSLContext, timeout = -1, userAgent = defUserAgent, - proxy: Proxy = nil) = + proxy: Proxy = nil) {.deprecated.} = ## | Downloads ``url`` and saves it to ``outputFilename`` ## | An optional timeout can be specified in milliseconds, if reading from the ## server takes longer than specified an ETimeout exception will be raised. + ## + ## **Deprecated since version 0.16.2**: use ``HttpClient.downloadFile`` + ## instead. var f: File if open(f, outputFilename, fmWrite): f.write(getContent(url, sslContext = sslContext, timeout = timeout, @@ -781,7 +784,8 @@ type when SocketType is AsyncSocket: bodyStream: FutureStream[string] else: - bodyStream: StringStream + bodyStream: Stream + getBody: bool ## When `false`, the body is never read in requestAux. type HttpClient* = HttpClientBase[Socket] @@ -812,6 +816,7 @@ proc newHttpClient*(userAgent = defUserAgent, result.timeout = timeout result.onProgressChanged = nil result.bodyStream = newStringStream() + result.getBody = true when defined(ssl): result.sslContext = sslContext @@ -843,6 +848,7 @@ proc newAsyncHttpClient*(userAgent = defUserAgent, result.timeout = -1 # TODO result.onProgressChanged = nil result.bodyStream = newFutureStream[string]("newAsyncHttpClient") + result.getBody = true when defined(ssl): result.sslContext = sslContext @@ -933,10 +939,8 @@ proc parseBody(client: HttpClient | AsyncHttpClient, client.oneSecondProgress = 0 client.lastProgressReport = 0 - when client is HttpClient: - client.bodyStream = newStringStream() - else: - client.bodyStream = newFutureStream[string]("parseResponse") + when client is AsyncHttpClient: + assert(not client.bodyStream.finished) if headers.getOrDefault"Transfer-Encoding" == "chunked": await parseChunks(client) @@ -1020,7 +1024,12 @@ proc parseResponse(client: HttpClient | AsyncHttpClient, if not fullyRead: httpError("Connection was closed before full request has been made") + if getBody: + when client is HttpClient: + client.bodyStream = newStringStream() + else: + client.bodyStream = newFutureStream[string]("parseResponse") await parseBody(client, result.headers, result.version) result.bodyStream = client.bodyStream @@ -1112,8 +1121,9 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string, if body != "": await client.socket.send(body) - result = await parseResponse(client, - httpMethod.toLower() notin ["head", "connect"]) + let getBody = httpMethod.toLowerAscii() notin ["head", "connect"] and + client.getBody + result = await parseResponse(client, getBody) # Restore the clients proxy in case it was overwritten. client.proxy = savedProxy @@ -1200,16 +1210,14 @@ proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "", headers["Content-Type"] = mpHeader.split(": ")[1] headers["Content-Length"] = $len(xb) - result = await client.requestAux(url, $HttpPOST, xb, - headers = headers) + result = await client.requestAux(url, $HttpPOST, xb, headers) # Handle redirects. var lastURL = url for i in 1..client.maxRedirects: if result.status.redirection(): let redirectTo = getNewLocation(lastURL, result.headers) var meth = if result.status != "307": HttpGet else: HttpPost - result = await client.requestAux(redirectTo, $meth, xb, - headers = headers) + result = await client.requestAux(redirectTo, $meth, xb, headers) lastURL = redirectTo proc postContent*(client: HttpClient | AsyncHttpClient, url: string, @@ -1228,3 +1236,27 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string, raise newException(HttpRequestError, resp.status) else: return await resp.bodyStream.readAll() + +proc downloadFile*(client: HttpClient | AsyncHttpClient, + url: string, filename: string): Future[void] {.multisync.} = + ## Downloads ``url`` and saves it to ``filename``. + client.getBody = false + let resp = await client.get(url) + + when client is HttpClient: + client.bodyStream = newFileStream(filename, fmWrite) + if client.bodyStream.isNil: + fileError("Unable to open file") + else: + var f = openAsync(filename, fmWrite) + client.bodyStream = f.getWriteStream() + + await parseBody(client, resp.headers, resp.version) + + when client is HttpClient: + client.bodyStream.close() + else: + f.close() + + if resp.code.is4xx or resp.code.is5xx: + raise newException(HttpRequestError, resp.status) \ No newline at end of file diff --git a/tests/stdlib/thttpclient.nim b/tests/stdlib/thttpclient.nim index c5739f0e18..62c1ebee78 100644 --- a/tests/stdlib/thttpclient.nim +++ b/tests/stdlib/thttpclient.nim @@ -49,7 +49,8 @@ proc asyncTest() {.async.} = echo("Downloaded ", progress, " of ", total) echo("Current rate: ", speed div 1000, "kb/s") client.onProgressChanged = onProgressChanged - discard await client.getContent("http://speedtest-ams2.digitalocean.com/100mb.test") + await client.downloadFile("http://speedtest-ams2.digitalocean.com/100mb.test", + "100mb.test") client.close() @@ -96,7 +97,8 @@ proc syncTest() = echo("Downloaded ", progress, " of ", total) echo("Current rate: ", speed div 1000, "kb/s") client.onProgressChanged = onProgressChanged - discard client.getContent("http://speedtest-ams2.digitalocean.com/100mb.test") + client.downloadFile("http://speedtest-ams2.digitalocean.com/100mb.test", + "100mb.test") client.close() From b053ded266db3b0e1c257a20bf78380749892e37 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 11 Feb 2017 16:49:02 +0100 Subject: [PATCH 12/19] Fixes tests. --- lib/upcoming/asyncdispatch.nim | 2 +- tests/async/tfuturestream.nim | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index d384cd05e0..bdc04fdeb9 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue, lists +import os, oids, tables, strutils, times, heapqueue, lists, queues import nativesockets, net, deques diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index bf8c9b4c43..ce5cfec744 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -21,6 +21,7 @@ proc alpha() {.async.} = await sleepAsync(1000) await fs.put(i) + echo("Done") fs.complete() proc beta() {.async.} = From 667acb06a53a47f47dde29c381df0d4bcbf61b94 Mon Sep 17 00:00:00 2001 From: Araq Date: Sun, 12 Feb 2017 19:39:23 +0100 Subject: [PATCH 13/19] downloader.nim: use new downloadFile proc --- tools/downloader.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tools/downloader.nim b/tools/downloader.nim index 48331e97a0..3d866f5d73 100644 --- a/tools/downloader.nim +++ b/tools/downloader.nim @@ -36,10 +36,7 @@ proc download(pkg: string; c: Controls) {.async.} = c.bar.value = clamp(int(progress*100 div total), 0, 100) client.onProgressChanged = onProgressChanged - # XXX give a destination filename instead - let contents = await client.getContent("https://nim-lang.org/download/" & pkg & ".7z") - # XXX make this async somehow: - writeFile(z, contents) + await client.downloadFile("https://nim-lang.org/download/" & pkg & ".7z", z) c.bar.value = 100 let p = osproc.startProcess("7zG.exe", getCurrentDir() / r"..\dist", ["x", pkg & ".7z"]) From 45765601e051fa26568b1897ea4a0ffb9bb45145 Mon Sep 17 00:00:00 2001 From: Araq Date: Sat, 25 Feb 2017 17:26:03 +0100 Subject: [PATCH 14/19] fixed premature finishing of httpclient.downloadFile --- lib/pure/asyncfile.nim | 6 ++---- lib/pure/httpclient.nim | 14 ++++++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index 5a23f3ba21..adfe6edba6 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -484,7 +484,5 @@ proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} = else: break -proc getWriteStream*(f: AsyncFile): FutureStream[string] = - ## Returns a new stream that can be used for writing to the file. - result = newFutureStream[string]() - asyncCheck writeFromStream(f, result) +proc setWriteStream*(f: AsyncFile; fut: FutureStream[string]) {.async.} = + await writeFromStream(f, fut) diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 4f26c078ae..8793c3f771 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -1247,16 +1247,14 @@ proc downloadFile*(client: HttpClient | AsyncHttpClient, client.bodyStream = newFileStream(filename, fmWrite) if client.bodyStream.isNil: fileError("Unable to open file") - else: - var f = openAsync(filename, fmWrite) - client.bodyStream = f.getWriteStream() - - await parseBody(client, resp.headers, resp.version) - - when client is HttpClient: + parseBody(client, resp.headers, resp.version) client.bodyStream.close() else: + client.bodyStream = newFutureStream[string]() + var f = openAsync(filename, fmWrite) + asyncCheck parseBody(client, resp.headers, resp.version) + await f.setWriteStream(client.bodyStream) f.close() if resp.code.is4xx or resp.code.is5xx: - raise newException(HttpRequestError, resp.status) \ No newline at end of file + raise newException(HttpRequestError, resp.status) From 912d95a6ea2583bd783f85527dc3526b77710568 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sun, 26 Feb 2017 11:36:16 +0100 Subject: [PATCH 15/19] Add small warning in httpclient's onProgressChanged docs. --- lib/pure/httpclient.nim | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 8793c3f771..84f66b6dc6 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -84,6 +84,9 @@ ## .. code-block:: Nim ## client.onProgressChanged = nil ## +## **Warning:** The ``total`` reported by httpclient may be 0 in some cases. +## +## ## SSL/TLS support ## =============== ## This requires the OpenSSL library, fortunately it's widely used and installed From f9cce320974c33554302d2a801c89e413c70a80b Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sun, 26 Feb 2017 12:52:19 +0100 Subject: [PATCH 16/19] Various fixes to FutureStreams based on PR feedback. --- lib/pure/asyncdispatch.nim | 13 ++++++++- lib/pure/asyncfile.nim | 13 +++++---- lib/pure/collections/deques.nim | 2 +- lib/pure/httpclient.nim | 34 ++++++++--------------- lib/pure/includes/asyncfutures.nim | 43 +++++++++++++++++++----------- lib/upcoming/asyncdispatch.nim | 11 ++++++++ tests/async/tfuturestream.nim | 4 +-- 7 files changed, 72 insertions(+), 48 deletions(-) diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 58113ae699..7fa686f00f 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue, queues +import os, oids, tables, strutils, times, heapqueue import nativesockets, net, deques @@ -1387,6 +1387,17 @@ proc send*(socket: AsyncFD, data: string, # -- Await Macro include asyncmacro +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + result = "" + while true: + let (hasValue, value) = await future.read() + if hasValue: + result.add(value) + else: + break + proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = ## Reads a line of data from ``socket``. Returned future will complete once ## a full line is read or an error occurs. diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index adfe6edba6..488b8276eb 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -476,13 +476,16 @@ proc close*(f: AsyncFile) = if close(f.fd.cint) == -1: raiseOSError(osLastError()) -proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} = +proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} = + ## Reads data from the specified future stream until it is completed. + ## The data which is read is written to the file immediately and + ## freed from memory. + ## + ## This procedure is perfect for saving streamed data to a file without + ## wasting memory. while true: - let (hasValue, value) = await fut.take() + let (hasValue, value) = await fs.read() if hasValue: await f.write(value) else: break - -proc setWriteStream*(f: AsyncFile; fut: FutureStream[string]) {.async.} = - await writeFromStream(f, fut) diff --git a/lib/pure/collections/deques.nim b/lib/pure/collections/deques.nim index 495d7896c7..d42679f06c 100644 --- a/lib/pure/collections/deques.nim +++ b/lib/pure/collections/deques.nim @@ -129,7 +129,7 @@ proc expandIfNeeded[T](deq: var Deque[T]) = var cap = deq.mask + 1 if unlikely(deq.count >= cap): var n = newSeq[T](cap * 2) - for i, x in deq: # don't use copyMem because the GC and because it's slower. + for i, x in pairs(deq): # don't use copyMem because the GC and because it's slower. shallowCopy(n[i], x) shallowCopy(deq.data, n) deq.mask = cap * 2 - 1 diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 84f66b6dc6..4d8400af65 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -131,7 +131,7 @@ type version*: string status*: string headers*: HttpHeaders - body: string # TODO: here for compatibility with old httpclient procs. + body: string bodyStream*: Stream AsyncResponse* = ref object @@ -163,19 +163,6 @@ proc `body=`*(response: Response, value: string) {.deprecated.} = ## **This is deprecated and should not be used**. response.body = value -proc readAll*(future: FutureStream[string]): Future[string] {.async.} = - ## Returns a future that will complete when all the string data from the - ## specified future stream is retrieved. - - # TODO: Move this to asyncfutures. - result = "" - while true: - let (hasValue, value) = await future.take() - if hasValue: - result.add(value) - else: - break - proc body*(response: AsyncResponse): Future[string] {.async.} = ## Reads the response's body and caches it. The read is performed only ## once. @@ -650,7 +637,7 @@ proc post*(url: string, extraHeaders = "", body = "", ## **Deprecated since version 0.15.0**: use ``HttpClient.post`` instead. let (mpHeaders, mpBody) = format(multipart) - template withNewLine(x): expr = + template withNewLine(x): untyped = if x.len > 0 and not x.endsWith("\c\L"): x & "\c\L" else: @@ -891,10 +878,7 @@ proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int, readLen.inc(data.len) if keep: - when client.socket is Socket: - client.bodyStream.write(data) - else: - await client.bodyStream.put(data) + await client.bodyStream.write(data) await reportProgress(client, data.len) @@ -1253,11 +1237,15 @@ proc downloadFile*(client: HttpClient | AsyncHttpClient, parseBody(client, resp.headers, resp.version) client.bodyStream.close() else: - client.bodyStream = newFutureStream[string]() - var f = openAsync(filename, fmWrite) + client.bodyStream = newFutureStream[string]("downloadFile") + var file = openAsync(filename, fmWrite) + # Let `parseBody` write response data into client.bodyStream in the + # background. asyncCheck parseBody(client, resp.headers, resp.version) - await f.setWriteStream(client.bodyStream) - f.close() + # The `writeFromStream` proc will complete once all the data in the + # `bodyStream` has been written to the file. + await file.writeFromStream(client.bodyStream) + file.close() if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 6f6693605e..a597de5cf5 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -17,8 +17,10 @@ type FutureVar*[T] = distinct Future[T] FutureStream*[T] = ref object of FutureBase ## Special future that acts as - ## a queue. - queue: Queue[T] + ## a queue. Its API is still + ## experimental and so is + ## subject to change. + queue: Deque[T] FutureError* = object of Exception cause*: FutureBase @@ -30,7 +32,7 @@ when not defined(release): proc callSoon*(cbproc: proc ()) {.gcsafe.} -template setupFutureBase(fromProc: string): stmt = +template setupFutureBase(fromProc: string) = new(result) result.finished = false when not defined(release): @@ -55,13 +57,20 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = result = FutureVar[T](newFuture[T](fromProc)) proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = - ## Create a new ``FutureStream``. This Future type's callback can be activated - ## multiple times when new data is written to it. + ## Create a new ``FutureStream``. This future's callback is activated when + ## two events occur: + ## + ## * New data is written into the future stream. + ## * The future stream is completed (this means that no more data will be + ## written). ## ## Specifying ``fromProc``, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. + ## + ## **Note:** The API of FutureStream is still new and so has a higher + ## likelihood of changing in the future. setupFutureBase(fromProc) - result.queue = initQueue[T]() + result.queue = initDeque[T]() proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. @@ -130,7 +139,7 @@ proc complete*[T](future: FutureVar[T], val: T) = fut.cb() proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` signifying the end of data. + ## Completes a ``FutureStream`` signalling the end of data. future.finished = true if not future.cb.isNil(): future.cb() @@ -179,8 +188,8 @@ proc `callback=`*[T](future: FutureStream[T], ## The callback is also called when the future is completed. So you should ## use ``finished`` to check whether data is available. ## - ## If the future stream already has data then ``cb`` will be called - ## immediately. + ## If the future stream already has data or is finished then ``cb`` will be + ## called immediately. future.cb = proc () = cb(future) if future.queue.len > 0 or future.finished: callSoon(future.cb) @@ -236,8 +245,9 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. ## - ## For a ``FutureStream`` this signifies that no more data will be placed - ## inside it and that there is no data waiting to be retrieved. + ## For a ``FutureStream`` a ``true`` value means that no more data will be + ## placed inside the stream _and_ that there is no data waiting to be + ## retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished elif future is FutureStream[T]: @@ -249,7 +259,7 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil -proc put*[T](future: FutureStream[T], value: T): Future[void] = +proc write*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## ## This will raise ``ValueError`` if ``future`` is finished. @@ -258,12 +268,13 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] = let msg = "FutureStream is finished and so no longer accepts new data." result.fail(newException(ValueError, msg)) return - # TODO: Buffering. - future.queue.enqueue(value) + # TODO: Implement limiting of the streams storage to prevent it growing + # infinitely when no reads are occuring. + future.queue.addLast(value) if not future.cb.isNil: future.cb() result.complete() -proc take*[T](future: FutureStream[T]): Future[(bool, T)] = +proc read*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data ## placed into it. The future will be completed with the oldest ## value stored inside the stream. The return value will also determine @@ -286,7 +297,7 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = false else: res[0] = true - res[1] = fs.queue.dequeue() + res[1] = fs.queue.popLast() if not resFut.finished: resFut.complete(res) diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index bdc04fdeb9..823b19138c 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1664,6 +1664,17 @@ proc accept*(socket: AsyncFD, # -- Await Macro include asyncmacro +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + result = "" + while true: + let (hasValue, value) = await future.take() + if hasValue: + result.add(value) + else: + break + proc recvLine*(socket: AsyncFD): Future[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once ## a full line is read or an error occurs. diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index ce5cfec744..9a8e986a09 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -19,14 +19,14 @@ var fs = newFutureStream[int]() proc alpha() {.async.} = for i in 0 .. 5: await sleepAsync(1000) - await fs.put(i) + await fs.write(i) echo("Done") fs.complete() proc beta() {.async.} = while not fs.finished: - let (hasValue, value) = await fs.take() + let (hasValue, value) = await fs.read() if hasValue: echo(value) From b5de988eda30483387c838c653010c73fc06d239 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sun, 26 Feb 2017 12:54:03 +0100 Subject: [PATCH 17/19] Fix the other withNewLine template. --- lib/pure/httpclient.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 4d8400af65..e888470040 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -1185,7 +1185,7 @@ proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "", ## specified in ``client.maxRedirects``. let (mpHeader, mpBody) = format(multipart) # TODO: Support FutureStream for `body` parameter. - template withNewLine(x): expr = + template withNewLine(x): untyped = if x.len > 0 and not x.endsWith("\c\L"): x & "\c\L" else: From 48c50f6b4100ba855de4c58b8cfe28cb6352d771 Mon Sep 17 00:00:00 2001 From: Araq Date: Sun, 26 Feb 2017 13:28:46 +0100 Subject: [PATCH 18/19] make the logic right --- lib/pure/includes/asyncfutures.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index a597de5cf5..6af5bf3cfd 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -297,7 +297,7 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = false else: res[0] = true - res[1] = fs.queue.popLast() + res[1] = fs.queue.popFirst() if not resFut.finished: resFut.complete(res) From 843099d8aea0b39fa4c92c5f8d2725e230c03efb Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sun, 26 Feb 2017 15:58:07 +0100 Subject: [PATCH 19/19] Fixes upcoming tests. --- lib/upcoming/asyncdispatch.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 823b19138c..d2de4a465e 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1669,7 +1669,7 @@ proc readAll*(future: FutureStream[string]): Future[string] {.async.} = ## specified future stream is retrieved. result = "" while true: - let (hasValue, value) = await future.take() + let (hasValue, value) = await future.read() if hasValue: result.add(value) else: