mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-29 17:34:43 +00:00
Introduce FutureVar[T] to make recvLineInto safer.
FutureVar[T] is a new distinct Future type which is designed to be used for situations where the highest performance is needed. It reduces the number of Future allocations needed. It acts as a replacement for 'var' params in async procs. This commit modifies @def-'s PR in order to make it safer. The recvLineInto procedure has been modified to take a ``FutureVar[string]`` param instead of a ``ptr string`` param.
This commit is contained in:
@@ -145,6 +145,8 @@ type
|
||||
Future*[T] = ref object of FutureBase ## Typed future.
|
||||
value: T ## Stored value
|
||||
|
||||
FutureVar*[T] = distinct Future[T]
|
||||
|
||||
{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
|
||||
|
||||
|
||||
@@ -162,6 +164,19 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
|
||||
result.fromProc = fromProc
|
||||
currentID.inc()
|
||||
|
||||
proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
|
||||
## Create a new ``FutureVar``. This Future type is ideally suited for
|
||||
## situations where you want to avoid unnecessary allocations of Futures.
|
||||
##
|
||||
## Specifying ``fromProc``, which is a string specifying the name of the proc
|
||||
## that this future belongs to, is a good habit as it helps with debugging.
|
||||
result = FutureVar[T](newFuture[T](fromProc))
|
||||
|
||||
proc clean*[T](future: FutureVar[T]) =
|
||||
## Resets the ``finished`` status of ``future``.
|
||||
Future[T](future).finished = false
|
||||
Future[T](future).error = nil
|
||||
|
||||
proc checkFinished[T](future: Future[T]) =
|
||||
when not defined(release):
|
||||
if future.finished:
|
||||
@@ -194,6 +209,15 @@ proc complete*(future: Future[void]) =
|
||||
if future.cb != nil:
|
||||
future.cb()
|
||||
|
||||
proc complete*[T](future: FutureVar[T]) =
|
||||
## Completes a ``FutureVar``.
|
||||
template fut: expr = Future[T](future)
|
||||
checkFinished(fut)
|
||||
assert(fut.error == nil)
|
||||
fut.finished = true
|
||||
if fut.cb != nil:
|
||||
fut.cb()
|
||||
|
||||
proc fail*[T](future: Future[T], error: ref Exception) =
|
||||
## Completes ``future`` with ``error``.
|
||||
#assert(not future.finished, "Future already finished, cannot finish twice.")
|
||||
@@ -264,6 +288,13 @@ proc readError*[T](future: Future[T]): ref Exception =
|
||||
else:
|
||||
raise newException(ValueError, "No error in future.")
|
||||
|
||||
proc mget*[T](future: FutureVar[T]): var T =
|
||||
## Returns a mutable value stored in ``future``.
|
||||
##
|
||||
## Unlike ``read``, this function will not raise an exception if the
|
||||
## Future has not been finished.
|
||||
result = Future[T](future).value
|
||||
|
||||
proc finished*[T](future: Future[T]): bool =
|
||||
## Determines whether ``future`` has completed.
|
||||
##
|
||||
|
||||
@@ -148,7 +148,8 @@ proc processClient(client: AsyncSocket, address: string,
|
||||
var request: Request
|
||||
request.url = initUri()
|
||||
request.headers = newStringTable(modeCaseInsensitive)
|
||||
var line = newStringOfCap(80)
|
||||
var lineFut = newFutureVar[string]("asynchttpserver.processClient")
|
||||
lineFut.mget() = newStringOfCap(80)
|
||||
var key, value = ""
|
||||
|
||||
while not client.isClosed:
|
||||
@@ -161,14 +162,15 @@ proc processClient(client: AsyncSocket, address: string,
|
||||
request.client = client
|
||||
|
||||
# First line - GET /path HTTP/1.1
|
||||
line.setLen(0)
|
||||
await client.recvLineInto(addr line) # TODO: Timeouts.
|
||||
if line == "":
|
||||
lineFut.mget().setLen(0)
|
||||
lineFut.clean()
|
||||
await client.recvLineInto(lineFut) # TODO: Timeouts.
|
||||
if lineFut.mget == "":
|
||||
client.close()
|
||||
return
|
||||
|
||||
var i = 0
|
||||
for linePart in line.split(' '):
|
||||
for linePart in lineFut.mget.split(' '):
|
||||
case i
|
||||
of 0: request.reqMethod.shallowCopy(linePart.normalize)
|
||||
of 1: parseUri(linePart, request.url)
|
||||
@@ -180,20 +182,21 @@ proc processClient(client: AsyncSocket, address: string,
|
||||
"Invalid request protocol. Got: " & linePart)
|
||||
continue
|
||||
else:
|
||||
await request.respond(Http400, "Invalid request. Got: " & line)
|
||||
await request.respond(Http400, "Invalid request. Got: " & lineFut.mget)
|
||||
continue
|
||||
inc i
|
||||
|
||||
# Headers
|
||||
while true:
|
||||
i = 0
|
||||
line.setLen(0)
|
||||
await client.recvLineInto(addr line)
|
||||
lineFut.mget.setLen(0)
|
||||
lineFut.clean()
|
||||
await client.recvLineInto(lineFut)
|
||||
|
||||
if line == "":
|
||||
if lineFut.mget == "":
|
||||
client.close(); return
|
||||
if line == "\c\L": break
|
||||
let (key, value) = parseHeader(line)
|
||||
if lineFut.mget == "\c\L": break
|
||||
let (key, value) = parseHeader(lineFut.mget)
|
||||
request.headers[key] = value
|
||||
|
||||
if request.reqMethod == "post":
|
||||
|
||||
@@ -419,10 +419,13 @@ proc accept*(socket: AsyncSocket,
|
||||
retFut.complete(future.read.client)
|
||||
return retFut
|
||||
|
||||
proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
|
||||
proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
|
||||
flags = {SocketFlag.SafeDisconn}) {.async.} =
|
||||
## Reads a line of data from ``socket`` into ``resString``.
|
||||
##
|
||||
## The ``resString`` future and the string value contained within must both
|
||||
## be initialised.
|
||||
##
|
||||
## If a full line is read ``\r\L`` is not
|
||||
## added to ``line``, however if solely ``\r\L`` is read then ``line``
|
||||
## will be set to it.
|
||||
@@ -438,16 +441,23 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
|
||||
## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
|
||||
## protocol uses ``\r\L`` to delimit a new line.
|
||||
assert SocketFlag.Peek notin flags ## TODO:
|
||||
assert(not resString.mget.isNil(),
|
||||
"String inside resString future needs to be initialised")
|
||||
result = newFuture[void]("asyncnet.recvLineInto")
|
||||
|
||||
# TODO: Make the async transformation check for FutureVar params and complete
|
||||
# them when the result future is completed.
|
||||
# Can we replace the result future with the FutureVar?
|
||||
|
||||
template addNLIfEmpty(): stmt =
|
||||
if resString[].len == 0:
|
||||
resString[].add("\c\L")
|
||||
if resString.mget.len == 0:
|
||||
resString.mget.add("\c\L")
|
||||
|
||||
if socket.isBuffered:
|
||||
if socket.bufLen == 0:
|
||||
let res = socket.readIntoBuf(flags)
|
||||
if res == 0:
|
||||
resString.complete()
|
||||
return
|
||||
|
||||
var lastR = false
|
||||
@@ -455,7 +465,8 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
|
||||
if socket.currPos >= socket.bufLen:
|
||||
let res = socket.readIntoBuf(flags)
|
||||
if res == 0:
|
||||
resString[].setLen(0)
|
||||
resString.mget().setLen(0)
|
||||
resString.complete()
|
||||
return
|
||||
|
||||
case socket.buffer[socket.currPos]
|
||||
@@ -465,13 +476,15 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
|
||||
of '\L':
|
||||
addNLIfEmpty()
|
||||
socket.currPos.inc()
|
||||
resString.complete()
|
||||
return
|
||||
else:
|
||||
if lastR:
|
||||
socket.currPos.inc()
|
||||
resString.complete()
|
||||
return
|
||||
else:
|
||||
resString[].add socket.buffer[socket.currPos]
|
||||
resString.mget.add socket.buffer[socket.currPos]
|
||||
socket.currPos.inc()
|
||||
else:
|
||||
var c = ""
|
||||
@@ -479,18 +492,23 @@ proc recvLineInto*(socket: AsyncSocket, resString: ptr string,
|
||||
let recvFut = recv(socket, 1, flags)
|
||||
c = recvFut.read()
|
||||
if c.len == 0:
|
||||
resString[].setLen(0)
|
||||
resString.mget.setLen(0)
|
||||
resString.complete()
|
||||
return
|
||||
if c == "\r":
|
||||
let recvFut = recv(socket, 1, flags) # Skip \L
|
||||
c = recvFut.read()
|
||||
assert c == "\L"
|
||||
addNLIfEmpty()
|
||||
resString.complete()
|
||||
return
|
||||
elif c == "\L":
|
||||
addNLIfEmpty()
|
||||
resString.complete()
|
||||
return
|
||||
resString[].add c
|
||||
resString.mget.add c
|
||||
|
||||
resString.complete()
|
||||
|
||||
proc recvLine*(socket: AsyncSocket,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
|
||||
@@ -516,8 +534,10 @@ proc recvLine*(socket: AsyncSocket,
|
||||
result.add("\c\L")
|
||||
assert SocketFlag.Peek notin flags ## TODO:
|
||||
|
||||
result = ""
|
||||
await socket.recvLineInto(addr result, flags)
|
||||
# TODO: Optimise this.
|
||||
var resString = newFutureVar[string]("asyncnet.recvLine")
|
||||
await socket.recvLineInto(resString, flags)
|
||||
result = resString.mget()
|
||||
|
||||
proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
|
||||
## Marks ``socket`` as accepting connections.
|
||||
|
||||
Reference in New Issue
Block a user