* Fixes #16436

* Comments addressed

(cherry picked from commit 4ae520711d)
This commit is contained in:
Yuriy Glukhov
2021-01-14 09:53:21 +02:00
committed by narimiran
parent 3e154e7740
commit 431eb05708
3 changed files with 50 additions and 4 deletions

View File

@@ -21,6 +21,7 @@ type
queue: Deque[T]
finished: bool
cb: proc () {.closure, gcsafe.}
error*: ref Exception
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new ``FutureStream``. This future's callback is activated when
@@ -40,10 +41,19 @@ proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signalling the end of data.
assert(future.error == nil, "Trying to complete failed stream")
future.finished = true
if not future.cb.isNil:
future.cb()
proc fail*[T](future: FutureStream[T], error: ref Exception) =
## Completes ``future`` with ``error``.
assert(not future.finished)
future.finished = true
future.error = error
if not future.cb.isNil:
future.cb()
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
@@ -65,6 +75,10 @@ proc finished*[T](future: FutureStream[T]): bool =
## no data waiting to be retrieved.
result = future.finished and future.queue.len == 0
proc failed*[T](future: FutureStream[T]): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
@@ -107,7 +121,10 @@ proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
res[0] = true
res[1] = fs.queue.popFirst()
resFut.complete(res)
if fs.failed:
resFut.fail(fs.error)
else:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil:

View File

@@ -843,8 +843,11 @@ proc parseResponse(client: HttpClient | AsyncHttpClient,
client.bodyStream = newFutureStream[string]("parseResponse")
result.bodyStream = client.bodyStream
assert(client.parseBodyFut.isNil or client.parseBodyFut.finished)
# do not wait here for the body request to complete
client.parseBodyFut = parseBody(client, result.headers, result.version)
# do not wait here for the body request to complete
client.parseBodyFut.addCallback do():
if client.parseBodyFut.failed:
client.bodyStream.fail(client.parseBodyFut.error)
proc newConnection(client: HttpClient | AsyncHttpClient,
url: Uri) {.multisync.} =
@@ -1177,13 +1180,16 @@ proc downloadFile*(client: AsyncHttpClient, url: string,
client.bodyStream = newFutureStream[string]("downloadFile")
var file = openAsync(filename, fmWrite)
defer: file.close()
# Let `parseBody` write response data into client.bodyStream in the
# background.
asyncCheck parseBody(client, resp.headers, resp.version)
let parseBodyFut = parseBody(client, resp.headers, resp.version)
parseBodyFut.addCallback do():
if parseBodyFut.failed:
client.bodyStream.fail(parseBodyFut.error)
# The `writeFromStream` proc will complete once all the data in the
# `bodyStream` has been written to the file.
await file.writeFromStream(client.bodyStream)
file.close()
if resp.code.is4xx or resp.code.is5xx:
raise newException(HttpRequestError, resp.status)

View File

@@ -0,0 +1,23 @@
discard """
cmd: "nim c --threads:on $file"
"""
import asynchttpserver, httpclient, asyncdispatch
block: # bug #16436
proc startServer() {.async.} =
proc cb(req: Request) {.async.} =
let headers = { "Content-length": "15"} # Provide invalid content-length
await req.respond(Http200, "Hello World", headers.newHttpHeaders())
var server = newAsyncHttpServer()
await server.serve(Port(5555), cb)
proc runClient() {.async.} =
let c = newAsyncHttpClient(headers = {"Connection": "close"}.newHttpHeaders)
let r = await c.getContent("http://127.0.0.1:5555")
doAssert false, "should fail earlier"
asyncCheck startServer()
doAssertRaises(ProtocolError):
waitFor runClient()