Introduce FutureVar[T] to make recvLineInto safer.

FutureVar[T] is a new distinct Future type which is designed to be used
for situations where the highest performance is needed. It reduces the
number of Future allocations needed. It acts as a replacement for
'var' params in async procs.

This commit modifies @def-'s PR in order to make it safer. The recvLineInto
procedure has been modified to take a ``FutureVar[string]`` param instead of a
``ptr string`` param.
This commit is contained in:
Dominik Picheta
2015-04-24 17:56:04 +01:00
parent 62e1b3e2e3
commit 72b4912c84
3 changed files with 74 additions and 20 deletions

View File

@@ -145,6 +145,8 @@ type
Future*[T] = ref object of FutureBase ## Typed future.
value: T ## Stored value
FutureVar*[T] = distinct Future[T]
{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
@@ -162,6 +164,19 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
result.fromProc = fromProc
currentID.inc()
proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## Create a new ``FutureVar``. This Future type is ideally suited for
## situations where you want to avoid unnecessary allocations of Futures.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false
Future[T](future).error = nil
proc checkFinished[T](future: Future[T]) =
when not defined(release):
if future.finished:
@@ -194,6 +209,15 @@ proc complete*(future: Future[void]) =
if future.cb != nil:
future.cb()
proc complete*[T](future: FutureVar[T]) =
## Completes a ``FutureVar``.
template fut: expr = Future[T](future)
checkFinished(fut)
assert(fut.error == nil)
fut.finished = true
if fut.cb != nil:
fut.cb()
proc fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -264,6 +288,13 @@ proc readError*[T](future: Future[T]): ref Exception =
else:
raise newException(ValueError, "No error in future.")
proc mget*[T](future: FutureVar[T]): var T =
## Returns a mutable value stored in ``future``.
##
## Unlike ``read``, this function will not raise an exception if the
## Future has not been finished.
result = Future[T](future).value
proc finished*[T](future: Future[T]): bool =
## Determines whether ``future`` has completed.
##

View File

@@ -148,7 +148,8 @@ proc processClient(client: AsyncSocket, address: string,
var request: Request
request.url = initUri()
request.headers = newStringTable(modeCaseInsensitive)
var line = newStringOfCap(80)
var lineFut = newFutureVar[string]("asynchttpserver.processClient")
lineFut.mget() = newStringOfCap(80)
var key, value = ""
while not client.isClosed:
@@ -161,14 +162,15 @@ proc processClient(client: AsyncSocket, address: string,
request.client = client
# First line - GET /path HTTP/1.1
line.setLen(0)
await client.recvLineInto(addr line) # TODO: Timeouts.
if line == "":
lineFut.mget().setLen(0)
lineFut.clean()
await client.recvLineInto(lineFut) # TODO: Timeouts.
if lineFut.mget == "":
client.close()
return
var i = 0
for linePart in line.split(' '):
for linePart in lineFut.mget.split(' '):
case i
of 0: request.reqMethod.shallowCopy(linePart.normalize)
of 1: parseUri(linePart, request.url)
@@ -180,20 +182,21 @@ proc processClient(client: AsyncSocket, address: string,
"Invalid request protocol. Got: " & linePart)
continue
else:
await request.respond(Http400, "Invalid request. Got: " & line)
await request.respond(Http400, "Invalid request. Got: " & lineFut.mget)
continue
inc i
# Headers
while true:
i = 0
line.setLen(0)
await client.recvLineInto(addr line)
lineFut.mget.setLen(0)
lineFut.clean()
await client.recvLineInto(lineFut)
if line == "":
if lineFut.mget == "":
client.close(); return
if line == "\c\L": break
let (key, value) = parseHeader(line)
if lineFut.mget == "\c\L": break
let (key, value) = parseHeader(lineFut.mget)
request.headers[key] = value
if request.reqMethod == "post":

View File

@@ -419,10 +419,13 @@ proc accept*(socket: AsyncSocket,
retFut.complete(future.read.client)
return retFut
proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
flags = {SocketFlag.SafeDisconn}) {.async.} =
## Reads a line of data from ``socket`` into ``resString``.
##
## The ``resString`` future and the string value contained within must both
## be initialised.
##
## If a full line is read ``\r\L`` is not
## added to ``line``, however if solely ``\r\L`` is read then ``line``
## will be set to it.
@@ -438,16 +441,23 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
## protocol uses ``\r\L`` to delimit a new line.
assert SocketFlag.Peek notin flags ## TODO:
assert(not resString.mget.isNil(),
"String inside resString future needs to be initialised")
result = newFuture[void]("asyncnet.recvLineInto")
# TODO: Make the async transformation check for FutureVar params and complete
# them when the result future is completed.
# Can we replace the result future with the FutureVar?
template addNLIfEmpty(): stmt =
if resString[].len == 0:
resString[].add("\c\L")
if resString.mget.len == 0:
resString.mget.add("\c\L")
if socket.isBuffered:
if socket.bufLen == 0:
let res = socket.readIntoBuf(flags)
if res == 0:
resString.complete()
return
var lastR = false
@@ -455,7 +465,8 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
if socket.currPos >= socket.bufLen:
let res = socket.readIntoBuf(flags)
if res == 0:
resString[].setLen(0)
resString.mget().setLen(0)
resString.complete()
return
case socket.buffer[socket.currPos]
@@ -465,13 +476,15 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
of '\L':
addNLIfEmpty()
socket.currPos.inc()
resString.complete()
return
else:
if lastR:
socket.currPos.inc()
resString.complete()
return
else:
resString[].add socket.buffer[socket.currPos]
resString.mget.add socket.buffer[socket.currPos]
socket.currPos.inc()
else:
var c = ""
@@ -479,18 +492,23 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
let recvFut = recv(socket, 1, flags)
c = recvFut.read()
if c.len == 0:
resString[].setLen(0)
resString.mget.setLen(0)
resString.complete()
return
if c == "\r":
let recvFut = recv(socket, 1, flags) # Skip \L
c = recvFut.read()
assert c == "\L"
addNLIfEmpty()
resString.complete()
return
elif c == "\L":
addNLIfEmpty()
resString.complete()
return
resString[].add c
resString.mget.add c
resString.complete()
proc recvLine*(socket: AsyncSocket,
flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
@@ -516,8 +534,10 @@ proc recvLine*(socket: AsyncSocket,
result.add("\c\L")
assert SocketFlag.Peek notin flags ## TODO:
result = ""
await socket.recvLineInto(addr result, flags)
# TODO: Optimise this.
var resString = newFutureVar[string]("asyncnet.recvLine")
await socket.recvLineInto(resString, flags)
result = resString.mget()
proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
## Marks ``socket`` as accepting connections.