From 8778d4a6f36fee5a4c31f6f8f355b8607f2a592c Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 13 Nov 2020 13:03:14 +0100 Subject: [PATCH 1/6] attempt to make asynchttpserver better; fixes #15925; [backport:1.0] --- changelog.md | 6 ++ lib/pure/asyncdispatch.nim | 22 ++++ lib/pure/asynchttpserver.nim | 125 +++++++++++++++------- lib/pure/ioselects/ioselectors_epoll.nim | 4 +- lib/pure/ioselects/ioselectors_kqueue.nim | 8 +- lib/pure/ioselects/ioselectors_poll.nim | 4 +- lib/pure/ioselects/ioselectors_select.nim | 4 +- 7 files changed, 123 insertions(+), 50 deletions(-) diff --git a/changelog.md b/changelog.md index 11930cf926..e50be6de3c 100644 --- a/changelog.md +++ b/changelog.md @@ -35,6 +35,12 @@ - `doAssertRaises` now correctly handles foreign exceptions. +- Added `asyncdispatch.activeDescriptors` that returns the number of currently + active async event handles/file descriptors +- Added `asyncdispatch.maxDescriptors` that returns the maximum number of + active async event handles/file descriptors. + + ## Language changes - `nimscript` now handles `except Exception as e` diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d8b274c340..0afc608645 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -1934,3 +1934,25 @@ proc waitFor*[T](fut: Future[T]): T = poll() fut.read + +proc activeDescriptors*(): int {.inline.} = + ## Returns the current number of active file descriptors for the current + ## event loop. This is a cheap operation that does not involve a system call. + when defined(windows): + result = getGlobalDispatcher().handles.len + else: + result = getGlobalDispatcher().selector.count + +when defined(posix): + import posix + +proc maxDescriptors*(): int {.raises: OSError.} = + ## Returns the maximum number of active file descriptors for the current + ## process. This involves a system call. + when defined(windows): + result = 16_700_000 + else: + var fdLim: RLimit + if getrlimit(RLIMIT_NOFILE, fdLim) < 0: + raiseOSError(osLastError()) + result = int(fdLim.rlim_cur) - 1 diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index ec7f2a0dee..74b7d17b3b 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -7,28 +7,44 @@ # distribution, for details about the copyright. # -## This module implements a high performance asynchronous HTTP server. -## -## This HTTP server has not been designed to be used in production, but -## for testing applications locally. Because of this, when deploying your -## application in production you should use a reverse proxy (for example nginx) -## instead of allowing users to connect directly to this server. -## -## Basic usage -## =========== -## -## This example will create an HTTP server on port 8080. The server will -## respond to all requests with a ``200 OK`` response code and "Hello World" -## as the response body. -## -## .. code-block::nim -## import asynchttpserver, asyncdispatch -## -## var server = newAsyncHttpServer() -## proc cb(req: Request) {.async.} = -## await req.respond(Http200, "Hello World") -## -## waitFor server.serve(Port(8080), cb) +##[ This module implements a high performance asynchronous HTTP server. + +This HTTP server has not been designed to be used in production, but +for testing applications locally. Because of this, when deploying your +application in production you should use a reverse proxy (for example nginx) +instead of allowing users to connect directly to this server. + +Basic usage +=========== + +This example will create an HTTP server on port 8080. The server will +respond to all requests with a ``200 OK`` response code and "Hello World" +as the response body. + +.. code-block::nim + import asynchttpserver, asyncdispatch + + proc main {.async.} = + var server = newAsyncHttpServer() + proc cb(req: Request) {.async.} = + #echo(req.reqMethod, " ", req.url) + #echo(req.headers) + let headers = {"Date": "Tue, 29 Apr 2014 23:40:08 GMT", + "Content-type": "text/plain; charset=utf-8"} + await req.respond(Http200, "Hello World", headers.newHttpHeaders()) + + server.listen Port(5555) + while true: + if server.shouldAcceptRequest(5): + var (address, client) = await server.socket.acceptAddr() + asyncCheck processClient(server, client, address, cb) + else: + poll() + + asyncCheck main() + runForever() + +]## import asyncnet, asyncdispatch, parseutils, uri, strutils import httpcore @@ -58,14 +74,12 @@ type reuseAddr: bool reusePort: bool maxBody: int ## The maximum content-length that will be read for the body. + maxFDs: int proc newAsyncHttpServer*(reuseAddr = true, reusePort = false, maxBody = 8388608): AsyncHttpServer = ## Creates a new ``AsyncHttpServer`` instance. - new result - result.reuseAddr = reuseAddr - result.reusePort = reusePort - result.maxBody = maxBody + result = AsyncHttpServer(reuseAddr: reuseAddr, reusePort: reusePort, maxBody: maxBody) proc addHeaders(msg: var string, headers: HttpHeaders) = for k, v in headers: @@ -279,7 +293,7 @@ proc processRequest( request.client.close() return false -proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string, +proc processClient*(server: AsyncHttpServer, client: AsyncSocket, address: string, callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} = var request = newFutureVar[Request]("asynchttpserver.processClient") @@ -294,13 +308,8 @@ proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string ) if not retry: break -proc serve*(server: AsyncHttpServer, port: Port, - callback: proc (request: Request): Future[void] {.closure, gcsafe.}, - address = "") {.async.} = - ## Starts the process of listening for incoming HTTP connections on the - ## specified address and port. - ## - ## When a request is made by a client the specified callback will be called. +proc listen*(server: AsyncHttpServer; port: Port; address = "") = + server.maxFDs = maxDescriptors() server.socket = newAsyncSocket() if server.reuseAddr: server.socket.setSockOpt(OptReuseAddr, true) @@ -309,9 +318,38 @@ proc serve*(server: AsyncHttpServer, port: Port, server.socket.bindAddr(port, address) server.socket.listen() +proc shouldAcceptRequest*(server: AsyncHttpServer; + assumedDescriptorsPerRequest = 5): bool {.inline.} = + ## Returns true if the process's current number of opened file + ## descriptors is still within the maximum limit and so it's reasonable to + ## accept yet another request. + result = assumedDescriptorsPerRequest < 0 or + (activeDescriptors() + assumedDescriptorsPerRequest < server.maxFDs) + +proc acceptRequest*(server: AsyncHttpServer, port: Port, + callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} = + ## Accepts a single request. + var (address, client) = await server.socket.acceptAddr() + asyncCheck processClient(server, client, address, callback) + +proc serve*(server: AsyncHttpServer, port: Port, + callback: proc (request: Request): Future[void] {.closure, gcsafe.}, + address = ""; + assumedDescriptorsPerRequest = 5) {.async.} = + ## Starts the process of listening for incoming HTTP connections on the + ## specified address and port. + ## + ## When a request is made by a client the specified callback will be called. + ## + ## If `flowControl` is true the server cares about the process's maximum + ## file descriptor limit. + listen server, port, address while true: - var (address, client) = await server.socket.acceptAddr() - asyncCheck processClient(server, client, address, callback) + if shouldAcceptRequest(server, assumedDescriptorsPerRequest): + var (address, client) = await server.socket.acceptAddr() + asyncCheck processClient(server, client, address, callback) + else: + poll() #echo(f.isNil) #echo(f.repr) @@ -320,7 +358,7 @@ proc close*(server: AsyncHttpServer) = server.socket.close() when not defined(testing) and isMainModule: - proc main = + proc main {.async.} = var server = newAsyncHttpServer() proc cb(req: Request) {.async.} = #echo(req.reqMethod, " ", req.url) @@ -329,6 +367,13 @@ when not defined(testing) and isMainModule: "Content-type": "text/plain; charset=utf-8"} await req.respond(Http200, "Hello World", headers.newHttpHeaders()) - asyncCheck server.serve(Port(5555), cb) - runForever() - main() + server.listen Port(5555) + while true: + if server.shouldAcceptRequest(5): + var (address, client) = await server.socket.acceptAddr() + asyncCheck processClient(server, client, address, cb) + else: + poll() + + asyncCheck main() + runForever() diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim index 3dcf547bdb..b62b4c2db6 100644 --- a/lib/pure/ioselects/ioselectors_epoll.nim +++ b/lib/pure/ioselects/ioselectors_epoll.nim @@ -55,7 +55,7 @@ when hasThreadSupport: maxFD: int numFD: int fds: ptr SharedArray[SelectorKey[T]] - count: int + count*: int Selector*[T] = ptr SelectorImpl[T] else: type @@ -64,7 +64,7 @@ else: maxFD: int numFD: int fds: seq[SelectorKey[T]] - count: int + count*: int Selector*[T] = ref SelectorImpl[T] type SelectEventImpl = object diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim index a65be9842f..68be969c7f 100644 --- a/lib/pure/ioselects/ioselectors_kqueue.nim +++ b/lib/pure/ioselects/ioselectors_kqueue.nim @@ -30,7 +30,7 @@ when defined(macosx) or defined(freebsd) or defined(dragonfly): proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, newp: pointer, newplen: csize_t): cint {.importc: "sysctl",header: """#include - #include """} + #include """.} elif defined(netbsd) or defined(openbsd): # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using # KERN_MAXFILES, because KERN_MAXFILES is always bigger, @@ -39,7 +39,7 @@ elif defined(netbsd) or defined(openbsd): proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, newp: pointer, newplen: csize_t): cint {.importc: "sysctl",header: """#include - #include """} + #include """.} when hasThreadSupport: type @@ -48,7 +48,7 @@ when hasThreadSupport: maxFD: int changes: ptr SharedArray[KEvent] fds: ptr SharedArray[SelectorKey[T]] - count: int + count*: int changesLock: Lock changesSize: int changesLength: int @@ -61,7 +61,7 @@ else: maxFD: int changes: seq[KEvent] fds: seq[SelectorKey[T]] - count: int + count*: int sock: cint Selector*[T] = ref SelectorImpl[T] diff --git a/lib/pure/ioselects/ioselectors_poll.nim b/lib/pure/ioselects/ioselectors_poll.nim index 1af2a46db5..00e2f3fe9d 100644 --- a/lib/pure/ioselects/ioselectors_poll.nim +++ b/lib/pure/ioselects/ioselectors_poll.nim @@ -21,7 +21,7 @@ when hasThreadSupport: pollcnt: int fds: ptr SharedArray[SelectorKey[T]] pollfds: ptr SharedArray[TPollFd] - count: int + count*: int lock: Lock Selector*[T] = ptr SelectorImpl[T] else: @@ -31,7 +31,7 @@ else: pollcnt: int fds: seq[SelectorKey[T]] pollfds: seq[TPollFd] - count: int + count*: int Selector*[T] = ref SelectorImpl[T] type diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim index eed64a34db..2fd9ac0ba6 100644 --- a/lib/pure/ioselects/ioselectors_select.nim +++ b/lib/pure/ioselects/ioselectors_select.nim @@ -58,7 +58,7 @@ when hasThreadSupport: eSet: FdSet maxFD: int fds: ptr SharedArray[SelectorKey[T]] - count: int + count*: int lock: Lock Selector*[T] = ptr SelectorImpl[T] else: @@ -69,7 +69,7 @@ else: eSet: FdSet maxFD: int fds: seq[SelectorKey[T]] - count: int + count*: int Selector*[T] = ref SelectorImpl[T] type From cb19dc53ca49a237770271a3559772cc545e747f Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 13 Nov 2020 13:07:48 +0100 Subject: [PATCH 2/6] better documentation --- lib/pure/asynchttpserver.nim | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index 74b7d17b3b..b69f3f9a42 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -293,7 +293,7 @@ proc processRequest( request.client.close() return false -proc processClient*(server: AsyncHttpServer, client: AsyncSocket, address: string, +proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string, callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} = var request = newFutureVar[Request]("asynchttpserver.processClient") @@ -309,6 +309,7 @@ proc processClient*(server: AsyncHttpServer, client: AsyncSocket, address: strin if not retry: break proc listen*(server: AsyncHttpServer; port: Port; address = "") = + ## Listen to the given port and address. server.maxFDs = maxDescriptors() server.socket = newAsyncSocket() if server.reuseAddr: @@ -328,7 +329,8 @@ proc shouldAcceptRequest*(server: AsyncHttpServer; proc acceptRequest*(server: AsyncHttpServer, port: Port, callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} = - ## Accepts a single request. + ## Accepts a single request. Write an explicit loop around this proc so that + ## errors can be handled properly. var (address, client) = await server.socket.acceptAddr() asyncCheck processClient(server, client, address, callback) @@ -341,8 +343,13 @@ proc serve*(server: AsyncHttpServer, port: Port, ## ## When a request is made by a client the specified callback will be called. ## - ## If `flowControl` is true the server cares about the process's maximum - ## file descriptor limit. + ## If `assumedDescriptorsPerRequest` is 0 or greater the server cares about + ## the process's maximum file descriptor limit. It then ensures that the + ## process still has the resources for `assumedDescriptorsPerRequest` + ## file descriptors before accepting a connection. + ## + ## You should prefer to call `acceptRequest` instead with a custom server + ## loop so that you're in control over the error handling and logging. listen server, port, address while true: if shouldAcceptRequest(server, assumedDescriptorsPerRequest): From 19d52033f885a686f39b3f1b52564246b0fade83 Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 13 Nov 2020 13:50:57 +0100 Subject: [PATCH 3/6] fixes 'nim doc' --- lib/pure/asyncdispatch.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 0afc608645..0bacf3bb55 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -1940,7 +1940,7 @@ proc activeDescriptors*(): int {.inline.} = ## event loop. This is a cheap operation that does not involve a system call. when defined(windows): result = getGlobalDispatcher().handles.len - else: + elif not defined(nimdoc): result = getGlobalDispatcher().selector.count when defined(posix): From fa7b12bcf9fe62d7ef797aae71e248a6c51578bb Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 13 Nov 2020 14:18:01 +0100 Subject: [PATCH 4/6] makes test green again --- tests/errmsgs/tgcsafety.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/errmsgs/tgcsafety.nim b/tests/errmsgs/tgcsafety.nim index e6a62204ec..721677e952 100644 --- a/tests/errmsgs/tgcsafety.nim +++ b/tests/errmsgs/tgcsafety.nim @@ -5,8 +5,8 @@ nimout: ''' type mismatch: got .}> but expected one of: proc serve(server: AsyncHttpServer; port: Port; - callback: proc (request: Request): Future[void] {.closure, gcsafe.}; - address = ""): owned(Future[void]) + callback: proc (request: Request): Future[void] {.closure, gcsafe.}; + address = ""; assumedDescriptorsPerRequest = 5): owned(Future[void]) first type mismatch at position: 3 required type for callback: proc (request: Request): Future[system.void]{.closure, gcsafe.} but expression 'cb' is of type: proc (req: Request): Future[system.void]{.locks: .} From 9f566881f15bed58276d3df4b5b2cb2289016396 Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 13 Nov 2020 17:03:07 +0100 Subject: [PATCH 5/6] ported to FreeRTOS --- lib/pure/asyncdispatch.nim | 22 ++++++++++++---------- lib/pure/asynchttpserver.nim | 11 ++++++++++- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 0bacf3bb55..90345676fe 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -1946,13 +1946,15 @@ proc activeDescriptors*(): int {.inline.} = when defined(posix): import posix -proc maxDescriptors*(): int {.raises: OSError.} = - ## Returns the maximum number of active file descriptors for the current - ## process. This involves a system call. - when defined(windows): - result = 16_700_000 - else: - var fdLim: RLimit - if getrlimit(RLIMIT_NOFILE, fdLim) < 0: - raiseOSError(osLastError()) - result = int(fdLim.rlim_cur) - 1 +when defined(linux) or defined(windows) or defined(macosx) or defined(bsd): + proc maxDescriptors*(): int {.raises: OSError.} = + ## Returns the maximum number of active file descriptors for the current + ## process. This involves a system call. For now `maxDescriptors` is + ## supported on the following OSes: Windows, Linux, OSX, BSD. + when defined(windows): + result = 16_700_000 + else: + var fdLim: RLimit + if getrlimit(RLIMIT_NOFILE, fdLim) < 0: + raiseOSError(osLastError()) + result = int(fdLim.rlim_cur) - 1 diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index b69f3f9a42..a5f6a0bc85 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -308,9 +308,18 @@ proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string ) if not retry: break +const + nimMaxDescriptorsFallback* {.intdefine.} = 16_000 ## fallback value for \ + ## when `maxDescriptors` is not available. + ## This can be set on the command line during compilation + ## via `-d:nimMaxDescriptorsFallback=N` + proc listen*(server: AsyncHttpServer; port: Port; address = "") = ## Listen to the given port and address. - server.maxFDs = maxDescriptors() + when declared(maxDescriptors): + server.maxFDs = try: maxDescriptors() except: nimMaxDescriptorsFallback + else: + server.maxFDs = nimMaxDescriptorsFallback server.socket = newAsyncSocket() if server.reuseAddr: server.socket.setSockOpt(OptReuseAddr, true) From 02f8b11a716843be8270f8bf7b2dac94f1c7dc1a Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 13 Nov 2020 17:16:00 +0100 Subject: [PATCH 6/6] fixes the doc rendering --- lib/pure/asynchttpserver.nim | 73 +++++++++++++++++------------------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index a5f6a0bc85..f3d4b3dd26 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -7,44 +7,41 @@ # distribution, for details about the copyright. # -##[ This module implements a high performance asynchronous HTTP server. - -This HTTP server has not been designed to be used in production, but -for testing applications locally. Because of this, when deploying your -application in production you should use a reverse proxy (for example nginx) -instead of allowing users to connect directly to this server. - -Basic usage -=========== - -This example will create an HTTP server on port 8080. The server will -respond to all requests with a ``200 OK`` response code and "Hello World" -as the response body. - -.. code-block::nim - import asynchttpserver, asyncdispatch - - proc main {.async.} = - var server = newAsyncHttpServer() - proc cb(req: Request) {.async.} = - #echo(req.reqMethod, " ", req.url) - #echo(req.headers) - let headers = {"Date": "Tue, 29 Apr 2014 23:40:08 GMT", - "Content-type": "text/plain; charset=utf-8"} - await req.respond(Http200, "Hello World", headers.newHttpHeaders()) - - server.listen Port(5555) - while true: - if server.shouldAcceptRequest(5): - var (address, client) = await server.socket.acceptAddr() - asyncCheck processClient(server, client, address, cb) - else: - poll() - - asyncCheck main() - runForever() - -]## +## This module implements a high performance asynchronous HTTP server. +## +## This HTTP server has not been designed to be used in production, but +## for testing applications locally. Because of this, when deploying your +## application in production you should use a reverse proxy (for example nginx) +## instead of allowing users to connect directly to this server. +## +## Basic usage +## =========== +## +## This example will create an HTTP server on port 8080. The server will +## respond to all requests with a ``200 OK`` response code and "Hello World" +## as the response body. +## +## .. code-block:: Nim +## +## import asynchttpserver, asyncdispatch +## +## proc main {.async.} = +## var server = newAsyncHttpServer() +## proc cb(req: Request) {.async.} = +## let headers = {"Date": "Tue, 29 Apr 2014 23:40:08 GMT", +## "Content-type": "text/plain; charset=utf-8"} +## await req.respond(Http200, "Hello World", headers.newHttpHeaders()) +## +## server.listen Port(5555) +## while true: +## if server.shouldAcceptRequest(5): +## var (address, client) = await server.socket.acceptAddr() +## asyncCheck processClient(server, client, address, cb) +## else: +## poll() +## +## asyncCheck main() +## runForever() import asyncnet, asyncdispatch, parseutils, uri, strutils import httpcore