attempt to make asynchttpserver better; fixes #15925; [backport:1.0]

(cherry picked from commit 8778d4a6f3)
This commit is contained in:
Araq
2020-11-13 13:03:14 +01:00
committed by narimiran
parent a832fa65c4
commit b3a12b4e3f
7 changed files with 123 additions and 50 deletions

View File

@@ -127,6 +127,12 @@
- Removed deprecated symbols from `macros` module, deprecated as far back as `0.15`.
- Added `asyncdispatch.activeDescriptors` that returns the number of currently
active async event handles/file descriptors
- Added `asyncdispatch.maxDescriptors` that returns the maximum number of
active async event handles/file descriptors.
## Language changes
- In newruntime it is now allowed to assign discriminator field without restrictions as long as case object doesn't have custom destructor. Discriminator value doesn't have to be a constant either. If you have custom destructor for case object and you do want to freely assign discriminator fields, it is recommended to refactor object into 2 objects like this:

View File

@@ -1892,3 +1892,25 @@ proc waitFor*[T](fut: Future[T]): T =
poll()
fut.read
proc activeDescriptors*(): int {.inline.} =
## Returns the current number of active file descriptors for the current
## event loop. This is a cheap operation that does not involve a system call.
when defined(windows):
result = getGlobalDispatcher().handles.len
else:
result = getGlobalDispatcher().selector.count
when defined(posix):
import posix
proc maxDescriptors*(): int {.raises: OSError.} =
## Returns the maximum number of active file descriptors for the current
## process. This involves a system call.
when defined(windows):
result = 16_700_000
else:
var fdLim: RLimit
if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
raiseOSError(osLastError())
result = int(fdLim.rlim_cur) - 1

View File

@@ -7,28 +7,44 @@
# distribution, for details about the copyright.
#
## This module implements a high performance asynchronous HTTP server.
##
## This HTTP server has not been designed to be used in production, but
## for testing applications locally. Because of this, when deploying your
## application you should use a reverse proxy (for example nginx) instead of
## allowing users to connect directly to this server.
##
## Basic usage
## ===========
##
## This example will create an HTTP server on port 8080. The server will
## respond to all requests with a ``200 OK`` response code and "Hello World"
## as the response body.
##
## .. code-block::nim
## import asynchttpserver, asyncdispatch
##
## var server = newAsyncHttpServer()
## proc cb(req: Request) {.async.} =
## await req.respond(Http200, "Hello World")
##
## waitFor server.serve(Port(8080), cb)
##[ This module implements a high performance asynchronous HTTP server.
This HTTP server has not been designed to be used in production, but
for testing applications locally. Because of this, when deploying your
application in production you should use a reverse proxy (for example nginx)
instead of allowing users to connect directly to this server.
Basic usage
===========
This example will create an HTTP server on port 8080. The server will
respond to all requests with a ``200 OK`` response code and "Hello World"
as the response body.
.. code-block::nim
import asynchttpserver, asyncdispatch
proc main {.async.} =
var server = newAsyncHttpServer()
proc cb(req: Request) {.async.} =
#echo(req.reqMethod, " ", req.url)
#echo(req.headers)
let headers = {"Date": "Tue, 29 Apr 2014 23:40:08 GMT",
"Content-type": "text/plain; charset=utf-8"}
await req.respond(Http200, "Hello World", headers.newHttpHeaders())
server.listen Port(5555)
while true:
if server.shouldAcceptRequest(5):
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, cb)
else:
poll()
asyncCheck main()
runForever()
]##
import asyncnet, asyncdispatch, parseutils, uri, strutils
import httpcore
@@ -58,14 +74,12 @@ type
reuseAddr: bool
reusePort: bool
maxBody: int ## The maximum content-length that will be read for the body.
maxFDs: int
proc newAsyncHttpServer*(reuseAddr = true, reusePort = false,
maxBody = 8388608): AsyncHttpServer =
## Creates a new ``AsyncHttpServer`` instance.
new result
result.reuseAddr = reuseAddr
result.reusePort = reusePort
result.maxBody = maxBody
result = AsyncHttpServer(reuseAddr: reuseAddr, reusePort: reusePort, maxBody: maxBody)
proc addHeaders(msg: var string, headers: HttpHeaders) =
for k, v in headers:
@@ -279,7 +293,7 @@ proc processRequest(
request.client.close()
return false
proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string,
proc processClient*(server: AsyncHttpServer, client: AsyncSocket, address: string,
callback: proc (request: Request):
Future[void] {.closure, gcsafe.}) {.async.} =
var request = newFutureVar[Request]("asynchttpserver.processClient")
@@ -294,13 +308,8 @@ proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string
)
if not retry: break
proc serve*(server: AsyncHttpServer, port: Port,
callback: proc (request: Request): Future[void] {.closure, gcsafe.},
address = "") {.async.} =
## Starts the process of listening for incoming HTTP connections on the
## specified address and port.
##
## When a request is made by a client the specified callback will be called.
proc listen*(server: AsyncHttpServer; port: Port; address = "") =
server.maxFDs = maxDescriptors()
server.socket = newAsyncSocket()
if server.reuseAddr:
server.socket.setSockOpt(OptReuseAddr, true)
@@ -309,9 +318,38 @@ proc serve*(server: AsyncHttpServer, port: Port,
server.socket.bindAddr(port, address)
server.socket.listen()
proc shouldAcceptRequest*(server: AsyncHttpServer;
assumedDescriptorsPerRequest = 5): bool {.inline.} =
## Returns true if the process's current number of opened file
## descriptors is still within the maximum limit and so it's reasonable to
## accept yet another request.
result = assumedDescriptorsPerRequest < 0 or
(activeDescriptors() + assumedDescriptorsPerRequest < server.maxFDs)
proc acceptRequest*(server: AsyncHttpServer, port: Port,
callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} =
## Accepts a single request.
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, callback)
proc serve*(server: AsyncHttpServer, port: Port,
callback: proc (request: Request): Future[void] {.closure, gcsafe.},
address = "";
assumedDescriptorsPerRequest = 5) {.async.} =
## Starts the process of listening for incoming HTTP connections on the
## specified address and port.
##
## When a request is made by a client the specified callback will be called.
##
## If `flowControl` is true the server cares about the process's maximum
## file descriptor limit.
listen server, port, address
while true:
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, callback)
if shouldAcceptRequest(server, assumedDescriptorsPerRequest):
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, callback)
else:
poll()
#echo(f.isNil)
#echo(f.repr)
@@ -320,7 +358,7 @@ proc close*(server: AsyncHttpServer) =
server.socket.close()
when not defined(testing) and isMainModule:
proc main =
proc main {.async.} =
var server = newAsyncHttpServer()
proc cb(req: Request) {.async.} =
#echo(req.reqMethod, " ", req.url)
@@ -329,6 +367,13 @@ when not defined(testing) and isMainModule:
"Content-type": "text/plain; charset=utf-8"}
await req.respond(Http200, "Hello World", headers.newHttpHeaders())
asyncCheck server.serve(Port(5555), cb)
runForever()
main()
server.listen Port(5555)
while true:
if server.shouldAcceptRequest(5):
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, cb)
else:
poll()
asyncCheck main()
runForever()

View File

@@ -55,7 +55,7 @@ when hasThreadSupport:
maxFD: int
numFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
count*: int
Selector*[T] = ptr SelectorImpl[T]
else:
type
@@ -64,7 +64,7 @@ else:
maxFD: int
numFD: int
fds: seq[SelectorKey[T]]
count: int
count*: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object

View File

@@ -30,7 +30,7 @@ when defined(macosx) or defined(freebsd) or defined(dragonfly):
proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
newp: pointer, newplen: csize_t): cint
{.importc: "sysctl",header: """#include <sys/types.h>
#include <sys/sysctl.h>"""}
#include <sys/sysctl.h>""".}
elif defined(netbsd) or defined(openbsd):
# OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using
# KERN_MAXFILES, because KERN_MAXFILES is always bigger,
@@ -39,7 +39,7 @@ elif defined(netbsd) or defined(openbsd):
proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
newp: pointer, newplen: csize_t): cint
{.importc: "sysctl",header: """#include <sys/param.h>
#include <sys/sysctl.h>"""}
#include <sys/sysctl.h>""".}
when hasThreadSupport:
type
@@ -48,7 +48,7 @@ when hasThreadSupport:
maxFD: int
changes: ptr SharedArray[KEvent]
fds: ptr SharedArray[SelectorKey[T]]
count: int
count*: int
changesLock: Lock
changesSize: int
changesLength: int
@@ -61,7 +61,7 @@ else:
maxFD: int
changes: seq[KEvent]
fds: seq[SelectorKey[T]]
count: int
count*: int
sock: cint
Selector*[T] = ref SelectorImpl[T]

View File

@@ -21,7 +21,7 @@ when hasThreadSupport:
pollcnt: int
fds: ptr SharedArray[SelectorKey[T]]
pollfds: ptr SharedArray[TPollFd]
count: int
count*: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
@@ -31,7 +31,7 @@ else:
pollcnt: int
fds: seq[SelectorKey[T]]
pollfds: seq[TPollFd]
count: int
count*: int
Selector*[T] = ref SelectorImpl[T]
type

View File

@@ -58,7 +58,7 @@ when hasThreadSupport:
eSet: FdSet
maxFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
count*: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
@@ -69,7 +69,7 @@ else:
eSet: FdSet
maxFD: int
fds: seq[SelectorKey[T]]
count: int
count*: int
Selector*[T] = ref SelectorImpl[T]
type