Merge pull request #5373 from nim-lang/feature/async-streams

Async streams and HTTP client streaming downloads support
This commit is contained in:
Andreas Rumpf
2017-02-26 17:35:08 +01:00
committed by GitHub
11 changed files with 387 additions and 94 deletions

View File

@@ -1387,6 +1387,17 @@ proc send*(socket: AsyncFD, data: string,
# -- Await Macro
include asyncmacro
proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
## Returns a future that will complete when all the string data from the
## specified future stream is retrieved.
result = ""
while true:
let (hasValue, value) = await future.read()
if hasValue:
result.add(value)
else:
break
proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.

View File

@@ -476,3 +476,16 @@ proc close*(f: AsyncFile) =
if close(f.fd.cint) == -1:
raiseOSError(osLastError())
proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} =
## Reads data from the specified future stream until it is completed.
## The data which is read is written to the file immediately and
## freed from memory.
##
## This procedure is perfect for saving streamed data to a file without
## wasting memory.
while true:
let (hasValue, value) = await fs.read()
if hasValue:
await f.write(value)
else:
break

View File

@@ -33,8 +33,10 @@ template createCb(retFutureSym, iteratorNameSym,
if not nameIterVar.finished:
var next = nameIterVar()
if next == nil:
assert retFutureSym.finished, "Async procedure's (" &
name & ") return Future was not finished."
if not retFutureSym.finished:
let msg = "Async procedure ($1) yielded `nil`, are you await'ing a " &
"`nil` Future?"
raise newException(AssertionError, msg % name)
else:
next.callback = cb
except:
@@ -281,6 +283,14 @@ proc getFutureVarIdents(params: NimNode): seq[NimNode] {.compileTime.} =
($params[i][1][0].ident).normalize == "futurevar":
result.add(params[i][0])
proc isInvalidReturnType(typeName: string): bool =
return typeName notin ["Future"] #, "FutureStream"]
proc verifyReturnType(typeName: string) {.compileTime.} =
if typeName.isInvalidReturnType:
error("Expected return type of 'Future' got '$1'" %
typeName)
proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
## This macro transforms a single procedure into a closure iterator.
## The ``async`` macro supports a stmtList holding multiple async procedures.
@@ -295,18 +305,16 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
# Verify that the return type is a Future[T]
if returnType.kind == nnkBracketExpr:
let fut = repr(returnType[0])
if fut != "Future":
error("Expected return type of 'Future' got '" & fut & "'")
verifyReturnType(fut)
baseType = returnType[1]
elif returnType.kind in nnkCallKinds and $returnType[0] == "[]":
let fut = repr(returnType[1])
if fut != "Future":
error("Expected return type of 'Future' got '" & fut & "'")
verifyReturnType(fut)
baseType = returnType[2]
elif returnType.kind == nnkEmpty:
baseType = returnType
else:
error("Expected return type of 'Future' got '" & repr(returnType) & "'")
verifyReturnType(repr(returnType))
let subtypeIsVoid = returnType.kind == nnkEmpty or
(baseType.kind == nnkIdent and returnType[1].ident == !"void")
@@ -390,7 +398,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
if procBody.kind != nnkEmpty:
result[6] = outerProcBody
#echo(treeRepr(result))
#if prc[0].getName == "testInfix":
#if prc[0].getName == "beta":
# echo(toStrLit(result))
macro async*(prc: untyped): untyped =
@@ -451,13 +459,12 @@ proc stripAwait(node: NimNode): NimNode =
for i in 0 .. <result.len:
result[i] = stripAwait(result[i])
proc splitParams(param: NimNode, async: bool): NimNode =
expectKind(param, nnkIdentDefs)
result = param
if param[1].kind == nnkInfix and $param[1][0].ident in ["|", "or"]:
let firstType = param[1][1]
proc splitParamType(paramType: NimNode, async: bool): NimNode =
result = paramType
if paramType.kind == nnkInfix and $paramType[0].ident in ["|", "or"]:
let firstType = paramType[1]
let firstTypeName = $firstType.ident
let secondType = param[1][2]
let secondType = paramType[2]
let secondTypeName = $secondType.ident
# Make sure that at least one has the name `async`, otherwise we shouldn't
@@ -468,22 +475,21 @@ proc splitParams(param: NimNode, async: bool): NimNode =
if async:
if firstTypeName.normalize.startsWith("async"):
result = newIdentDefs(param[0], param[1][1])
result = paramType[1]
elif secondTypeName.normalize.startsWith("async"):
result = newIdentDefs(param[0], param[1][2])
result = paramType[2]
else:
if not firstTypeName.normalize.startsWith("async"):
result = newIdentDefs(param[0], param[1][1])
result = paramType[1]
elif not secondTypeName.normalize.startsWith("async"):
result = newIdentDefs(param[0], param[1][2])
result = paramType[2]
proc stripReturnType(returnType: NimNode): NimNode =
# Strip out the 'Future' from 'Future[T]'.
result = returnType
if returnType.kind == nnkBracketExpr:
let fut = repr(returnType[0])
if fut != "Future":
error("Expected return type of 'Future' got '" & fut & "'")
verifyReturnType(fut)
result = returnType[1]
proc splitProc(prc: NimNode): (NimNode, NimNode) =
@@ -491,15 +497,24 @@ proc splitProc(prc: NimNode): (NimNode, NimNode) =
## for example: proc (socket: Socket | AsyncSocket).
## It transforms them so that ``proc (socket: Socket)`` and
## ``proc (socket: AsyncSocket)`` are returned.
result[0] = prc.copyNimTree()
result[0][3][0] = stripReturnType(result[0][3][0])
# Retrieve the `T` inside `Future[T]`.
let returnType = stripReturnType(result[0][3][0])
result[0][3][0] = splitParamType(returnType, async=false)
for i in 1 .. <result[0][3].len:
result[0][3][i] = splitParams(result[0][3][i], false)
# Sync proc (0) -> FormalParams (3) -> IdentDefs, the parameter (i) ->
# parameter type (1).
result[0][3][i][1] = splitParamType(result[0][3][i][1], async=false)
result[0][6] = stripAwait(result[0][6])
result[1] = prc.copyNimTree()
if result[1][3][0].kind == nnkBracketExpr:
result[1][3][0][1] = splitParamType(result[1][3][0][1], async=true)
for i in 1 .. <result[1][3].len:
result[1][3][i] = splitParams(result[1][3][i], true)
# Async proc (1) -> FormalParams (3) -> IdentDefs, the parameter (i) ->
# parameter type (1).
result[1][3][i][1] = splitParamType(result[1][3][i][1], async=true)
macro multisync*(prc: untyped): untyped =
## Macro which processes async procedures into both asynchronous and
@@ -512,4 +527,4 @@ macro multisync*(prc: untyped): untyped =
let (sync, asyncPrc) = splitProc(prc)
result = newStmtList()
result.add(asyncSingleProc(asyncPrc))
result.add(sync)
result.add(sync)

View File

@@ -129,7 +129,7 @@ proc expandIfNeeded[T](deq: var Deque[T]) =
var cap = deq.mask + 1
if unlikely(deq.count >= cap):
var n = newSeq[T](cap * 2)
for i, x in deq: # don't use copyMem because the GC and because it's slower.
for i, x in pairs(deq): # don't use copyMem because the GC and because it's slower.
shallowCopy(n[i], x)
shallowCopy(deq.data, n)
deq.mask = cap * 2 - 1

View File

@@ -144,7 +144,7 @@ proc add*[T](q: var Queue[T], item: T) =
var cap = q.mask+1
if unlikely(q.count >= cap):
var n = newSeq[T](cap*2)
for i, x in q: # don't use copyMem because the GC and because it's slower.
for i, x in pairs(q): # don't use copyMem because the GC and because it's slower.
shallowCopy(n[i], x)
shallowCopy(q.data, n)
q.mask = cap*2 - 1

View File

@@ -84,6 +84,9 @@
## .. code-block:: Nim
## client.onProgressChanged = nil
##
## **Warning:** The ``total`` reported by httpclient may be 0 in some cases.
##
##
## SSL/TLS support
## ===============
## This requires the OpenSSL library, fortunately it's widely used and installed
@@ -117,20 +120,28 @@
## only basic authentication is supported at the moment.
import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes,
math, random, httpcore, times, tables
import asyncnet, asyncdispatch
math, random, httpcore, times, tables, streams
import asyncnet, asyncdispatch, asyncfile
import nativesockets
export httpcore except parseHeader # TODO: The ``except`` doesn't work
type
Response* = object
Response* = ref object
version*: string
status*: string
headers*: HttpHeaders
body*: string
body: string
bodyStream*: Stream
proc code*(response: Response): HttpCode
AsyncResponse* = ref object
version*: string
status*: string
headers*: HttpHeaders
body: string
bodyStream*: FutureStream[string]
proc code*(response: Response | AsyncResponse): HttpCode
{.raises: [ValueError, OverflowError].} =
## Retrieves the specified response's ``HttpCode``.
##
@@ -138,6 +149,27 @@ proc code*(response: Response): HttpCode
## corresponding ``HttpCode``.
return response.status[0 .. 2].parseInt.HttpCode
proc body*(response: Response): string =
## Retrieves the specified response's body.
##
## The response's body stream is read synchronously.
if response.body.isNil():
response.body = response.bodyStream.readAll()
return response.body
proc `body=`*(response: Response, value: string) {.deprecated.} =
## Setter for backward compatibility.
##
## **This is deprecated and should not be used**.
response.body = value
proc body*(response: AsyncResponse): Future[string] {.async.} =
## Reads the response's body and caches it. The read is performed only
## once.
if response.body.isNil:
response.body = await readAll(response.bodyStream)
return response.body
type
Proxy* = ref object
url*: Uri
@@ -249,6 +281,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in
result.add(buf)
proc parseResponse(s: Socket, getBody: bool, timeout: int): Response =
new result
var parsedStatus = false
var linei = 0
var fullyRead = false
@@ -604,7 +637,7 @@ proc post*(url: string, extraHeaders = "", body = "",
## **Deprecated since version 0.15.0**: use ``HttpClient.post`` instead.
let (mpHeaders, mpBody) = format(multipart)
template withNewLine(x): expr =
template withNewLine(x): untyped =
if x.len > 0 and not x.endsWith("\c\L"):
x & "\c\L"
else:
@@ -653,10 +686,13 @@ proc postContent*(url: string, extraHeaders = "", body = "",
proc downloadFile*(url: string, outputFilename: string,
sslContext: SSLContext = defaultSSLContext,
timeout = -1, userAgent = defUserAgent,
proxy: Proxy = nil) =
proxy: Proxy = nil) {.deprecated.} =
## | Downloads ``url`` and saves it to ``outputFilename``
## | An optional timeout can be specified in milliseconds, if reading from the
## server takes longer than specified an ETimeout exception will be raised.
##
## **Deprecated since version 0.16.2**: use ``HttpClient.downloadFile``
## instead.
var f: File
if open(f, outputFilename, fmWrite):
f.write(getContent(url, sslContext = sslContext, timeout = timeout,
@@ -735,6 +771,11 @@ type
contentProgress: BiggestInt
oneSecondProgress: BiggestInt
lastProgressReport: float
when SocketType is AsyncSocket:
bodyStream: FutureStream[string]
else:
bodyStream: Stream
getBody: bool ## When `false`, the body is never read in requestAux.
type
HttpClient* = HttpClientBase[Socket]
@@ -764,6 +805,8 @@ proc newHttpClient*(userAgent = defUserAgent,
result.proxy = proxy
result.timeout = timeout
result.onProgressChanged = nil
result.bodyStream = newStringStream()
result.getBody = true
when defined(ssl):
result.sslContext = sslContext
@@ -794,6 +837,8 @@ proc newAsyncHttpClient*(userAgent = defUserAgent,
result.proxy = proxy
result.timeout = -1 # TODO
result.onProgressChanged = nil
result.bodyStream = newFutureStream[string]("newAsyncHttpClient")
result.getBody = true
when defined(ssl):
result.sslContext = sslContext
@@ -815,14 +860,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient,
client.oneSecondProgress = 0
client.lastProgressReport = epochTime()
proc recvFull(client: HttpClient | AsyncHttpClient,
size: int, timeout: int): Future[string] {.multisync.} =
proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int,
keep: bool): Future[int] {.multisync.} =
## Ensures that all the data requested is read and returned.
result = ""
var readLen = 0
while true:
if size == result.len: break
if size == readLen: break
let remainingSize = size - result.len
let remainingSize = size - readLen
let sizeToRecv = min(remainingSize, net.BufferSize)
when client.socket is Socket:
@@ -830,13 +875,17 @@ proc recvFull(client: HttpClient | AsyncHttpClient,
else:
let data = await client.socket.recv(sizeToRecv)
if data == "": break # We've been disconnected.
result.add data
readLen.inc(data.len)
if keep:
await client.bodyStream.write(data)
await reportProgress(client, data.len)
proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
return readLen
proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void]
{.multisync.} =
result = ""
while true:
var chunkSize = 0
var chunkSizeStr = await client.socket.recvLine()
@@ -861,25 +910,27 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
httpError("Invalid chunk size: " & chunkSizeStr)
inc(i)
if chunkSize <= 0:
discard await recvFull(client, 2, client.timeout) # Skip \c\L
discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
break
result.add await recvFull(client, chunkSize, client.timeout)
discard await recvFull(client, 2, client.timeout) # Skip \c\L
discard await recvFull(client, chunkSize, client.timeout, true)
discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
# Trailer headers will only be sent if the request specifies that we want
# them: http://tools.ietf.org/html/rfc2616#section-3.6.1
proc parseBody(client: HttpClient | AsyncHttpClient,
headers: HttpHeaders,
httpVersion: string): Future[string] {.multisync.} =
result = ""
httpVersion: string): Future[void] {.multisync.} =
# Reset progress from previous requests.
client.contentTotal = 0
client.contentProgress = 0
client.oneSecondProgress = 0
client.lastProgressReport = 0
when client is AsyncHttpClient:
assert(not client.bodyStream.finished)
if headers.getOrDefault"Transfer-Encoding" == "chunked":
result = await parseChunks(client)
await parseChunks(client)
else:
# -REGION- Content-Length
# (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3
@@ -888,26 +939,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient,
var length = contentLengthHeader.parseint()
client.contentTotal = length
if length > 0:
result = await client.recvFull(length, client.timeout)
if result == "":
let recvLen = await client.recvFull(length, client.timeout, true)
if recvLen == 0:
httpError("Got disconnected while trying to read body.")
if result.len != length:
if recvLen != length:
httpError("Received length doesn't match expected length. Wanted " &
$length & " got " & $result.len)
$length & " got " & $recvLen)
else:
# (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO
# -REGION- Connection: Close
# (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5
if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0":
var buf = ""
while true:
buf = await client.recvFull(4000, client.timeout)
if buf == "": break
result.add(buf)
let recvLen = await client.recvFull(4000, client.timeout, true)
if recvLen == 0: break
when client is AsyncHttpClient:
client.bodyStream.complete()
else:
client.bodyStream.setPosition(0)
proc parseResponse(client: HttpClient | AsyncHttpClient,
getBody: bool): Future[Response] {.multisync.} =
getBody: bool): Future[Response | AsyncResponse]
{.multisync.} =
new result
var parsedStatus = false
var linei = 0
var fullyRead = false
@@ -955,10 +1011,14 @@ proc parseResponse(client: HttpClient | AsyncHttpClient,
if not fullyRead:
httpError("Connection was closed before full request has been made")
if getBody:
result.body = await parseBody(client, result.headers, result.version)
else:
result.body = ""
when client is HttpClient:
client.bodyStream = newStringStream()
else:
client.bodyStream = newFutureStream[string]("parseResponse")
await parseBody(client, result.headers, result.version)
result.bodyStream = client.bodyStream
proc newConnection(client: HttpClient | AsyncHttpClient,
url: Uri) {.multisync.} =
@@ -1006,8 +1066,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders =
result[k] = vs
proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
httpMethod: string, body = "",
headers: HttpHeaders = nil): Future[Response] {.multisync.} =
httpMethod: string, body = "",
headers: HttpHeaders = nil): Future[Response | AsyncResponse]
{.multisync.} =
# Helper that actually makes the request. Does not handle redirects.
let connectionUrl =
if client.proxy.isNil: parseUri(url) else: client.proxy.url
@@ -1047,16 +1108,17 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
if body != "":
await client.socket.send(body)
result = await parseResponse(client,
httpMethod.toLower() notin ["head", "connect"])
let getBody = httpMethod.toLowerAscii() notin ["head", "connect"] and
client.getBody
result = await parseResponse(client, getBody)
# Restore the clients proxy in case it was overwritten.
client.proxy = savedProxy
proc request*(client: HttpClient | AsyncHttpClient, url: string,
httpMethod: string, body = "",
headers: HttpHeaders = nil): Future[Response] {.multisync.} =
headers: HttpHeaders = nil): Future[Response | AsyncResponse]
{.multisync.} =
## Connects to the hostname specified by the URL and performs a request
## using the custom method string specified by ``httpMethod``.
##
@@ -1078,7 +1140,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
proc request*(client: HttpClient | AsyncHttpClient, url: string,
httpMethod = HttpGET, body = "",
headers: HttpHeaders = nil): Future[Response] {.multisync.} =
headers: HttpHeaders = nil): Future[Response | AsyncResponse]
{.multisync.} =
## Connects to the hostname specified by the URL and performs a request
## using the method specified.
##
@@ -1088,11 +1151,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
##
## When a request is made to a different hostname, the current connection will
## be closed.
result = await request(client, url, $httpMethod, body,
headers = headers)
result = await request(client, url, $httpMethod, body, headers)
proc get*(client: HttpClient | AsyncHttpClient,
url: string): Future[Response] {.multisync.} =
url: string): Future[Response | AsyncResponse] {.multisync.} =
## Connects to the hostname specified by the URL and performs a GET request.
##
## This procedure will follow redirects up to a maximum number of redirects
@@ -1112,17 +1174,18 @@ proc getContent*(client: HttpClient | AsyncHttpClient,
if resp.code.is4xx or resp.code.is5xx:
raise newException(HttpRequestError, resp.status)
else:
return resp.body
return await resp.bodyStream.readAll()
proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "",
multipart: MultipartData = nil): Future[Response] {.multisync.} =
multipart: MultipartData = nil): Future[Response | AsyncResponse]
{.multisync.} =
## Connects to the hostname specified by the URL and performs a POST request.
##
## This procedure will follow redirects up to a maximum number of redirects
## specified in ``client.maxRedirects``.
let (mpHeader, mpBody) = format(multipart)
template withNewLine(x): expr =
# TODO: Support FutureStream for `body` parameter.
template withNewLine(x): untyped =
if x.len > 0 and not x.endsWith("\c\L"):
x & "\c\L"
else:
@@ -1134,16 +1197,14 @@ proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "",
headers["Content-Type"] = mpHeader.split(": ")[1]
headers["Content-Length"] = $len(xb)
result = await client.requestAux(url, $HttpPOST, xb,
headers = headers)
result = await client.requestAux(url, $HttpPOST, xb, headers)
# Handle redirects.
var lastURL = url
for i in 1..client.maxRedirects:
if result.status.redirection():
let redirectTo = getNewLocation(lastURL, result.headers)
var meth = if result.status != "307": HttpGet else: HttpPost
result = await client.requestAux(redirectTo, $meth, xb,
headers = headers)
result = await client.requestAux(redirectTo, $meth, xb, headers)
lastURL = redirectTo
proc postContent*(client: HttpClient | AsyncHttpClient, url: string,
@@ -1161,4 +1222,30 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string,
if resp.code.is4xx or resp.code.is5xx:
raise newException(HttpRequestError, resp.status)
else:
return resp.body
return await resp.bodyStream.readAll()
proc downloadFile*(client: HttpClient | AsyncHttpClient,
url: string, filename: string): Future[void] {.multisync.} =
## Downloads ``url`` and saves it to ``filename``.
client.getBody = false
let resp = await client.get(url)
when client is HttpClient:
client.bodyStream = newFileStream(filename, fmWrite)
if client.bodyStream.isNil:
fileError("Unable to open file")
parseBody(client, resp.headers, resp.version)
client.bodyStream.close()
else:
client.bodyStream = newFutureStream[string]("downloadFile")
var file = openAsync(filename, fmWrite)
# Let `parseBody` write response data into client.bodyStream in the
# background.
asyncCheck parseBody(client, resp.headers, resp.version)
# The `writeFromStream` proc will complete once all the data in the
# `bodyStream` has been written to the file.
await file.writeFromStream(client.bodyStream)
file.close()
if resp.code.is4xx or resp.code.is5xx:
raise newException(HttpRequestError, resp.status)

View File

@@ -16,6 +16,12 @@ type
FutureVar*[T] = distinct Future[T]
FutureStream*[T] = ref object of FutureBase ## Special future that acts as
## a queue. Its API is still
## experimental and so is
## subject to change.
queue: Deque[T]
FutureError* = object of Exception
cause*: FutureBase
@@ -26,11 +32,7 @@ when not defined(release):
proc callSoon*(cbproc: proc ()) {.gcsafe.}
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
template setupFutureBase(fromProc: string) =
new(result)
result.finished = false
when not defined(release):
@@ -39,6 +41,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
result.fromProc = fromProc
currentID.inc()
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
setupFutureBase(fromProc)
proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## Create a new ``FutureVar``. This Future type is ideally suited for
## situations where you want to avoid unnecessary allocations of Futures.
@@ -47,6 +56,22 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new ``FutureStream``. This future's callback is activated when
## two events occur:
##
## * New data is written into the future stream.
## * The future stream is completed (this means that no more data will be
## written).
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
##
## **Note:** The API of FutureStream is still new and so has a higher
## likelihood of changing in the future.
setupFutureBase(fromProc)
result.queue = initDeque[T]()
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false
@@ -107,12 +132,18 @@ proc complete*[T](future: FutureVar[T], val: T) =
## Any previously stored value will be overwritten.
template fut: untyped = Future[T](future)
checkFinished(fut)
assert(fut.error == nil)
assert(fut.error.isNil())
fut.finished = true
fut.value = val
if fut.cb != nil:
if not fut.cb.isNil():
fut.cb()
proc complete*[T](future: FutureStream[T]) =
## Completes a ``FutureStream`` signalling the end of data.
future.finished = true
if not future.cb.isNil():
future.cb()
proc fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -149,6 +180,20 @@ proc `callback=`*[T](future: Future[T],
## If future has already completed then ``cb`` will be called immediately.
future.callback = proc () = cb(future)
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
## future stream.
##
## The callback is also called when the future is completed. So you should
## use ``finished`` to check whether data is available.
##
## If the future stream already has data or is finished then ``cb`` will be
## called immediately.
future.cb = proc () = cb(future)
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc injectStacktrace[T](future: Future[T]) =
# TODO: Come up with something better.
when not defined(release):
@@ -195,12 +240,18 @@ proc mget*[T](future: FutureVar[T]): var T =
## Future has not been finished.
result = Future[T](future).value
proc finished*[T](future: Future[T] | FutureVar[T]): bool =
proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
##
## For a ``FutureStream`` a ``true`` value means that no more data will be
## placed inside the stream _and_ that there is no data waiting to be
## retrieved.
when future is FutureVar[T]:
result = (Future[T](future)).finished
elif future is FutureStream[T]:
result = future.finished and future.queue.len == 0
else:
result = future.finished
@@ -208,6 +259,57 @@ proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise ``ValueError`` if ``future`` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Implement limiting of the streams storage to prevent it growing
# infinitely when no reads are occuring.
future.queue.addLast(value)
if not future.cb.isNil: future.cb()
result.complete()
proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
## Returns a future that will complete when the ``FutureStream`` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, ``false`` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## ``FutureStream``.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
future.callback =
proc (fs: FutureStream[T]) =
# We don't want this callback called again.
future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()
if not resFut.finished:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil: savedCb()
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len
proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.

View File

@@ -9,7 +9,7 @@
include "system/inclrtl"
import os, oids, tables, strutils, times, heapqueue, lists
import os, oids, tables, strutils, times, heapqueue, lists, queues
import nativesockets, net, deques
@@ -1664,6 +1664,17 @@ proc accept*(socket: AsyncFD,
# -- Await Macro
include asyncmacro
proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
## Returns a future that will complete when all the string data from the
## specified future stream is retrieved.
result = ""
while true:
let (hasValue, value) = await future.read()
if hasValue:
result.add(value)
else:
break
proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.

View File

@@ -0,0 +1,53 @@
discard """
file: "tfuturestream.nim"
exitcode: 0
output: '''
0
1
2
3
4
5
Done
Finished
'''
"""
import asyncdispatch
var fs = newFutureStream[int]()
proc alpha() {.async.} =
for i in 0 .. 5:
await sleepAsync(1000)
await fs.write(i)
echo("Done")
fs.complete()
proc beta() {.async.} =
while not fs.finished:
let (hasValue, value) = await fs.read()
if hasValue:
echo(value)
echo("Finished")
asyncCheck alpha()
waitFor beta()
# TODO: Something like this should work eventually.
# proc delta(): FutureStream[string] {.async.} =
# for i in 0 .. 5:
# await sleepAsync(1000)
# result.put($i)
# return ""
# proc omega() {.async.} =
# let fut = delta()
# while not fut.finished():
# echo(await fs.takeAsync())
# echo("Finished")
# waitFor omega()

View File

@@ -13,7 +13,9 @@ proc asyncTest() {.async.} =
var client = newAsyncHttpClient()
var resp = await client.request("http://example.com/")
doAssert(resp.code.is2xx)
doAssert("<title>Example Domain</title>" in resp.body)
var body = await resp.body
body = await resp.body # Test caching
doAssert("<title>Example Domain</title>" in body)
resp = await client.request("http://example.com/404")
doAssert(resp.code.is4xx)
@@ -47,7 +49,8 @@ proc asyncTest() {.async.} =
echo("Downloaded ", progress, " of ", total)
echo("Current rate: ", speed div 1000, "kb/s")
client.onProgressChanged = onProgressChanged
discard await client.getContent("http://speedtest-ams2.digitalocean.com/100mb.test")
await client.downloadFile("http://speedtest-ams2.digitalocean.com/100mb.test",
"100mb.test")
client.close()
@@ -94,7 +97,8 @@ proc syncTest() =
echo("Downloaded ", progress, " of ", total)
echo("Current rate: ", speed div 1000, "kb/s")
client.onProgressChanged = onProgressChanged
discard client.getContent("http://speedtest-ams2.digitalocean.com/100mb.test")
client.downloadFile("http://speedtest-ams2.digitalocean.com/100mb.test",
"100mb.test")
client.close()

View File

@@ -36,10 +36,7 @@ proc download(pkg: string; c: Controls) {.async.} =
c.bar.value = clamp(int(progress*100 div total), 0, 100)
client.onProgressChanged = onProgressChanged
# XXX give a destination filename instead
let contents = await client.getContent("https://nim-lang.org/download/" & pkg & ".7z")
# XXX make this async somehow:
writeFile(z, contents)
await client.downloadFile("https://nim-lang.org/download/" & pkg & ".7z", z)
c.bar.value = 100
let p = osproc.startProcess("7zG.exe", getCurrentDir() / r"..\dist",
["x", pkg & ".7z"])