Implement streamed body reading in httpclient.

This commit is contained in:
Dominik Picheta
2017-02-11 12:43:16 +01:00
parent 77071eb767
commit 1b4067a81b
2 changed files with 111 additions and 43 deletions

View File

@@ -117,20 +117,28 @@
## only basic authentication is supported at the moment.
import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes,
math, random, httpcore, times, tables
math, random, httpcore, times, tables, streams
import asyncnet, asyncdispatch
import nativesockets
export httpcore except parseHeader # TODO: The ``except`` doesn't work
type
Response* = object
Response* = ref object
version*: string
status*: string
headers*: HttpHeaders
body*: string
body: string # TODO: here for compatibility with old httpclient procs.
bodyStream*: StringStream
proc code*(response: Response): HttpCode
AsyncResponse* = ref object
version*: string
status*: string
headers*: HttpHeaders
body: string
bodyStream*: FutureStream[string]
proc code*(response: Response | AsyncResponse): HttpCode
{.raises: [ValueError, OverflowError].} =
## Retrieves the specified response's ``HttpCode``.
##
@@ -138,6 +146,40 @@ proc code*(response: Response): HttpCode
## corresponding ``HttpCode``.
return response.status[0 .. 2].parseInt.HttpCode
proc body*(response: Response): string =
  ## Returns the body of ``response`` as a string.
  ##
  ## If no body has been stored on the response yet, its body stream is read
  ## synchronously to the end and the text is kept on the response, so later
  ## calls return the stored string without reading the stream again.
  if isNil(response.body):
    response.body = readAll(response.bodyStream)
  result = response.body
proc `body=`*(response: Response, value: string) {.deprecated.} =
  ## Stores ``value`` as the body of ``response``.
  ##
  ## This setter exists for backward compatibility only.
  ## **This is deprecated and should not be used**.
  response.body = value
proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
  ## Collects every string delivered by the given future stream and returns
  ## their concatenation. The returned future completes once ``take`` reports
  ## that the stream has no further value.
  # TODO: Move this to asyncfutures.
  result = ""
  while true:
    let (gotChunk, chunk) = await future.take()
    if not gotChunk:
      # No value was available: the stream is exhausted, stop collecting.
      break
    result.add(chunk)
proc body*(response: AsyncResponse): Future[string] {.async.} =
  ## Returns the full body of ``response``.
  ##
  ## The first call awaits the whole body stream and caches the text on the
  ## response; later calls return the cached string without reading again.
  if isNil(response.body):
    let text = await readAll(response.bodyStream)
    response.body = text
  return response.body
type
Proxy* = ref object
url*: Uri
@@ -249,6 +291,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in
result.add(buf)
proc parseResponse(s: Socket, getBody: bool, timeout: int): Response =
new result
var parsedStatus = false
var linei = 0
var fullyRead = false
@@ -735,6 +778,10 @@ type
contentProgress: BiggestInt
oneSecondProgress: BiggestInt
lastProgressReport: float
when SocketType is AsyncSocket:
bodyStream: FutureStream[string]
else:
bodyStream: StringStream
type
HttpClient* = HttpClientBase[Socket]
@@ -764,6 +811,7 @@ proc newHttpClient*(userAgent = defUserAgent,
result.proxy = proxy
result.timeout = timeout
result.onProgressChanged = nil
result.bodyStream = newStringStream()
when defined(ssl):
result.sslContext = sslContext
@@ -794,6 +842,7 @@ proc newAsyncHttpClient*(userAgent = defUserAgent,
result.proxy = proxy
result.timeout = -1 # TODO
result.onProgressChanged = nil
result.bodyStream = newFutureStream[string]("newAsyncHttpClient")
when defined(ssl):
result.sslContext = sslContext
@@ -815,14 +864,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient,
client.oneSecondProgress = 0
client.lastProgressReport = epochTime()
proc recvFull(client: HttpClient | AsyncHttpClient,
size: int, timeout: int): Future[string] {.multisync.} =
proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int,
keep: bool): Future[int] {.multisync.} =
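## Ensures that all the data requested is read and returns the number of
## bytes read. When ``keep`` is true the data is written to the client's
## body stream.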
result = ""
var readLen = 0
while true:
if size == result.len: break
if size == readLen: break
let remainingSize = size - result.len
let remainingSize = size - readLen
let sizeToRecv = min(remainingSize, net.BufferSize)
when client.socket is Socket:
@@ -830,13 +879,20 @@ proc recvFull(client: HttpClient | AsyncHttpClient,
else:
let data = await client.socket.recv(sizeToRecv)
if data == "": break # We've been disconnected.
result.add data
readLen.inc(data.len)
if keep:
when client.socket is Socket:
client.bodyStream.write(data)
else:
await client.bodyStream.put(data)
await reportProgress(client, data.len)
proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
return readLen
proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void]
{.multisync.} =
result = ""
while true:
var chunkSize = 0
var chunkSizeStr = await client.socket.recvLine()
@@ -861,25 +917,29 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
httpError("Invalid chunk size: " & chunkSizeStr)
inc(i)
if chunkSize <= 0:
discard await recvFull(client, 2, client.timeout) # Skip \c\L
discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
break
result.add await recvFull(client, chunkSize, client.timeout)
discard await recvFull(client, 2, client.timeout) # Skip \c\L
discard await recvFull(client, chunkSize, client.timeout, true)
discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
# Trailer headers will only be sent if the request specifies that we want
# them: http://tools.ietf.org/html/rfc2616#section-3.6.1
proc parseBody(client: HttpClient | AsyncHttpClient,
headers: HttpHeaders,
httpVersion: string): Future[string] {.multisync.} =
result = ""
httpVersion: string): Future[void] {.multisync.} =
# Reset progress from previous requests.
client.contentTotal = 0
client.contentProgress = 0
client.oneSecondProgress = 0
client.lastProgressReport = 0
when client is HttpClient:
client.bodyStream = newStringStream()
else:
client.bodyStream = newFutureStream[string]("parseResponse")
if headers.getOrDefault"Transfer-Encoding" == "chunked":
result = await parseChunks(client)
await parseChunks(client)
else:
# -REGION- Content-Length
# (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3
@@ -888,26 +948,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient,
var length = contentLengthHeader.parseint()
client.contentTotal = length
if length > 0:
result = await client.recvFull(length, client.timeout)
if result == "":
let recvLen = await client.recvFull(length, client.timeout, true)
if recvLen == 0:
httpError("Got disconnected while trying to read body.")
if result.len != length:
if recvLen != length:
httpError("Received length doesn't match expected length. Wanted " &
$length & " got " & $result.len)
$length & " got " & $recvLen)
else:
# (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO
# -REGION- Connection: Close
# (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5
if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0":
var buf = ""
while true:
buf = await client.recvFull(4000, client.timeout)
if buf == "": break
result.add(buf)
let recvLen = await client.recvFull(4000, client.timeout, true)
if recvLen == 0: break
when client is AsyncHttpClient:
client.bodyStream.complete()
else:
client.bodyStream.setPosition(0)
proc parseResponse(client: HttpClient | AsyncHttpClient,
getBody: bool): Future[Response] {.multisync.} =
getBody: bool): Future[Response | AsyncResponse]
{.multisync.} =
new result
var parsedStatus = false
var linei = 0
var fullyRead = false
@@ -956,9 +1021,8 @@ proc parseResponse(client: HttpClient | AsyncHttpClient,
if not fullyRead:
httpError("Connection was closed before full request has been made")
if getBody:
result.body = await parseBody(client, result.headers, result.version)
else:
result.body = ""
await parseBody(client, result.headers, result.version)
result.bodyStream = client.bodyStream
proc newConnection(client: HttpClient | AsyncHttpClient,
url: Uri) {.multisync.} =
@@ -1006,8 +1070,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders =
result[k] = vs
proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
httpMethod: string, body = "",
headers: HttpHeaders = nil): Future[Response] {.multisync.} =
httpMethod: string, body = "",
headers: HttpHeaders = nil): Future[Response | AsyncResponse]
{.multisync.} =
# Helper that actually makes the request. Does not handle redirects.
let connectionUrl =
if client.proxy.isNil: parseUri(url) else: client.proxy.url
@@ -1053,10 +1118,10 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
# Restore the client's proxy in case it was overwritten.
client.proxy = savedProxy
proc request*(client: HttpClient | AsyncHttpClient, url: string,
httpMethod: string, body = "",
headers: HttpHeaders = nil): Future[Response] {.multisync.} =
headers: HttpHeaders = nil): Future[Response | AsyncResponse]
{.multisync.} =
## Connects to the hostname specified by the URL and performs a request
## using the custom method string specified by ``httpMethod``.
##
@@ -1078,7 +1143,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
proc request*(client: HttpClient | AsyncHttpClient, url: string,
httpMethod = HttpGET, body = "",
headers: HttpHeaders = nil): Future[Response] {.multisync.} =
headers: HttpHeaders = nil): Future[Response | AsyncResponse]
{.multisync.} =
## Connects to the hostname specified by the URL and performs a request
## using the method specified.
##
@@ -1088,11 +1154,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
##
## When a request is made to a different hostname, the current connection will
## be closed.
result = await request(client, url, $httpMethod, body,
headers = headers)
result = await request(client, url, $httpMethod, body, headers)
proc get*(client: HttpClient | AsyncHttpClient,
url: string): Future[Response] {.multisync.} =
url: string): Future[Response | AsyncResponse] {.multisync.} =
## Connects to the hostname specified by the URL and performs a GET request.
##
## This procedure will follow redirects up to a maximum number of redirects
@@ -1112,16 +1177,17 @@ proc getContent*(client: HttpClient | AsyncHttpClient,
if resp.code.is4xx or resp.code.is5xx:
raise newException(HttpRequestError, resp.status)
else:
return resp.body
return await resp.bodyStream.readAll()
proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "",
multipart: MultipartData = nil): Future[Response] {.multisync.} =
multipart: MultipartData = nil): Future[Response | AsyncResponse]
{.multisync.} =
## Connects to the hostname specified by the URL and performs a POST request.
##
## This procedure will follow redirects up to a maximum number of redirects
## specified in ``client.maxRedirects``.
let (mpHeader, mpBody) = format(multipart)
# TODO: Support FutureStream for `body` parameter.
template withNewLine(x): expr =
if x.len > 0 and not x.endsWith("\c\L"):
x & "\c\L"
@@ -1161,4 +1227,4 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string,
if resp.code.is4xx or resp.code.is5xx:
raise newException(HttpRequestError, resp.status)
else:
return resp.body
return await resp.bodyStream.readAll()

View File

@@ -13,7 +13,9 @@ proc asyncTest() {.async.} =
var client = newAsyncHttpClient()
var resp = await client.request("http://example.com/")
doAssert(resp.code.is2xx)
doAssert("<title>Example Domain</title>" in resp.body)
var body = await resp.body
body = await resp.body # Test caching
doAssert("<title>Example Domain</title>" in body)
resp = await client.request("http://example.com/404")
doAssert(resp.code.is4xx)