diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim index bd366f58d7..706fda4704 100644 --- a/lib/pure/asyncstreams.nim +++ b/lib/pure/asyncstreams.nim @@ -21,6 +21,7 @@ type queue: Deque[T] finished: bool cb: proc () {.closure, gcsafe.} + error*: ref Exception proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = ## Create a new ``FutureStream``. This future's callback is activated when @@ -40,10 +41,19 @@ proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = proc complete*[T](future: FutureStream[T]) = ## Completes a ``FutureStream`` signalling the end of data. + assert(future.error == nil, "Trying to complete failed stream") future.finished = true if not future.cb.isNil: future.cb() +proc fail*[T](future: FutureStream[T], error: ref Exception) = + ## Completes ``future`` with ``error``. + assert(not future.finished) + future.finished = true + future.error = error + if not future.cb.isNil: + future.cb() + proc `callback=`*[T](future: FutureStream[T], cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) = ## Sets the callback proc to be called when data was placed inside the @@ -65,6 +75,10 @@ proc finished*[T](future: FutureStream[T]): bool = ## no data waiting to be retrieved. result = future.finished and future.queue.len == 0 +proc failed*[T](future: FutureStream[T]): bool = + ## Determines whether ``future`` completed with an error. + return future.error != nil + proc write*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## @@ -107,7 +121,10 @@ proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) = res[0] = true res[1] = fs.queue.popFirst() - resFut.complete(res) + if fs.failed: + resFut.fail(fs.error) + else: + resFut.complete(res) # If the saved callback isn't nil then let's call it. if not savedCb.isNil: diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index bfc3782795..79d35aeb8e 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -843,8 +843,11 @@ proc parseResponse(client: HttpClient | AsyncHttpClient, client.bodyStream = newFutureStream[string]("parseResponse") result.bodyStream = client.bodyStream assert(client.parseBodyFut.isNil or client.parseBodyFut.finished) + # do not wait here for the body request to complete client.parseBodyFut = parseBody(client, result.headers, result.version) - # do not wait here for the body request to complete + client.parseBodyFut.addCallback do(): + if client.parseBodyFut.failed: + client.bodyStream.fail(client.parseBodyFut.error) proc newConnection(client: HttpClient | AsyncHttpClient, url: Uri) {.multisync.} = @@ -1177,13 +1180,16 @@ proc downloadFile*(client: AsyncHttpClient, url: string, client.bodyStream = newFutureStream[string]("downloadFile") var file = openAsync(filename, fmWrite) + defer: file.close() # Let `parseBody` write response data into client.bodyStream in the # background. - asyncCheck parseBody(client, resp.headers, resp.version) + let parseBodyFut = parseBody(client, resp.headers, resp.version) + parseBodyFut.addCallback do(): + if parseBodyFut.failed: + client.bodyStream.fail(parseBodyFut.error) # The `writeFromStream` proc will complete once all the data in the # `bodyStream` has been written to the file. await file.writeFromStream(client.bodyStream) - file.close() if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) diff --git a/tests/stdlib/thttpclient_standalone.nim b/tests/stdlib/thttpclient_standalone.nim new file mode 100644 index 0000000000..44a88e91ea --- /dev/null +++ b/tests/stdlib/thttpclient_standalone.nim @@ -0,0 +1,23 @@ +discard """ + cmd: "nim c --threads:on $file" +""" + +import asynchttpserver, httpclient, asyncdispatch + +block: # bug #16436 + proc startServer() {.async.} = + proc cb(req: Request) {.async.} = + let headers = { "Content-length": "15"} # Provide invalid content-length + await req.respond(Http200, "Hello World", headers.newHttpHeaders()) + + var server = newAsyncHttpServer() + await server.serve(Port(5555), cb) + + proc runClient() {.async.} = + let c = newAsyncHttpClient(headers = {"Connection": "close"}.newHttpHeaders) + let r = await c.getContent("http://127.0.0.1:5555") + doAssert false, "should fail earlier" + + asyncCheck startServer() + doAssertRaises(ProtocolError): + waitFor runClient()