diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 59218accec..2a412d967f 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,11 +9,12 @@ include "system/inclrtl" -import os, tables, strutils, times, heapqueue, options, asyncfutures +import os, tables, strutils, times, heapqueue, options, asyncstreams +import asyncfutures except callSoon import nativesockets, net, deques export Port, SocketFlag -export asyncfutures +export asyncfutures, asyncstreams #{.injectStmt: newGcInvariant().} @@ -188,7 +189,7 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = result = int((timerTimeout - curTime) * 1000) if result < 0: result = 0 -proc callSoon*(cbproc: proc ()) {.gcsafe.} +proc callSoon(cbproc: proc ()) {.gcsafe.} proc initGlobalDispatcher = if asyncfutures.callSoonProc == nil: @@ -1336,7 +1337,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = return add(result, c) -proc callSoon*(cbproc: proc ()) = +proc callSoon(cbproc: proc ()) = ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. getGlobalDispatcher().callbacks.addLast(cbproc) diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim index 8a1538546a..f7d96d5e4c 100644 --- a/lib/pure/asyncfutures.nim +++ b/lib/pure/asyncfutures.nim @@ -19,12 +19,6 @@ type FutureVar*[T] = distinct Future[T] - FutureStream*[T] = ref object of FutureBase ## Special future that acts as - ## a queue. Its API is still - ## experimental and so is - ## subject to change. - queue: Deque[T] - FutureError* = object of Exception cause*: FutureBase @@ -35,7 +29,7 @@ when not defined(release): var callSoonProc* {.threadvar.}: (proc(cbproc: proc ()) {.gcsafe.}) -proc callSoon(cbproc: proc ()) = +proc callSoon*(cbproc: proc ()) = if callSoonProc == nil: # Loop not initialized yet. Call the function directly to allow setup code to use futures. cbproc() @@ -66,22 +60,6 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) -proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = - ## Create a new ``FutureStream``. This future's callback is activated when - ## two events occur: - ## - ## * New data is written into the future stream. - ## * The future stream is completed (this means that no more data will be - ## written). - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. - ## - ## **Note:** The API of FutureStream is still new and so has a higher - ## likelihood of changing in the future. - setupFutureBase(fromProc) - result.queue = initDeque[T]() - proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false @@ -148,12 +126,6 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` signalling the end of data. - future.finished = true - if not future.cb.isNil(): - future.cb() - proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -190,20 +162,6 @@ proc `callback=`*[T](future: Future[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) -proc `callback=`*[T](future: FutureStream[T], - cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = - ## Sets the callback proc to be called when data was placed inside the - ## future stream. - ## - ## The callback is also called when the future is completed. So you should - ## use ``finished`` to check whether data is available. - ## - ## If the future stream already has data or is finished then ``cb`` will be - ## called immediately. - future.cb = proc () = cb(future) - if future.queue.len > 0 or future.finished: - callSoon(future.cb) - proc injectStacktrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): @@ -250,18 +208,12 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = +proc finished*[T](future: Future[T] | FutureVar[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. - ## - ## For a ``FutureStream`` a ``true`` value means that no more data will be - ## placed inside the stream _and_ that there is no data waiting to be - ## retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished - elif future is FutureStream[T]: - result = future.finished and future.queue.len == 0 else: result = future.finished @@ -269,57 +221,6 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil -proc write*[T](future: FutureStream[T], value: T): Future[void] = - ## Writes the specified value inside the specified future stream. - ## - ## This will raise ``ValueError`` if ``future`` is finished. - result = newFuture[void]("FutureStream.put") - if future.finished: - let msg = "FutureStream is finished and so no longer accepts new data." - result.fail(newException(ValueError, msg)) - return - # TODO: Implement limiting of the streams storage to prevent it growing - # infinitely when no reads are occuring. - future.queue.addLast(value) - if not future.cb.isNil: future.cb() - result.complete() - -proc read*[T](future: FutureStream[T]): Future[(bool, T)] = - ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest - ## value stored inside the stream. The return value will also determine - ## whether data was retrieved, ``false`` means that the future stream was - ## completed and no data was retrieved. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - var resFut = newFuture[(bool, T)]("FutureStream.take") - let savedCb = future.cb - future.callback = - proc (fs: FutureStream[T]) = - # We don't want this callback called again. - future.cb = nil - - # The return value depends on whether the FutureStream has finished. - var res: (bool, T) - if finished(fs): - # Remember, this callback is called when the FutureStream is completed. - res[0] = false - else: - res[0] = true - res[1] = fs.queue.popFirst() - - if not resFut.finished: - resFut.complete(res) - - # If the saved callback isn't nil then let's call it. - if not savedCb.isNil: savedCb() - return resFut - -proc len*[T](future: FutureStream[T]): int = - ## Returns the amount of data pieces inside the stream. - future.queue.len - proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim new file mode 100644 index 0000000000..d3ea143f32 --- /dev/null +++ b/lib/pure/asyncstreams.nim @@ -0,0 +1,105 @@ +import asyncfutures + +import deques + +type + FutureStream*[T] = ref object ## Special future that acts as + ## a queue. Its API is still + ## experimental and so is + ## subject to change. + queue: Deque[T] + finished: bool + cb: proc () {.closure, gcsafe.} + +proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = + ## Create a new ``FutureStream``. This future's callback is activated when + ## two events occur: + ## + ## * New data is written into the future stream. + ## * The future stream is completed (this means that no more data will be + ## written). + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + ## + ## **Note:** The API of FutureStream is still new and so has a higher + ## likelihood of changing in the future. + result = FutureStream[T](finished: false, cb: nil) + result.queue = initDeque[T]() + +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signalling the end of data. + future.finished = true + if not future.cb.isNil: + future.cb() + +proc `callback=`*[T](future: FutureStream[T], + cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = + ## Sets the callback proc to be called when data was placed inside the + ## future stream. + ## + ## The callback is also called when the future is completed. So you should + ## use ``finished`` to check whether data is available. + ## + ## If the future stream already has data or is finished then ``cb`` will be + ## called immediately. + future.cb = proc () = cb(future) + if future.queue.len > 0 or future.finished: + callSoon(future.cb) + +proc finished*[T](future: FutureStream[T]): bool = + ## Check if a ``FutureStream`` is finished. ``true`` value means that + ## no more data will be placed inside the stream _and_ that there is + ## no data waiting to be retrieved. + result = future.finished and future.queue.len == 0 + +proc write*[T](future: FutureStream[T], value: T): Future[void] = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + result = newFuture[void]("FutureStream.put") + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + result.fail(newException(ValueError, msg)) + return + # TODO: Implement limiting of the streams storage to prevent it growing + # infinitely when no reads are occuring. + future.queue.addLast(value) + if not future.cb.isNil: future.cb() + result.complete() + +proc read*[T](future: FutureStream[T]): Future[(bool, T)] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[(bool, T)]("FutureStream.take") + let savedCb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.queue.popFirst() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() + return resFut + +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 84c33f3abe..d90ca6534e 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -9,12 +9,13 @@ include "system/inclrtl" -import os, tables, strutils, times, heapqueue, lists, options, asyncfutures +import os, tables, strutils, times, heapqueue, lists, options, asyncstreams +import asyncfutures except callSoon import nativesockets, net, deques export Port, SocketFlag -export asyncfutures +export asyncfutures, asyncstreams #{.injectStmt: newGcInvariant().} @@ -160,7 +161,7 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = result = int((timerTimeout - curTime) * 1000) if result < 0: result = 0 -proc callSoon*(cbproc: proc ()) {.gcsafe.} +proc callSoon(cbproc: proc ()) {.gcsafe.} proc initGlobalDispatcher = if asyncfutures.callSoonProc == nil: @@ -1611,7 +1612,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} = return add(result, c) -proc callSoon*(cbproc: proc ()) = +proc callSoon(cbproc: proc ()) = ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. getGlobalDispatcher().callbacks.addLast(cbproc)