mirror of
https://github.com/nim-lang/Nim.git
synced 2026-06-08 04:44:20 +00:00
Various fixes to FutureStreams based on PR feedback.
This commit is contained in:
@@ -9,7 +9,7 @@
|
||||
|
||||
include "system/inclrtl"
|
||||
|
||||
import os, oids, tables, strutils, times, heapqueue, queues
|
||||
import os, oids, tables, strutils, times, heapqueue
|
||||
|
||||
import nativesockets, net, deques
|
||||
|
||||
@@ -1387,6 +1387,17 @@ proc send*(socket: AsyncFD, data: string,
|
||||
# -- Await Macro
|
||||
include asyncmacro
|
||||
|
||||
proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
|
||||
## Returns a future that will complete when all the string data from the
|
||||
## specified future stream is retrieved.
|
||||
result = ""
|
||||
while true:
|
||||
let (hasValue, value) = await future.read()
|
||||
if hasValue:
|
||||
result.add(value)
|
||||
else:
|
||||
break
|
||||
|
||||
proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
|
||||
## Reads a line of data from ``socket``. Returned future will complete once
|
||||
## a full line is read or an error occurs.
|
||||
|
||||
@@ -476,13 +476,16 @@ proc close*(f: AsyncFile) =
|
||||
if close(f.fd.cint) == -1:
|
||||
raiseOSError(osLastError())
|
||||
|
||||
proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} =
|
||||
proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
|
||||
## Reads data from the specified future stream until it is completed.
|
||||
## The data which is read is written to the file immediately and
|
||||
## freed from memory.
|
||||
##
|
||||
## This procedure is perfect for saving streamed data to a file without
|
||||
## wasting memory.
|
||||
while true:
|
||||
let (hasValue, value) = await fut.take()
|
||||
let (hasValue, value) = await fs.read()
|
||||
if hasValue:
|
||||
await f.write(value)
|
||||
else:
|
||||
break
|
||||
|
||||
proc setWriteStream*(f: AsyncFile; fut: FutureStream[string]) {.async.} =
|
||||
await writeFromStream(f, fut)
|
||||
|
||||
@@ -129,7 +129,7 @@ proc expandIfNeeded[T](deq: var Deque[T]) =
|
||||
var cap = deq.mask + 1
|
||||
if unlikely(deq.count >= cap):
|
||||
var n = newSeq[T](cap * 2)
|
||||
for i, x in deq: # don't use copyMem because the GC and because it's slower.
|
||||
for i, x in pairs(deq): # don't use copyMem because the GC and because it's slower.
|
||||
shallowCopy(n[i], x)
|
||||
shallowCopy(deq.data, n)
|
||||
deq.mask = cap * 2 - 1
|
||||
|
||||
@@ -131,7 +131,7 @@ type
|
||||
version*: string
|
||||
status*: string
|
||||
headers*: HttpHeaders
|
||||
body: string # TODO: here for compatibility with old httpclient procs.
|
||||
body: string
|
||||
bodyStream*: Stream
|
||||
|
||||
AsyncResponse* = ref object
|
||||
@@ -163,19 +163,6 @@ proc `body=`*(response: Response, value: string) {.deprecated.} =
|
||||
## **This is deprecated and should not be used**.
|
||||
response.body = value
|
||||
|
||||
proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
|
||||
## Returns a future that will complete when all the string data from the
|
||||
## specified future stream is retrieved.
|
||||
|
||||
# TODO: Move this to asyncfutures.
|
||||
result = ""
|
||||
while true:
|
||||
let (hasValue, value) = await future.take()
|
||||
if hasValue:
|
||||
result.add(value)
|
||||
else:
|
||||
break
|
||||
|
||||
proc body*(response: AsyncResponse): Future[string] {.async.} =
|
||||
## Reads the response's body and caches it. The read is performed only
|
||||
## once.
|
||||
@@ -650,7 +637,7 @@ proc post*(url: string, extraHeaders = "", body = "",
|
||||
## **Deprecated since version 0.15.0**: use ``HttpClient.post`` instead.
|
||||
let (mpHeaders, mpBody) = format(multipart)
|
||||
|
||||
template withNewLine(x): expr =
|
||||
template withNewLine(x): untyped =
|
||||
if x.len > 0 and not x.endsWith("\c\L"):
|
||||
x & "\c\L"
|
||||
else:
|
||||
@@ -891,10 +878,7 @@ proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int,
|
||||
|
||||
readLen.inc(data.len)
|
||||
if keep:
|
||||
when client.socket is Socket:
|
||||
client.bodyStream.write(data)
|
||||
else:
|
||||
await client.bodyStream.put(data)
|
||||
await client.bodyStream.write(data)
|
||||
|
||||
await reportProgress(client, data.len)
|
||||
|
||||
@@ -1253,11 +1237,15 @@ proc downloadFile*(client: HttpClient | AsyncHttpClient,
|
||||
parseBody(client, resp.headers, resp.version)
|
||||
client.bodyStream.close()
|
||||
else:
|
||||
client.bodyStream = newFutureStream[string]()
|
||||
var f = openAsync(filename, fmWrite)
|
||||
client.bodyStream = newFutureStream[string]("downloadFile")
|
||||
var file = openAsync(filename, fmWrite)
|
||||
# Let `parseBody` write response data into client.bodyStream in the
|
||||
# background.
|
||||
asyncCheck parseBody(client, resp.headers, resp.version)
|
||||
await f.setWriteStream(client.bodyStream)
|
||||
f.close()
|
||||
# The `writeFromStream` proc will complete once all the data in the
|
||||
# `bodyStream` has been written to the file.
|
||||
await file.writeFromStream(client.bodyStream)
|
||||
file.close()
|
||||
|
||||
if resp.code.is4xx or resp.code.is5xx:
|
||||
raise newException(HttpRequestError, resp.status)
|
||||
|
||||
@@ -17,8 +17,10 @@ type
|
||||
FutureVar*[T] = distinct Future[T]
|
||||
|
||||
FutureStream*[T] = ref object of FutureBase ## Special future that acts as
|
||||
## a queue.
|
||||
queue: Queue[T]
|
||||
## a queue. Its API is still
|
||||
## experimental and so is
|
||||
## subject to change.
|
||||
queue: Deque[T]
|
||||
|
||||
FutureError* = object of Exception
|
||||
cause*: FutureBase
|
||||
@@ -30,7 +32,7 @@ when not defined(release):
|
||||
|
||||
proc callSoon*(cbproc: proc ()) {.gcsafe.}
|
||||
|
||||
template setupFutureBase(fromProc: string): stmt =
|
||||
template setupFutureBase(fromProc: string) =
|
||||
new(result)
|
||||
result.finished = false
|
||||
when not defined(release):
|
||||
@@ -55,13 +57,20 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
|
||||
result = FutureVar[T](newFuture[T](fromProc))
|
||||
|
||||
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
|
||||
## Create a new ``FutureStream``. This Future type's callback can be activated
|
||||
## multiple times when new data is written to it.
|
||||
## Create a new ``FutureStream``. This future's callback is activated when
|
||||
## two events occur:
|
||||
##
|
||||
## * New data is written into the future stream.
|
||||
## * The future stream is completed (this means that no more data will be
|
||||
## written).
|
||||
##
|
||||
## Specifying ``fromProc``, which is a string specifying the name of the proc
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
##
|
||||
## **Note:** The API of FutureStream is still new and so has a higher
|
||||
## likelihood of changing in the future.
|
||||
setupFutureBase(fromProc)
|
||||
result.queue = initQueue[T]()
|
||||
result.queue = initDeque[T]()
|
||||
|
||||
proc clean*[T](future: FutureVar[T]) =
|
||||
## Resets the ``finished`` status of ``future``.
|
||||
@@ -130,7 +139,7 @@ proc complete*[T](future: FutureVar[T], val: T) =
|
||||
fut.cb()
|
||||
|
||||
proc complete*[T](future: FutureStream[T]) =
|
||||
## Completes a ``FutureStream`` signifying the end of data.
|
||||
## Completes a ``FutureStream`` signalling the end of data.
|
||||
future.finished = true
|
||||
if not future.cb.isNil():
|
||||
future.cb()
|
||||
@@ -179,8 +188,8 @@ proc `callback=`*[T](future: FutureStream[T],
|
||||
## The callback is also called when the future is completed. So you should
|
||||
## use ``finished`` to check whether data is available.
|
||||
##
|
||||
## If the future stream already has data then ``cb`` will be called
|
||||
## immediately.
|
||||
## If the future stream already has data or is finished then ``cb`` will be
|
||||
## called immediately.
|
||||
future.cb = proc () = cb(future)
|
||||
if future.queue.len > 0 or future.finished:
|
||||
callSoon(future.cb)
|
||||
@@ -236,8 +245,9 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
|
||||
##
|
||||
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
|
||||
##
|
||||
## For a ``FutureStream`` this signifies that no more data will be placed
|
||||
## inside it and that there is no data waiting to be retrieved.
|
||||
## For a ``FutureStream`` a ``true`` value means that no more data will be
|
||||
## placed inside the stream _and_ that there is no data waiting to be
|
||||
## retrieved.
|
||||
when future is FutureVar[T]:
|
||||
result = (Future[T](future)).finished
|
||||
elif future is FutureStream[T]:
|
||||
@@ -249,7 +259,7 @@ proc failed*(future: FutureBase): bool =
|
||||
## Determines whether ``future`` completed with an error.
|
||||
return future.error != nil
|
||||
|
||||
proc put*[T](future: FutureStream[T], value: T): Future[void] =
|
||||
proc write*[T](future: FutureStream[T], value: T): Future[void] =
|
||||
## Writes the specified value inside the specified future stream.
|
||||
##
|
||||
## This will raise ``ValueError`` if ``future`` is finished.
|
||||
@@ -258,12 +268,13 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] =
|
||||
let msg = "FutureStream is finished and so no longer accepts new data."
|
||||
result.fail(newException(ValueError, msg))
|
||||
return
|
||||
# TODO: Buffering.
|
||||
future.queue.enqueue(value)
|
||||
# TODO: Implement limiting of the streams storage to prevent it growing
|
||||
# infinitely when no reads are occuring.
|
||||
future.queue.addLast(value)
|
||||
if not future.cb.isNil: future.cb()
|
||||
result.complete()
|
||||
|
||||
proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
|
||||
proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
|
||||
## Returns a future that will complete when the ``FutureStream`` has data
|
||||
## placed into it. The future will be completed with the oldest
|
||||
## value stored inside the stream. The return value will also determine
|
||||
@@ -286,7 +297,7 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
|
||||
res[0] = false
|
||||
else:
|
||||
res[0] = true
|
||||
res[1] = fs.queue.dequeue()
|
||||
res[1] = fs.queue.popLast()
|
||||
|
||||
if not resFut.finished:
|
||||
resFut.complete(res)
|
||||
|
||||
@@ -1664,6 +1664,17 @@ proc accept*(socket: AsyncFD,
|
||||
# -- Await Macro
|
||||
include asyncmacro
|
||||
|
||||
proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
|
||||
## Returns a future that will complete when all the string data from the
|
||||
## specified future stream is retrieved.
|
||||
result = ""
|
||||
while true:
|
||||
let (hasValue, value) = await future.take()
|
||||
if hasValue:
|
||||
result.add(value)
|
||||
else:
|
||||
break
|
||||
|
||||
proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
|
||||
## Reads a line of data from ``socket``. Returned future will complete once
|
||||
## a full line is read or an error occurs.
|
||||
|
||||
@@ -19,14 +19,14 @@ var fs = newFutureStream[int]()
|
||||
proc alpha() {.async.} =
|
||||
for i in 0 .. 5:
|
||||
await sleepAsync(1000)
|
||||
await fs.put(i)
|
||||
await fs.write(i)
|
||||
|
||||
echo("Done")
|
||||
fs.complete()
|
||||
|
||||
proc beta() {.async.} =
|
||||
while not fs.finished:
|
||||
let (hasValue, value) = await fs.take()
|
||||
let (hasValue, value) = await fs.read()
|
||||
if hasValue:
|
||||
echo(value)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user