mirror of
https://github.com/nim-lang/Nim.git
synced 2026-02-15 07:43:26 +00:00
Make asyncnet usable when avoiding allocations.
- readInto, readIntoBuf, are templates instead of procs now - New recvLineInto template that reads directly into a string instead of creating a new one. Used by recvLine proc now - Need fd and bufLen fields of AsyncSocketDesc exported because of the templates - recv returns a shallow string to prevent copying - This gives significant speedups, mostly by using templates instead of creating new Futures and waiting for them all the time.
This commit is contained in:
@@ -69,13 +69,13 @@ type
|
||||
# TODO: I would prefer to just do:
|
||||
# AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
|
||||
AsyncSocketDesc = object
|
||||
fd: SocketHandle
|
||||
fd*: SocketHandle
|
||||
closed: bool ## determines whether this socket has been closed
|
||||
case isBuffered: bool ## determines whether this socket is buffered.
|
||||
of true:
|
||||
buffer: array[0..BufferSize, char]
|
||||
currPos: int # current index in buffer
|
||||
bufLen: int # current length of buffer
|
||||
bufLen*: int # current length of buffer
|
||||
of false: nil
|
||||
case isSsl: bool
|
||||
of true:
|
||||
@@ -182,26 +182,30 @@ proc connect*(socket: AsyncSocket, address: string, port: Port,
|
||||
sslSetConnectState(socket.sslHandle)
|
||||
sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
|
||||
|
||||
proc readInto(buf: cstring, size: int, socket: AsyncSocket,
|
||||
flags: set[SocketFlag]): Future[int] {.async.} =
|
||||
template readInto*(buf: cstring, size: int, socket: AsyncSocket,
|
||||
flags: set[SocketFlag]): int =
|
||||
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that
|
||||
## this is a template and not a proc.
|
||||
var res = 0
|
||||
if socket.isSsl:
|
||||
when defined(ssl):
|
||||
# SSL mode.
|
||||
sslLoop(socket, flags,
|
||||
sslRead(socket.sslHandle, buf, size.cint))
|
||||
result = opResult
|
||||
res = opResult
|
||||
else:
|
||||
var data = await recv(socket.fd.TAsyncFD, size, flags)
|
||||
if data.len != 0:
|
||||
copyMem(buf, addr data[0], data.len)
|
||||
var recvIntoFut = recvInto(socket.fd.TAsyncFD, buf, size, flags)
|
||||
yield recvIntoFut
|
||||
# Not in SSL mode.
|
||||
result = data.len
|
||||
res = recvIntoFut.read()
|
||||
res
|
||||
|
||||
proc readIntoBuf(socket: AsyncSocket,
|
||||
flags: set[SocketFlag]): Future[int] {.async.} =
|
||||
result = await readInto(addr socket.buffer[0], BufferSize, socket, flags)
|
||||
template readIntoBuf(socket: AsyncSocket,
|
||||
flags: set[SocketFlag]): int =
|
||||
var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
|
||||
socket.currPos = 0
|
||||
socket.bufLen = result
|
||||
socket.bufLen = size
|
||||
size
|
||||
|
||||
proc recv*(socket: AsyncSocket, size: int,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
|
||||
@@ -222,10 +226,11 @@ proc recv*(socket: AsyncSocket, size: int,
|
||||
## to be read then the future will complete with a value of ``""``.
|
||||
if socket.isBuffered:
|
||||
result = newString(size)
|
||||
shallow(result)
|
||||
let originalBufPos = socket.currPos
|
||||
|
||||
if socket.bufLen == 0:
|
||||
let res = await socket.readIntoBuf(flags - {SocketFlag.Peek})
|
||||
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
|
||||
if res == 0:
|
||||
result.setLen(0)
|
||||
return
|
||||
@@ -236,7 +241,7 @@ proc recv*(socket: AsyncSocket, size: int,
|
||||
if SocketFlag.Peek in flags:
|
||||
# We don't want to get another buffer if we're peeking.
|
||||
break
|
||||
let res = await socket.readIntoBuf(flags - {SocketFlag.Peek})
|
||||
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
|
||||
if res == 0:
|
||||
break
|
||||
|
||||
@@ -251,7 +256,7 @@ proc recv*(socket: AsyncSocket, size: int,
|
||||
result.setLen(read)
|
||||
else:
|
||||
result = newString(size)
|
||||
let read = await readInto(addr result[0], size, socket, flags)
|
||||
let read = readInto(addr result[0], size, socket, flags)
|
||||
result.setLen(read)
|
||||
|
||||
proc send*(socket: AsyncSocket, data: string,
|
||||
@@ -302,6 +307,81 @@ proc accept*(socket: AsyncSocket,
|
||||
retFut.complete(future.read.client)
|
||||
return retFut
|
||||
|
||||
template recvLineInto*(socket: AsyncSocket, resString: var string,
|
||||
flags = {SocketFlag.SafeDisconn}) =
|
||||
## Reads a line of data from ``socket`` into ``resString``.
|
||||
##
|
||||
## If a full line is read ``\r\L`` is not
|
||||
## added to ``line``, however if solely ``\r\L`` is read then ``line``
|
||||
## will be set to it.
|
||||
##
|
||||
## If the socket is disconnected, ``line`` will be set to ``""``.
|
||||
##
|
||||
## If the socket is disconnected in the middle of a line (before ``\r\L``
|
||||
## is read) then line will be set to ``""``.
|
||||
## The partial line **will be lost**.
|
||||
##
|
||||
## **Warning**: The ``Peek`` flag is not yet implemented.
|
||||
##
|
||||
## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
|
||||
## protocol uses ``\r\L`` to delimit a new line.
|
||||
assert SocketFlag.Peek notin flags ## TODO:
|
||||
|
||||
template addNLIfEmpty(): stmt =
|
||||
if resString.len == 0:
|
||||
resString.add("\c\L")
|
||||
|
||||
block recvLineInto:
|
||||
if socket.isBuffered:
|
||||
if socket.bufLen == 0:
|
||||
let res = socket.readIntoBuf(flags)
|
||||
if res == 0:
|
||||
break recvLineInto
|
||||
|
||||
var lastR = false
|
||||
while true:
|
||||
if socket.currPos >= socket.bufLen:
|
||||
let res = socket.readIntoBuf(flags)
|
||||
if res == 0:
|
||||
resString.setLen(0)
|
||||
break recvLineInto
|
||||
|
||||
case socket.buffer[socket.currPos]
|
||||
of '\r':
|
||||
lastR = true
|
||||
addNLIfEmpty()
|
||||
of '\L':
|
||||
addNLIfEmpty()
|
||||
socket.currPos.inc()
|
||||
break recvLineInto
|
||||
else:
|
||||
if lastR:
|
||||
socket.currPos.inc()
|
||||
break recvLineInto
|
||||
else:
|
||||
resString.add socket.buffer[socket.currPos]
|
||||
socket.currPos.inc()
|
||||
else:
|
||||
var c = ""
|
||||
while true:
|
||||
let recvFut = recv(socket, 1, flags)
|
||||
yield recvFut
|
||||
c = recvFut.read()
|
||||
if c.len == 0:
|
||||
resString.setLen(0)
|
||||
break recvLineInto
|
||||
if c == "\r":
|
||||
let recvFut = recv(socket, 1, flags) # Skip \L
|
||||
yield recvFut
|
||||
c = recvFut.read()
|
||||
assert c == "\L"
|
||||
addNLIfEmpty()
|
||||
break recvLineInto
|
||||
elif c == "\L":
|
||||
addNLIfEmpty()
|
||||
break recvLineInto
|
||||
add(resString, c)
|
||||
|
||||
proc recvLine*(socket: AsyncSocket,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
|
||||
## Reads a line of data from ``socket``. Returned future will complete once
|
||||
@@ -325,52 +405,9 @@ proc recvLine*(socket: AsyncSocket,
|
||||
if result.len == 0:
|
||||
result.add("\c\L")
|
||||
assert SocketFlag.Peek notin flags ## TODO:
|
||||
if socket.isBuffered:
|
||||
result = ""
|
||||
if socket.bufLen == 0:
|
||||
let res = await socket.readIntoBuf(flags)
|
||||
if res == 0:
|
||||
return
|
||||
|
||||
var lastR = false
|
||||
while true:
|
||||
if socket.currPos >= socket.bufLen:
|
||||
let res = await socket.readIntoBuf(flags)
|
||||
if res == 0:
|
||||
result = ""
|
||||
break
|
||||
|
||||
case socket.buffer[socket.currPos]
|
||||
of '\r':
|
||||
lastR = true
|
||||
addNLIfEmpty()
|
||||
of '\L':
|
||||
addNLIfEmpty()
|
||||
socket.currPos.inc()
|
||||
return
|
||||
else:
|
||||
if lastR:
|
||||
socket.currPos.inc()
|
||||
return
|
||||
else:
|
||||
result.add socket.buffer[socket.currPos]
|
||||
socket.currPos.inc()
|
||||
else:
|
||||
result = ""
|
||||
var c = ""
|
||||
while true:
|
||||
c = await recv(socket, 1, flags)
|
||||
if c.len == 0:
|
||||
return ""
|
||||
if c == "\r":
|
||||
c = await recv(socket, 1, flags) # Skip \L
|
||||
assert c == "\L"
|
||||
addNLIfEmpty()
|
||||
return
|
||||
elif c == "\L":
|
||||
addNLIfEmpty()
|
||||
return
|
||||
add(result.string, c)
|
||||
result = ""
|
||||
socket.recvLineInto(result, flags)
|
||||
|
||||
proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
|
||||
## Marks ``socket`` as accepting connections.
|
||||
|
||||
Reference in New Issue
Block a user