Many async optimisations.

* Selectors implementation will now attempt to immediately execute an IO
  operation instead of waiting for a ready notification.
* Removed recursion in asynchttpserver.
* Improved buffered implementation of recvLine in asyncnet.
* Optimised ``respond`` in asynchttpserver removing a possible "Delayed ACK"
  situation.
This commit is contained in:
Dominik Picheta
2014-07-12 22:51:06 +01:00
parent c260b22fbc
commit cf5c8a204e
4 changed files with 166 additions and 116 deletions

View File

@@ -552,7 +552,18 @@ when defined(windows) or defined(nimdoc):
initAll()
else:
import selectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
when defined(windows):
import winlean
const
EINTR = WSAEINPROGRESS
EINPROGRESS = WSAEINPROGRESS
EWOULDBLOCK = WSAEWOULDBLOCK
EAGAIN = EINPROGRESS
MSG_NOSIGNAL = 0
else:
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
type
TAsyncFD* = distinct cint
TCallback = proc (sock: TAsyncFD): bool {.closure,gcsafe.}
@@ -693,12 +704,12 @@ else:
proc cb(sock: TAsyncFD): bool =
result = true
let res = recv(sock.TSocketHandle, addr readBuffer[0], size,
let res = recv(sock.TSocketHandle, addr readBuffer[0], size.cint,
flags.cint)
#echo("recv cb res: ", res)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
@@ -708,8 +719,8 @@ else:
else:
readBuffer.setLen(res)
retFuture.complete(readBuffer)
addRead(socket, cb)
if not cb(socket):
addRead(socket, cb)
return retFuture
proc send*(socket: TAsyncFD, data: string): PFuture[void] =
@@ -721,7 +732,8 @@ else:
result = true
let netSize = data.len-written
var d = data.cstring
let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint)
let res = send(sock.TSocketHandle, addr d[written], netSize.cint,
MSG_NOSIGNAL)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
@@ -734,7 +746,8 @@ else:
result = false # We still have data to send.
else:
retFuture.complete()
addWrite(socket, cb)
if not cb(socket):
addWrite(socket, cb)
return retFuture
proc acceptAddr*(socket: TAsyncFD):
@@ -756,7 +769,8 @@ else:
else:
register(client.TAsyncFD)
retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD))
addRead(socket, cb)
if not cb(socket):
addRead(socket, cb)
return retFuture
proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =

View File

@@ -51,10 +51,15 @@ proc `==`*(protocol: tuple[orig: string, major, minor: int],
proc newAsyncHttpServer*(): PAsyncHttpServer =
new result
proc sendHeaders*(req: TRequest, headers: PStringTable) {.async.} =
## Sends the specified headers to the requesting client.
proc addHeaders(msg: var string, headers: PStringTable) =
for k, v in headers:
await req.client.send(k & ": " & v & "\c\L")
msg.add(k & ": " & v & "\c\L")
proc sendHeaders*(req: TRequest, headers: PStringTable): PFuture[void] =
## Sends the specified headers to the requesting client.
var msg = ""
addHeaders(msg, headers)
return req.client.send(msg)
proc respond*(req: TRequest, code: THttpCode,
content: string, headers: PStringTable = newStringTable()) {.async.} =
@@ -64,9 +69,9 @@ proc respond*(req: TRequest, code: THttpCode,
## This procedure will **not** close the client socket.
var customHeaders = headers
customHeaders["Content-Length"] = $content.len
await req.client.send("HTTP/1.1 " & $code & "\c\L")
await sendHeaders(req, headers)
await req.client.send("\c\L" & content)
var msg = "HTTP/1.1 " & $code & "\c\L"
msg.addHeaders(customHeaders)
await req.client.send(msg & "\c\L" & content)
proc newRequest(): TRequest =
result.headers = newStringTable(modeCaseInsensitive)
@@ -93,90 +98,91 @@ proc sendStatus(client: PAsyncSocket, status: string): PFuture[void] =
proc processClient(client: PAsyncSocket, address: string,
callback: proc (request: TRequest): PFuture[void]) {.async.} =
# GET /path HTTP/1.1
# Header: val
# \n
var request = newRequest()
request.hostname = address
assert client != nil
request.client = client
var runCallback = true
# First line - GET /path HTTP/1.1
let line = await client.recvLine() # TODO: Timeouts.
if line == "":
client.close()
return
let lineParts = line.split(' ')
if lineParts.len != 3:
request.respond(Http400, "Invalid request. Got: " & line)
runCallback = false
let reqMethod = lineParts[0]
let path = lineParts[1]
let protocol = lineParts[2]
# Headers
var i = 0
while true:
i = 0
let headerLine = await client.recvLine()
if headerLine == "":
client.close(); return
if headerLine == "\c\L": break
# TODO: Compiler crash
#let (key, value) = parseHeader(headerLine)
let kv = parseHeader(headerLine)
request.headers[kv.key] = kv.value
# GET /path HTTP/1.1
# Header: val
# \n
var request = newRequest()
request.hostname = address
assert client != nil
request.client = client
var runCallback = true
request.reqMethod = reqMethod
request.url = parseUrl(path)
try:
request.protocol = protocol.parseProtocol()
except EInvalidValue:
request.respond(Http400, "Invalid request protocol. Got: " & protocol)
runCallback = false
if reqMethod.normalize == "post":
# Check for Expect header
if request.headers.hasKey("Expect"):
if request.headers["Expect"].toLower == "100-continue":
await client.sendStatus("100 Continue")
else:
await client.sendStatus("417 Expectation Failed")
# Read the body
# - Check for Content-length header
if request.headers.hasKey("Content-Length"):
var contentLength = 0
if parseInt(request.headers["Content-Length"], contentLength) == 0:
await request.respond(Http400, "Bad Request. Invalid Content-Length.")
else:
request.body = await client.recv(contentLength)
assert request.body.len == contentLength
else:
await request.respond(Http400, "Bad Request. No Content-Length.")
# First line - GET /path HTTP/1.1
let line = await client.recvLine() # TODO: Timeouts.
if line == "":
client.close()
return
let lineParts = line.split(' ')
if lineParts.len != 3:
request.respond(Http400, "Invalid request. Got: " & line)
runCallback = false
case reqMethod.normalize
of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch":
if runCallback:
await callback(request)
else:
await request.respond(Http400, "Invalid request method. Got: " & reqMethod)
let reqMethod = lineParts[0]
let path = lineParts[1]
let protocol = lineParts[2]
# Persistent connections
if (request.protocol == HttpVer11 and
request.headers["connection"].normalize != "close") or
(request.protocol == HttpVer10 and
request.headers["connection"].normalize == "keep-alive"):
# In HTTP 1.1 we assume that connection is persistent. Unless connection
# header states otherwise.
# In HTTP 1.0 we assume that the connection should not be persistent.
# Unless the connection header states otherwise.
await processClient(client, address, callback)
else:
request.client.close()
# Headers
var i = 0
while true:
i = 0
let headerLine = await client.recvLine()
if headerLine == "":
client.close(); return
if headerLine == "\c\L": break
# TODO: Compiler crash
#let (key, value) = parseHeader(headerLine)
let kv = parseHeader(headerLine)
request.headers[kv.key] = kv.value
request.reqMethod = reqMethod
request.url = parseUrl(path)
try:
request.protocol = protocol.parseProtocol()
except EInvalidValue:
request.respond(Http400, "Invalid request protocol. Got: " & protocol)
runCallback = false
if reqMethod.normalize == "post":
# Check for Expect header
if request.headers.hasKey("Expect"):
if request.headers["Expect"].toLower == "100-continue":
await client.sendStatus("100 Continue")
else:
await client.sendStatus("417 Expectation Failed")
# Read the body
# - Check for Content-length header
if request.headers.hasKey("Content-Length"):
var contentLength = 0
if parseInt(request.headers["Content-Length"], contentLength) == 0:
await request.respond(Http400, "Bad Request. Invalid Content-Length.")
else:
request.body = await client.recv(contentLength)
assert request.body.len == contentLength
else:
await request.respond(Http400, "Bad Request. No Content-Length.")
runCallback = false
case reqMethod.normalize
of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch":
if runCallback:
await callback(request)
else:
await request.respond(Http400, "Invalid request method. Got: " & reqMethod)
# Persistent connections
if (request.protocol == HttpVer11 and
request.headers["connection"].normalize != "close") or
(request.protocol == HttpVer10 and
request.headers["connection"].normalize == "keep-alive"):
# In HTTP 1.1 we assume that connection is persistent. Unless connection
# header states otherwise.
# In HTTP 1.0 we assume that the connection should not be persistent.
# Unless the connection header states otherwise.
else:
request.client.close()
break
proc serve*(server: PAsyncHttpServer, port: TPort,
callback: proc (request: TRequest): PFuture[void],

View File

@@ -110,12 +110,10 @@ proc recv*(socket: PAsyncSocket, size: int,
if socket.currPos >= socket.bufLen:
if (flags and MSG_PEEK) == MSG_PEEK:
# We don't want to get another buffer if we're peeking.
result.setLen(read)
return
break
let res = await socket.readIntoBuf(flags and (not MSG_PEEK))
if res == 0:
result.setLen(read)
return
break
let chunk = min(socket.bufLen-socket.currPos, size-read)
copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
@@ -181,28 +179,60 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
## If the socket is disconnected in the middle of a line (before ``\r\L``
## is read) then line will be set to ``""``.
## The partial line **will be lost**.
template addNLIfEmpty(): stmt =
if result.len == 0:
result.add("\c\L")
result = ""
var c = ""
while true:
c = await recv(socket, 1)
if c.len == 0:
return ""
if c == "\r":
c = await recv(socket, 1, MSG_PEEK)
if c.len > 0 and c == "\L":
let dummy = await recv(socket, 1)
assert dummy == "\L"
addNLIfEmpty()
return
elif c == "\L":
addNLIfEmpty()
return
add(result.string, c)
if socket.isBuffered:
result = ""
if socket.bufLen == 0:
let res = await socket.readIntoBuf(0)
if res == 0:
return
var lastR = false
while true:
if socket.currPos >= socket.bufLen:
let res = await socket.readIntoBuf(0)
if res == 0:
result = ""
break
case socket.buffer[socket.currPos]
of '\r':
lastR = true
addNLIfEmpty()
of '\L':
addNLIfEmpty()
socket.currPos.inc()
return
else:
if lastR:
socket.currPos.inc()
return
else:
result.add socket.buffer[socket.currPos]
socket.currPos.inc()
else:
result = ""
var c = ""
while true:
c = await recv(socket, 1)
if c.len == 0:
return ""
if c == "\r":
c = await recv(socket, 1, MSG_PEEK)
if c.len > 0 and c == "\L":
let dummy = await recv(socket, 1)
assert dummy == "\L"
addNLIfEmpty()
return
elif c == "\L":
addNLIfEmpty()
return
add(result.string, c)
proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") =
## Binds ``address``:``port`` to the socket.

View File

@@ -163,7 +163,7 @@ elif defined(linux):
proc newSelector*(): PSelector =
new result
result.epollFD = epoll_create(64)
result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
#result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
result.fds = initTable[TSocketHandle, PSelectorKey]()
if result.epollFD < 0:
OSError(OSLastError())