mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-29 09:24:36 +00:00
Fixes #15003. This is a serious bug which occurs when data cannot be read/sent immediately and there are a bunch of other read/write events pending. What happens is that the new events are dropped which results in the case of the reported bug resulted in some data not being sent (!).
1935 lines
71 KiB
Nim
1935 lines
71 KiB
Nim
#
|
|
#
|
|
# Nim's Runtime Library
|
|
# (c) Copyright 2015 Dominik Picheta
|
|
#
|
|
# See the file "copying.txt", included in this
|
|
# distribution, for details about the copyright.
|
|
#
|
|
|
|
## This module implements asynchronous IO. This includes a dispatcher,
|
|
## a ``Future`` type implementation, and an ``async`` macro which allows
|
|
## asynchronous code to be written in a synchronous style with the ``await``
|
|
## keyword.
|
|
##
|
|
## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
|
|
## (or a function which does so for you such as ``waitFor`` or ``runForever``)
|
|
## in order to poll for any outstanding events. The underlying implementation
|
|
## is based on epoll on Linux, IO Completion Ports on Windows and select on
|
|
## other operating systems.
|
|
##
|
|
## The ``poll`` function will not, on its own, return any events. Instead
|
|
## an appropriate ``Future`` object will be completed. A ``Future`` is a
|
|
## type which holds a value which is not yet available, but which *may* be
|
|
## available in the future. You can check whether a future is finished
|
|
## by using the ``finished`` function. When a future is finished it means that
|
|
## either the value that it holds is now available or it holds an error instead.
|
|
## The latter situation occurs when the operation to complete a future fails
|
|
## with an exception. You can distinguish between the two situations with the
|
|
## ``failed`` function.
|
|
##
|
|
## Future objects can also store a callback procedure which will be called
|
|
## automatically once the future completes.
|
|
##
|
|
## Futures therefore can be thought of as an implementation of the proactor
|
|
## pattern. In this
|
|
## pattern you make a request for an action, and once that action is fulfilled
|
|
## a future is completed with the result of that action. Requests can be
|
|
## made by calling the appropriate functions. For example: calling the ``recv``
|
|
## function will create a request for some data to be read from a socket. The
|
|
## future which the ``recv`` function returns will then complete once the
|
|
## requested amount of data is read **or** an exception occurs.
|
|
##
|
|
## Code to read some data from a socket may look something like this:
|
|
##
|
|
## .. code-block::nim
|
|
## var future = socket.recv(100)
|
|
## future.addCallback(
|
|
## proc () =
|
|
## echo(future.read)
|
|
## )
|
|
##
|
|
## All asynchronous functions returning a ``Future`` will not block. They
|
|
## will not however return immediately. An asynchronous function will have
|
|
## code which will be executed before an asynchronous request is made, in most
|
|
## cases this code sets up the request.
|
|
##
|
|
## In the above example, the ``recv`` function will return a brand new
|
|
## ``Future`` instance once the request for data to be read from the socket
|
|
## is made. This ``Future`` instance will complete once the requested amount
|
|
## of data is read, in this case it is 100 bytes. The second line sets a
|
|
## callback on this future which will be called once the future completes.
|
|
## All the callback does is write the data stored in the future to ``stdout``.
|
|
## The ``read`` function is used for this and it checks whether the future
|
|
## completes with an error for you (if it did it will simply raise the
|
|
## error), if there is no error however it returns the value of the future.
|
|
##
|
|
## Asynchronous procedures
|
|
## =======================
|
|
##
|
|
## Asynchronous procedures remove the pain of working with callbacks. They do
|
|
## this by allowing you to write asynchronous code the same way as you would
|
|
## write synchronous code.
|
|
##
|
|
## An asynchronous procedure is marked using the ``{.async.}`` pragma.
|
|
## When marking a procedure with the ``{.async.}`` pragma it must have a
|
|
## ``Future[T]`` return type or no return type at all. If you do not specify
|
|
## a return type then ``Future[void]`` is assumed.
|
|
##
|
|
## Inside asynchronous procedures ``await`` can be used to call any
|
|
## procedures which return a
|
|
## ``Future``; this includes asynchronous procedures. When a procedure is
|
|
## "awaited", the asynchronous procedure it is awaited in will
|
|
## suspend its execution
|
|
## until the awaited procedure's Future completes. At which point the
|
|
## asynchronous procedure will resume its execution. During the period
|
|
## when an asynchronous procedure is suspended other asynchronous procedures
|
|
## will be run by the dispatcher.
|
|
##
|
|
## The ``await`` call may be used in many contexts. It can be used on the right
|
|
## hand side of a variable declaration: ``var data = await socket.recv(100)``,
|
|
## in which case the variable will be set to the value of the future
|
|
## automatically. It can be used to await a ``Future`` object, and it can
|
|
## be used to await a procedure returning a ``Future[void]``:
|
|
## ``await socket.send("foobar")``.
|
|
##
|
|
## If an awaited future completes with an error, then ``await`` will re-raise
|
|
## this error. To avoid this, you can use the ``yield`` keyword instead of
|
|
## ``await``. The following section shows different ways that you can handle
|
|
## exceptions in async procs.
|
|
##
|
|
## Handling Exceptions
|
|
## -------------------
|
|
##
|
|
## The most reliable way to handle exceptions is to use ``yield`` on a future
|
|
## then check the future's ``failed`` property. For example:
|
|
##
|
|
## .. code-block:: Nim
|
|
## var future = sock.recv(100)
|
|
## yield future
|
|
## if future.failed:
|
|
## # Handle exception
|
|
##
|
|
## The ``async`` procedures also offer limited support for the try statement.
|
|
##
|
|
## .. code-block:: Nim
|
|
## try:
|
|
## let data = await sock.recv(100)
|
|
## echo("Received ", data)
|
|
## except:
|
|
## # Handle exception
|
|
##
|
|
## Unfortunately the semantics of the try statement may not always be correct,
|
|
## and occasionally the compilation may fail altogether.
|
|
## As such it is better to use the former style when possible.
|
|
##
|
|
##
|
|
## Discarding futures
|
|
## ==================
|
|
##
|
|
## Futures should **never** be discarded. This is because they may contain
|
|
## errors. If you do not care for the result of a Future then you should
|
|
## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. Note
|
|
## however that this does not wait for completion, and you should use
|
|
## ``waitFor`` for that purpose.
|
|
##
|
|
## Examples
|
|
## ========
|
|
##
|
|
## For examples take a look at the documentation for the modules implementing
|
|
## asynchronous IO. A good place to start is the
|
|
## `asyncnet module <asyncnet.html>`_.
|
|
##
|
|
## Investigating pending futures
|
|
## =============================
|
|
##
|
|
## It's possible to get into a situation where an async proc, or more accurately
|
|
## a ``Future[T]`` gets stuck and
|
|
## never completes. This can happen for various reasons and can cause serious
|
|
## memory leaks. When this occurs it's hard to identify the procedure that is
|
|
## stuck.
|
|
##
|
|
## Thankfully there is a mechanism which tracks the count of each pending future.
|
|
## All you need to do to enable it is compile with ``-d:futureLogging`` and
|
|
## use the ``getFuturesInProgress`` procedure to get the list of pending futures
|
|
## together with the stack traces to the moment of their creation.
|
|
##
|
|
## You may also find it useful to use this
|
|
## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
|
|
## the pending futures into prometheus, allowing you to analyse them via a nice
|
|
## graph.
|
|
##
|
|
##
|
|
##
|
|
## Limitations/Bugs
|
|
## ================
|
|
##
|
|
## * The effect system (``raises: []``) does not work with async procedures.
|
|
|
|
import os, tables, strutils, times, heapqueue, options, asyncstreams
|
|
import options, math, std/monotimes
|
|
import asyncfutures except callSoon
|
|
|
|
import nativesockets, net, deques
|
|
|
|
export Port, SocketFlag
|
|
export asyncfutures except callSoon
|
|
export asyncstreams
|
|
|
|
#{.injectStmt: newGcInvariant().}
|
|
|
|
# TODO: Check if yielded future is nil and throw a more meaningful exception
|
|
|
|
type
|
|
PDispatcherBase = ref object of RootRef
|
|
timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
|
|
callbacks*: Deque[proc () {.gcsafe.}]
|
|
|
|
proc processTimers(
|
|
p: PDispatcherBase, didSomeWork: var bool
|
|
): Option[int] {.inline.} =
|
|
# Pop the timers in the order in which they will expire (smaller `finishAt`).
|
|
var count = p.timers.len
|
|
let t = getMonoTime()
|
|
while count > 0 and t >= p.timers[0].finishAt:
|
|
p.timers.pop().fut.complete()
|
|
dec count
|
|
didSomeWork = true
|
|
|
|
# Return the number of milliseconds in which the next timer will expire.
|
|
if p.timers.len == 0: return
|
|
|
|
let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
|
|
return some(millisecs.int + 1)
|
|
|
|
proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
|
|
while p.callbacks.len > 0:
|
|
var cb = p.callbacks.popFirst()
|
|
cb()
|
|
didSomeWork = true
|
|
|
|
proc adjustTimeout(
|
|
p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
|
|
): int {.inline.} =
|
|
if p.callbacks.len != 0:
|
|
return 0
|
|
|
|
if nextTimer.isNone() or pollTimeout == -1:
|
|
return pollTimeout
|
|
|
|
result = max(nextTimer.get(), 0)
|
|
result = min(pollTimeout, result)
|
|
|
|
proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
|
|
## Schedule `cbproc` to be called as soon as possible.
|
|
## The callback is called when control returns to the event loop.
|
|
|
|
proc initCallSoonProc =
|
|
if asyncfutures.getCallSoonProc().isNil:
|
|
asyncfutures.setCallSoonProc(callSoon)
|
|
|
|
template implementSetInheritable() {.dirty.} =
|
|
when declared(setInheritable):
|
|
proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
|
|
## Control whether a file handle can be inherited by child processes.
|
|
## Returns ``true`` on success.
|
|
##
|
|
## This procedure is not guaranteed to be available for all platforms.
|
|
## Test for availability with `declared()`_.
|
|
fd.FileHandle.setInheritable(inheritable)
|
|
|
|
when defined(windows) or defined(nimdoc):
|
|
import winlean, sets, hashes
|
|
type
|
|
CompletionKey = ULONG_PTR
|
|
|
|
CompletionData* = object
|
|
fd*: AsyncFD # TODO: Rename this.
|
|
cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
|
|
errcode: OSErrorCode) {.closure, gcsafe.})
|
|
cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
|
|
# when using RegisterWaitForSingleObject, because
|
|
# waiting is done in different thread.
|
|
|
|
PDispatcher* = ref object of PDispatcherBase
|
|
ioPort: Handle
|
|
handles: HashSet[AsyncFD]
|
|
|
|
CustomObj = object of OVERLAPPED
|
|
data*: CompletionData
|
|
|
|
CustomRef* = ref CustomObj
|
|
|
|
AsyncFD* = distinct int
|
|
|
|
PostCallbackData = object
|
|
ioPort: Handle
|
|
handleFd: AsyncFD
|
|
waitFd: Handle
|
|
ovl: owned CustomRef
|
|
PostCallbackDataPtr = ptr PostCallbackData
|
|
|
|
AsyncEventImpl = object
|
|
hEvent: Handle
|
|
hWaiter: Handle
|
|
pcd: PostCallbackDataPtr
|
|
AsyncEvent* = ptr AsyncEventImpl
|
|
|
|
Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
|
|
|
|
proc hash(x: AsyncFD): Hash {.borrow.}
|
|
proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
|
|
|
|
proc newDispatcher*(): owned PDispatcher =
|
|
## Creates a new Dispatcher instance.
|
|
new result
|
|
result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
|
|
result.handles = initHashSet[AsyncFD]()
|
|
result.timers.clear()
|
|
result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
|
|
|
|
var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
|
|
|
|
proc setGlobalDispatcher*(disp: owned PDispatcher) =
|
|
if not gDisp.isNil:
|
|
assert gDisp.callbacks.len == 0
|
|
gDisp = disp
|
|
initCallSoonProc()
|
|
|
|
proc getGlobalDispatcher*(): PDispatcher =
|
|
if gDisp.isNil:
|
|
setGlobalDispatcher(newDispatcher())
|
|
result = gDisp
|
|
|
|
proc getIoHandler*(disp: PDispatcher): Handle =
|
|
## Returns the underlying IO Completion Port handle (Windows) or selector
|
|
## (Unix) for the specified dispatcher.
|
|
return disp.ioPort
|
|
|
|
proc register*(fd: AsyncFD) =
|
|
## Registers ``fd`` with the dispatcher.
|
|
let p = getGlobalDispatcher()
|
|
|
|
if createIoCompletionPort(fd.Handle, p.ioPort,
|
|
cast[CompletionKey](fd), 1) == 0:
|
|
raiseOSError(osLastError())
|
|
p.handles.incl(fd)
|
|
|
|
proc verifyPresence(fd: AsyncFD) =
|
|
## Ensures that file descriptor has been registered with the dispatcher.
|
|
## Raises ValueError if `fd` has not been registered.
|
|
let p = getGlobalDispatcher()
|
|
if fd notin p.handles:
|
|
raise newException(ValueError,
|
|
"Operation performed on a socket which has not been registered with" &
|
|
" the dispatcher yet.")
|
|
|
|
proc hasPendingOperations*(): bool =
|
|
## Returns `true` if the global dispatcher has pending operations.
|
|
let p = getGlobalDispatcher()
|
|
p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
|
|
|
|
proc runOnce(timeout = 500): bool =
|
|
let p = getGlobalDispatcher()
|
|
if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
|
|
raise newException(ValueError,
|
|
"No handles or timers registered in dispatcher.")
|
|
|
|
result = false
|
|
let nextTimer = processTimers(p, result)
|
|
let at = adjustTimeout(p, timeout, nextTimer)
|
|
var llTimeout =
|
|
if at == -1: winlean.INFINITE
|
|
else: at.int32
|
|
|
|
var lpNumberOfBytesTransferred: DWORD
|
|
var lpCompletionKey: ULONG_PTR
|
|
var customOverlapped: CustomRef
|
|
let res = getQueuedCompletionStatus(p.ioPort,
|
|
addr lpNumberOfBytesTransferred, addr lpCompletionKey,
|
|
cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
|
|
result = true
|
|
# For 'gcDestructors' the destructor of 'customOverlapped' will
|
|
# be called at the end and we are the only owner here. This means
|
|
# We do not have to 'GC_unref(customOverlapped)' because the destructor
|
|
# does that for us.
|
|
|
|
# http://stackoverflow.com/a/12277264/492186
|
|
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
|
|
if res:
|
|
# This is useful for ensuring the reliability of the overlapped struct.
|
|
assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
|
|
|
|
customOverlapped.data.cb(customOverlapped.data.fd,
|
|
lpNumberOfBytesTransferred, OSErrorCode(-1))
|
|
|
|
# If cell.data != nil, then system.protect(rawEnv(cb)) was called,
|
|
# so we need to dispose our `cb` environment, because it is not needed
|
|
# anymore.
|
|
if customOverlapped.data.cell.data != nil:
|
|
system.dispose(customOverlapped.data.cell)
|
|
|
|
when not defined(gcDestructors):
|
|
GC_unref(customOverlapped)
|
|
else:
|
|
let errCode = osLastError()
|
|
if customOverlapped != nil:
|
|
assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
|
|
customOverlapped.data.cb(customOverlapped.data.fd,
|
|
lpNumberOfBytesTransferred, errCode)
|
|
if customOverlapped.data.cell.data != nil:
|
|
system.dispose(customOverlapped.data.cell)
|
|
when not defined(gcDestructors):
|
|
GC_unref(customOverlapped)
|
|
else:
|
|
if errCode.int32 == WAIT_TIMEOUT:
|
|
# Timed out
|
|
result = false
|
|
else: raiseOSError(errCode)
|
|
|
|
# Timer processing.
|
|
discard processTimers(p, result)
|
|
# Callback queue processing
|
|
processPendingCallbacks(p, result)
|
|
|
|
|
|
var acceptEx: WSAPROC_ACCEPTEX
|
|
var connectEx: WSAPROC_CONNECTEX
|
|
var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
|
|
|
|
proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
|
|
# Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
|
|
var bytesRet: DWORD
|
|
fun = nil
|
|
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
|
|
sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
|
|
addr bytesRet, nil, nil) == 0
|
|
|
|
proc initAll() =
|
|
let dummySock = createNativeSocket()
|
|
if dummySock == INVALID_SOCKET:
|
|
raiseOSError(osLastError())
|
|
var fun: pointer = nil
|
|
if not initPointer(dummySock, fun, WSAID_CONNECTEX):
|
|
raiseOSError(osLastError())
|
|
connectEx = cast[WSAPROC_CONNECTEX](fun)
|
|
if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
|
|
raiseOSError(osLastError())
|
|
acceptEx = cast[WSAPROC_ACCEPTEX](fun)
|
|
if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
|
|
raiseOSError(osLastError())
|
|
getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
|
|
close(dummySock)
|
|
|
|
proc newCustom*(): CustomRef =
|
|
result = CustomRef() # 0
|
|
GC_ref(result) # 1 prevent destructor from doing a premature free.
|
|
# destructor of newCustom's caller --> 0. This means
|
|
# Windows holds a ref for us with RC == 0 (single owner).
|
|
# This is passed back to us in the IO completion port.
|
|
|
|
proc recv*(socket: AsyncFD, size: int,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
|
|
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
|
|
## complete once all the data requested is read, a part of the data has been
|
|
## read, or the socket has disconnected in which case the future will
|
|
## complete with a value of ``""``.
|
|
##
|
|
## **Warning**: The ``Peek`` socket flag is not supported on Windows.
|
|
|
|
|
|
# Things to note:
|
|
# * When WSARecv completes immediately then ``bytesReceived`` is very
|
|
# unreliable.
|
|
# * Still need to implement message-oriented socket disconnection,
|
|
# '\0' in the message currently signifies a socket disconnect. Who
|
|
# knows what will happen when someone sends that to our socket.
|
|
verifyPresence(socket)
|
|
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
|
|
|
|
var retFuture = newFuture[string]("recv")
|
|
var dataBuf: TWSABuf
|
|
dataBuf.buf = cast[cstring](alloc0(size))
|
|
dataBuf.len = size.ULONG
|
|
|
|
var bytesReceived: DWORD
|
|
var flagsio = flags.toOSFlags().DWORD
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
if bytesCount == 0 and dataBuf.buf[0] == '\0':
|
|
retFuture.complete("")
|
|
else:
|
|
var data = newString(bytesCount)
|
|
assert bytesCount <= size
|
|
copyMem(addr data[0], addr dataBuf.buf[0], bytesCount)
|
|
retFuture.complete($data)
|
|
else:
|
|
if flags.isDisconnectionError(errcode):
|
|
retFuture.complete("")
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
|
if dataBuf.buf != nil:
|
|
dealloc dataBuf.buf
|
|
dataBuf.buf = nil
|
|
)
|
|
|
|
let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
|
|
addr flagsio, cast[POVERLAPPED](ol), nil)
|
|
if ret == -1:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
if dataBuf.buf != nil:
|
|
dealloc dataBuf.buf
|
|
dataBuf.buf = nil
|
|
GC_unref(ol)
|
|
if flags.isDisconnectionError(err):
|
|
retFuture.complete("")
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
elif ret == 0:
|
|
# Request completed immediately.
|
|
if bytesReceived != 0:
|
|
var data = newString(bytesReceived)
|
|
assert bytesReceived <= size
|
|
copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
|
|
retFuture.complete($data)
|
|
else:
|
|
if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
|
|
retFuture.complete("")
|
|
return retFuture
|
|
|
|
proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
|
|
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
|
|
## at least be of that size. Returned future will complete once all the
|
|
## data requested is read, a part of the data has been read, or the socket
|
|
## has disconnected in which case the future will complete with a value of
|
|
## ``0``.
|
|
##
|
|
## **Warning**: The ``Peek`` socket flag is not supported on Windows.
|
|
|
|
|
|
# Things to note:
|
|
# * When WSARecv completes immediately then ``bytesReceived`` is very
|
|
# unreliable.
|
|
# * Still need to implement message-oriented socket disconnection,
|
|
# '\0' in the message currently signifies a socket disconnect. Who
|
|
# knows what will happen when someone sends that to our socket.
|
|
verifyPresence(socket)
|
|
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
|
|
|
|
var retFuture = newFuture[int]("recvInto")
|
|
|
|
#buf[] = '\0'
|
|
var dataBuf: TWSABuf
|
|
dataBuf.buf = cast[cstring](buf)
|
|
dataBuf.len = size.ULONG
|
|
|
|
var bytesReceived: DWORD
|
|
var flagsio = flags.toOSFlags().DWORD
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
retFuture.complete(bytesCount)
|
|
else:
|
|
if flags.isDisconnectionError(errcode):
|
|
retFuture.complete(0)
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
|
if dataBuf.buf != nil:
|
|
dataBuf.buf = nil
|
|
)
|
|
|
|
let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
|
|
addr flagsio, cast[POVERLAPPED](ol), nil)
|
|
if ret == -1:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
if dataBuf.buf != nil:
|
|
dataBuf.buf = nil
|
|
GC_unref(ol)
|
|
if flags.isDisconnectionError(err):
|
|
retFuture.complete(0)
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
elif ret == 0:
|
|
# Request completed immediately.
|
|
if bytesReceived != 0:
|
|
assert bytesReceived <= size
|
|
retFuture.complete(bytesReceived)
|
|
else:
|
|
if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
|
|
retFuture.complete(bytesReceived)
|
|
return retFuture
|
|
|
|
proc send*(socket: AsyncFD, buf: pointer, size: int,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
|
|
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
|
|
## will complete once all data has been sent.
|
|
##
|
|
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
|
|
## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
|
|
verifyPresence(socket)
|
|
var retFuture = newFuture[void]("send")
|
|
|
|
var dataBuf: TWSABuf
|
|
dataBuf.buf = cast[cstring](buf)
|
|
dataBuf.len = size.ULONG
|
|
|
|
var bytesReceived, lowFlags: DWORD
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
retFuture.complete()
|
|
else:
|
|
if flags.isDisconnectionError(errcode):
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.fail(newOSError(errcode))
|
|
)
|
|
|
|
let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
|
|
lowFlags, cast[POVERLAPPED](ol), nil)
|
|
if ret == -1:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
GC_unref(ol)
|
|
if flags.isDisconnectionError(err):
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
else:
|
|
retFuture.complete()
|
|
# We don't deallocate ``ol`` here because even though this completed
|
|
# immediately poll will still be notified about its completion and it will
|
|
# free ``ol``.
|
|
return retFuture
|
|
|
|
proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
|
|
saddrLen: SockLen,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
|
|
## Sends ``data`` to specified destination ``saddr``, using
|
|
## socket ``socket``. The returned future will complete once all data
|
|
## has been sent.
|
|
verifyPresence(socket)
|
|
var retFuture = newFuture[void]("sendTo")
|
|
var dataBuf: TWSABuf
|
|
dataBuf.buf = cast[cstring](data)
|
|
dataBuf.len = size.ULONG
|
|
var bytesSent = 0.DWORD
|
|
var lowFlags = 0.DWORD
|
|
|
|
# we will preserve address in our stack
|
|
var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
|
|
var stalen: cint = cint(saddrLen)
|
|
zeroMem(addr(staddr[0]), 128)
|
|
copyMem(addr(staddr[0]), saddr, saddrLen)
|
|
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
|
)
|
|
|
|
let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
|
|
lowFlags, cast[ptr SockAddr](addr(staddr[0])),
|
|
stalen, cast[POVERLAPPED](ol), nil)
|
|
if ret == -1:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
GC_unref(ol)
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
else:
|
|
retFuture.complete()
|
|
# We don't deallocate ``ol`` here because even though this completed
|
|
# immediately poll will still be notified about its completion and it will
|
|
# free ``ol``.
|
|
return retFuture
|
|
|
|
proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
|
|
saddr: ptr SockAddr, saddrLen: ptr SockLen,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
|
|
## Receives a datagram data from ``socket`` into ``buf``, which must
|
|
## be at least of size ``size``, address of datagram's sender will be
|
|
## stored into ``saddr`` and ``saddrLen``. Returned future will complete
|
|
## once one datagram has been received, and will return size of packet
|
|
## received.
|
|
verifyPresence(socket)
|
|
var retFuture = newFuture[int]("recvFromInto")
|
|
|
|
var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
|
|
|
|
var bytesReceived = 0.DWORD
|
|
var lowFlags = 0.DWORD
|
|
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
assert bytesCount <= size
|
|
retFuture.complete(bytesCount)
|
|
else:
|
|
# datagram sockets don't have disconnection,
|
|
# so we can just raise an exception
|
|
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
|
)
|
|
|
|
let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
|
|
addr bytesReceived, addr lowFlags,
|
|
saddr, cast[ptr cint](saddrLen),
|
|
cast[POVERLAPPED](ol), nil)
|
|
if res == -1:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
GC_unref(ol)
|
|
retFuture.fail(newException(OSError, osErrorMsg(err)))
|
|
else:
|
|
# Request completed immediately.
|
|
if bytesReceived != 0:
|
|
assert bytesReceived <= size
|
|
retFuture.complete(bytesReceived)
|
|
else:
|
|
if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
|
|
retFuture.complete(bytesReceived)
|
|
return retFuture
|
|
|
|
proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
|
|
inheritable = defined(nimInheritHandles)):
|
|
owned(Future[tuple[address: string, client: AsyncFD]]) =
|
|
## Accepts a new connection. Returns a future containing the client socket
|
|
## corresponding to that connection and the remote address of the client.
|
|
## The future will complete when the connection is successfully accepted.
|
|
##
|
|
## The resulting client socket is automatically registered to the
|
|
## dispatcher.
|
|
##
|
|
## If ``inheritable`` is false (the default), the resulting client socket will
|
|
## not be inheritable by child processes.
|
|
##
|
|
## The ``accept`` call may result in an error if the connecting socket
|
|
## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
|
|
## flag is specified then this error will not be raised and instead
|
|
## accept will be called again.
|
|
verifyPresence(socket)
|
|
var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
|
|
|
|
var clientSock = createNativeSocket(inheritable = inheritable)
|
|
if clientSock == osInvalidSocket: raiseOSError(osLastError())
|
|
|
|
const lpOutputLen = 1024
|
|
var lpOutputBuf = newString(lpOutputLen)
|
|
var dwBytesReceived: DWORD
|
|
let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
|
|
let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
|
|
let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
|
|
|
|
template failAccept(errcode) =
|
|
if flags.isDisconnectionError(errcode):
|
|
var newAcceptFut = acceptAddr(socket, flags)
|
|
newAcceptFut.callback =
|
|
proc () =
|
|
if newAcceptFut.failed:
|
|
retFuture.fail(newAcceptFut.readError)
|
|
else:
|
|
retFuture.complete(newAcceptFut.read)
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
|
|
|
template completeAccept() {.dirty.} =
|
|
var listenSock = socket
|
|
let setoptRet = setsockopt(clientSock, SOL_SOCKET,
|
|
SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
|
|
sizeof(listenSock).SockLen)
|
|
if setoptRet != 0:
|
|
let errcode = osLastError()
|
|
discard clientSock.closesocket()
|
|
failAccept(errcode)
|
|
else:
|
|
var localSockaddr, remoteSockaddr: ptr SockAddr
|
|
var localLen, remoteLen: int32
|
|
getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
|
|
dwLocalAddressLength, dwRemoteAddressLength,
|
|
addr localSockaddr, addr localLen,
|
|
addr remoteSockaddr, addr remoteLen)
|
|
try:
|
|
let address = getAddrString(remoteSockaddr)
|
|
register(clientSock.AsyncFD)
|
|
retFuture.complete((address: address, client: clientSock.AsyncFD))
|
|
except:
|
|
# getAddrString may raise
|
|
clientSock.close()
|
|
retFuture.fail(getCurrentException())
|
|
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
completeAccept()
|
|
else:
|
|
failAccept(errcode)
|
|
)
|
|
|
|
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
|
|
let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0],
|
|
dwReceiveDataLength,
|
|
dwLocalAddressLength,
|
|
dwRemoteAddressLength,
|
|
addr dwBytesReceived, cast[POVERLAPPED](ol))
|
|
|
|
if not ret:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
failAccept(err)
|
|
GC_unref(ol)
|
|
else:
|
|
completeAccept()
|
|
# We don't deallocate ``ol`` here because even though this completed
|
|
# immediately poll will still be notified about its completion and it will
|
|
# free ``ol``.
|
|
|
|
return retFuture
|
|
|
|
implementSetInheritable()
|
|
|
|
proc closeSocket*(socket: AsyncFD) =
|
|
## Closes a socket and ensures that it is unregistered.
|
|
socket.SocketHandle.close()
|
|
getGlobalDispatcher().handles.excl(socket)
|
|
|
|
proc unregister*(fd: AsyncFD) =
|
|
## Unregisters ``fd``.
|
|
getGlobalDispatcher().handles.excl(fd)
|
|
|
|
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
|
|
return fd in disp.handles
|
|
|
|
{.push stackTrace: off.}
|
|
proc waitableCallback(param: pointer,
|
|
timerOrWaitFired: WINBOOL) {.stdcall.} =
|
|
var p = cast[PostCallbackDataPtr](param)
|
|
discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
|
|
ULONG_PTR(p.handleFd),
|
|
cast[pointer](p.ovl))
|
|
{.pop.}
|
|
|
|
proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
|
|
let p = getGlobalDispatcher()
|
|
var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
|
|
var hEvent = wsaCreateEvent()
|
|
if hEvent == 0:
|
|
raiseOSError(osLastError())
|
|
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
|
|
pcd.ioPort = p.ioPort
|
|
pcd.handleFd = fd
|
|
var ol = newCustom()
|
|
|
|
ol.data = CompletionData(fd: fd, cb:
|
|
proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
|
|
# we excluding our `fd` because cb(fd) can register own handler
|
|
# for this `fd`
|
|
p.handles.excl(fd)
|
|
# unregisterWait() is called before callback, because appropriate
|
|
# winsockets function can re-enable event.
|
|
# https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
|
|
if unregisterWait(pcd.waitFd) == 0:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
deallocShared(cast[pointer](pcd))
|
|
discard wsaCloseEvent(hEvent)
|
|
raiseOSError(err)
|
|
if cb(fd):
|
|
# callback returned `true`, so we free all allocated resources
|
|
deallocShared(cast[pointer](pcd))
|
|
if not wsaCloseEvent(hEvent):
|
|
raiseOSError(osLastError())
|
|
# pcd.ovl will be unrefed in poll().
|
|
else:
|
|
# callback returned `false` we need to continue
|
|
if p.handles.contains(fd):
|
|
# new callback was already registered with `fd`, so we free all
|
|
# allocated resources. This happens because in callback `cb`
|
|
# addRead/addWrite was called with same `fd`.
|
|
deallocShared(cast[pointer](pcd))
|
|
if not wsaCloseEvent(hEvent):
|
|
raiseOSError(osLastError())
|
|
else:
|
|
# we need to include `fd` again
|
|
p.handles.incl(fd)
|
|
# and register WaitForSingleObject again
|
|
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
|
|
cast[WAITORTIMERCALLBACK](waitableCallback),
|
|
cast[pointer](pcd), INFINITE, flags):
|
|
# pcd.ovl will be unrefed in poll()
|
|
let err = osLastError()
|
|
deallocShared(cast[pointer](pcd))
|
|
discard wsaCloseEvent(hEvent)
|
|
raiseOSError(err)
|
|
else:
|
|
# we incref `pcd.ovl` and `protect` callback one more time,
|
|
# because it will be unrefed and disposed in `poll()` after
|
|
# callback finishes.
|
|
GC_ref(pcd.ovl)
|
|
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
|
|
)
|
|
# We need to protect our callback environment value, so GC will not free it
|
|
# accidentally.
|
|
ol.data.cell = system.protect(rawEnv(ol.data.cb))
|
|
|
|
# This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
|
|
# will be signaled when appropriate `mask` events will be triggered.
|
|
if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
|
|
let err = osLastError()
|
|
GC_unref(ol)
|
|
deallocShared(cast[pointer](pcd))
|
|
discard wsaCloseEvent(hEvent)
|
|
raiseOSError(err)
|
|
|
|
pcd.ovl = ol
|
|
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
|
|
cast[WAITORTIMERCALLBACK](waitableCallback),
|
|
cast[pointer](pcd), INFINITE, flags):
|
|
let err = osLastError()
|
|
GC_unref(ol)
|
|
deallocShared(cast[pointer](pcd))
|
|
discard wsaCloseEvent(hEvent)
|
|
raiseOSError(err)
|
|
p.handles.incl(fd)
|
|
|
|
proc addRead*(fd: AsyncFD, cb: Callback) =
|
|
## Start watching the file descriptor for read availability and then call
|
|
## the callback ``cb``.
|
|
##
|
|
## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
|
|
## so if you can avoid it, please do it. Use `addRead` only if really
|
|
## need it (main usecase is adaptation of unix-like libraries to be
|
|
## asynchronous on Windows).
|
|
##
|
|
## If you use this function, you don't need to use asyncdispatch.recv()
|
|
## or asyncdispatch.accept(), because they are using IOCP, please use
|
|
## nativesockets.recv() and nativesockets.accept() instead.
|
|
##
|
|
## Be sure your callback ``cb`` returns ``true``, if you want to remove
|
|
## watch of `read` notifications, and ``false``, if you want to continue
|
|
## receiving notifications.
|
|
registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
|
|
|
|
proc addWrite*(fd: AsyncFD, cb: Callback) =
|
|
## Start watching the file descriptor for write availability and then call
|
|
## the callback ``cb``.
|
|
##
|
|
## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
|
|
## so if you can avoid it, please do it. Use `addWrite` only if really
|
|
## need it (main usecase is adaptation of unix-like libraries to be
|
|
## asynchronous on Windows).
|
|
##
|
|
## If you use this function, you don't need to use asyncdispatch.send()
|
|
## or asyncdispatch.connect(), because they are using IOCP, please use
|
|
## nativesockets.send() and nativesockets.connect() instead.
|
|
##
|
|
## Be sure your callback ``cb`` returns ``true``, if you want to remove
|
|
## watch of `write` notifications, and ``false``, if you want to continue
|
|
## receiving notifications.
|
|
registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
|
|
|
|
template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
|
|
handleCallback) =
|
|
let handleFD = AsyncFD(hEvent)
|
|
pcd.ioPort = p.ioPort
|
|
pcd.handleFd = handleFD
|
|
var ol = newCustom()
|
|
ol.data.fd = handleFD
|
|
ol.data.cb = handleCallback
|
|
# We need to protect our callback environment value, so GC will not free it
|
|
# accidentally.
|
|
ol.data.cell = system.protect(rawEnv(ol.data.cb))
|
|
|
|
pcd.ovl = ol
|
|
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
|
|
cast[WAITORTIMERCALLBACK](waitableCallback),
|
|
cast[pointer](pcd), timeout.DWORD, flags):
|
|
let err = osLastError()
|
|
GC_unref(ol)
|
|
deallocShared(cast[pointer](pcd))
|
|
discard closeHandle(hEvent)
|
|
raiseOSError(err)
|
|
p.handles.incl(handleFD)
|
|
|
|
template closeWaitable(handle: untyped) =
|
|
let waitFd = pcd.waitFd
|
|
deallocShared(cast[pointer](pcd))
|
|
p.handles.excl(fd)
|
|
if unregisterWait(waitFd) == 0:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
discard closeHandle(handle)
|
|
raiseOSError(err)
|
|
if closeHandle(handle) == 0:
|
|
raiseOSError(osLastError())
|
|
|
|
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
|
|
## Registers callback ``cb`` to be called when timer expired.
|
|
##
|
|
## Parameters:
|
|
##
|
|
## * ``timeout`` - timeout value in milliseconds.
|
|
## * ``oneshot``
|
|
## * `true` - generate only one timeout event
|
|
## * `false` - generate timeout events periodically
|
|
|
|
doAssert(timeout > 0)
|
|
let p = getGlobalDispatcher()
|
|
|
|
var hEvent = createEvent(nil, 1, 0, nil)
|
|
if hEvent == INVALID_HANDLE_VALUE:
|
|
raiseOSError(osLastError())
|
|
|
|
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
|
|
var flags = WT_EXECUTEINWAITTHREAD.DWORD
|
|
if oneshot: flags = flags or WT_EXECUTEONLYONCE
|
|
|
|
proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
let res = cb(fd)
|
|
if res or oneshot:
|
|
closeWaitable(hEvent)
|
|
else:
|
|
# if callback returned `false`, then it wants to be called again, so
|
|
# we need to ref and protect `pcd.ovl` again, because it will be
|
|
# unrefed and disposed in `poll()`.
|
|
GC_ref(pcd.ovl)
|
|
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
|
|
|
|
registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
|
|
|
|
proc addProcess*(pid: int, cb: Callback) =
|
|
## Registers callback ``cb`` to be called when process with process ID
|
|
## ``pid`` exited.
|
|
const NULL = Handle(0)
|
|
let p = getGlobalDispatcher()
|
|
let procFlags = SYNCHRONIZE
|
|
var hProcess = openProcess(procFlags, 0, pid.DWORD)
|
|
if hProcess == NULL:
|
|
raiseOSError(osLastError())
|
|
|
|
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
|
|
var flags = WT_EXECUTEINWAITTHREAD.DWORD
|
|
|
|
proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
closeWaitable(hProcess)
|
|
discard cb(fd)
|
|
|
|
registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
|
|
|
|
proc newAsyncEvent*(): AsyncEvent =
|
|
## Creates a new thread-safe ``AsyncEvent`` object.
|
|
##
|
|
## New ``AsyncEvent`` object is not automatically registered with
|
|
## dispatcher like ``AsyncSocket``.
|
|
var sa = SECURITY_ATTRIBUTES(
|
|
nLength: sizeof(SECURITY_ATTRIBUTES).cint,
|
|
bInheritHandle: 1
|
|
)
|
|
var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
|
|
if event == INVALID_HANDLE_VALUE:
|
|
raiseOSError(osLastError())
|
|
result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
|
|
result.hEvent = event
|
|
|
|
proc trigger*(ev: AsyncEvent) =
|
|
## Set event ``ev`` to signaled state.
|
|
if setEvent(ev.hEvent) == 0:
|
|
raiseOSError(osLastError())
|
|
|
|
proc unregister*(ev: AsyncEvent) =
|
|
## Unregisters event ``ev``.
|
|
doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
|
|
let p = getGlobalDispatcher()
|
|
p.handles.excl(AsyncFD(ev.hEvent))
|
|
if unregisterWait(ev.hWaiter) == 0:
|
|
let err = osLastError()
|
|
if err.int32 != ERROR_IO_PENDING:
|
|
raiseOSError(err)
|
|
ev.hWaiter = 0
|
|
|
|
proc close*(ev: AsyncEvent) =
|
|
## Closes event ``ev``.
|
|
let res = closeHandle(ev.hEvent)
|
|
deallocShared(cast[pointer](ev))
|
|
if res == 0:
|
|
raiseOSError(osLastError())
|
|
|
|
proc addEvent*(ev: AsyncEvent, cb: Callback) =
|
|
## Registers callback ``cb`` to be called when ``ev`` will be signaled
|
|
doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
|
|
|
|
let p = getGlobalDispatcher()
|
|
let hEvent = ev.hEvent
|
|
|
|
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
|
|
var flags = WT_EXECUTEINWAITTHREAD.DWORD
|
|
|
|
proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if ev.hWaiter != 0:
|
|
if cb(fd):
|
|
# we need this check to avoid exception, if `unregister(event)` was
|
|
# called in callback.
|
|
deallocShared(cast[pointer](pcd))
|
|
if ev.hWaiter != 0:
|
|
unregister(ev)
|
|
else:
|
|
# if callback returned `false`, then it wants to be called again, so
|
|
# we need to ref and protect `pcd.ovl` again, because it will be
|
|
# unrefed and disposed in `poll()`.
|
|
GC_ref(pcd.ovl)
|
|
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
|
|
else:
|
|
# if ev.hWaiter == 0, then event was unregistered before `poll()` call.
|
|
deallocShared(cast[pointer](pcd))
|
|
|
|
registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
|
|
ev.hWaiter = pcd.waitFd
|
|
|
|
initAll()
|
|
else:
|
|
import selectors
|
|
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
|
|
MSG_NOSIGNAL
|
|
when declared(posix.accept4):
|
|
from posix import accept4, SOCK_CLOEXEC
|
|
|
|
const
|
|
InitCallbackListSize = 4 # initial size of callbacks sequence,
|
|
# associated with file/socket descriptor.
|
|
InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
|
|
# queue.
|
|
type
|
|
AsyncFD* = distinct cint
|
|
Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
|
|
|
|
AsyncData = object
|
|
readList: seq[Callback]
|
|
writeList: seq[Callback]
|
|
|
|
AsyncEvent* = distinct SelectEvent
|
|
|
|
PDispatcher* = ref object of PDispatcherBase
|
|
selector: Selector[AsyncData]
|
|
|
|
proc `==`*(x, y: AsyncFD): bool {.borrow.}
|
|
proc `==`*(x, y: AsyncEvent): bool {.borrow.}
|
|
|
|
template newAsyncData(): AsyncData =
|
|
AsyncData(
|
|
readList: newSeqOfCap[Callback](InitCallbackListSize),
|
|
writeList: newSeqOfCap[Callback](InitCallbackListSize)
|
|
)
|
|
|
|
proc newDispatcher*(): owned(PDispatcher) =
|
|
new result
|
|
result.selector = newSelector[AsyncData]()
|
|
result.timers.clear()
|
|
result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
|
|
|
|
var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
|
|
|
|
proc setGlobalDispatcher*(disp: owned PDispatcher) =
|
|
if not gDisp.isNil:
|
|
assert gDisp.callbacks.len == 0
|
|
gDisp = disp
|
|
initCallSoonProc()
|
|
|
|
proc getGlobalDispatcher*(): PDispatcher =
|
|
if gDisp.isNil:
|
|
setGlobalDispatcher(newDispatcher())
|
|
result = gDisp
|
|
|
|
proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
|
|
return disp.selector
|
|
|
|
proc register*(fd: AsyncFD) =
|
|
let p = getGlobalDispatcher()
|
|
var data = newAsyncData()
|
|
p.selector.registerHandle(fd.SocketHandle, {}, data)
|
|
|
|
proc unregister*(fd: AsyncFD) =
|
|
getGlobalDispatcher().selector.unregister(fd.SocketHandle)
|
|
|
|
proc unregister*(ev: AsyncEvent) =
|
|
getGlobalDispatcher().selector.unregister(SelectEvent(ev))
|
|
|
|
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
|
|
return fd.SocketHandle in disp.selector
|
|
|
|
proc addRead*(fd: AsyncFD, cb: Callback) =
|
|
let p = getGlobalDispatcher()
|
|
var newEvents = {Event.Read}
|
|
withData(p.selector, fd.SocketHandle, adata) do:
|
|
adata.readList.add(cb)
|
|
newEvents.incl(Event.Read)
|
|
if len(adata.writeList) != 0: newEvents.incl(Event.Write)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
p.selector.updateHandle(fd.SocketHandle, newEvents)
|
|
|
|
proc addWrite*(fd: AsyncFD, cb: Callback) =
|
|
let p = getGlobalDispatcher()
|
|
var newEvents = {Event.Write}
|
|
withData(p.selector, fd.SocketHandle, adata) do:
|
|
adata.writeList.add(cb)
|
|
newEvents.incl(Event.Write)
|
|
if len(adata.readList) != 0: newEvents.incl(Event.Read)
|
|
do:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
p.selector.updateHandle(fd.SocketHandle, newEvents)
|
|
|
|
proc hasPendingOperations*(): bool =
|
|
let p = getGlobalDispatcher()
|
|
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
|
|
|
|
proc processBasicCallbacks(
|
|
fd: AsyncFD, event: Event
|
|
): tuple[readCbListCount, writeCbListCount: int] =
|
|
# Process pending descriptor and AsyncEvent callbacks.
|
|
#
|
|
# Invoke every callback stored in `rwlist`, until one
|
|
# returns `false` (which means callback wants to stay
|
|
# alive). In such case all remaining callbacks will be added
|
|
# to `rwlist` again, in the order they have been inserted.
|
|
#
|
|
# `rwlist` associated with file descriptor MUST BE emptied before
|
|
# dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
|
|
# or it can be possible to fall into endless cycle.
|
|
var curList: seq[Callback]
|
|
|
|
let selector = getGlobalDispatcher().selector
|
|
withData(selector, fd.int, fdData):
|
|
case event
|
|
of Event.Read:
|
|
shallowCopy(curList, fdData.readList)
|
|
fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
|
|
of Event.Write:
|
|
shallowCopy(curList, fdData.writeList)
|
|
fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
|
|
else:
|
|
assert false, "Cannot process callbacks for " & $event
|
|
|
|
let newLength = max(len(curList), InitCallbackListSize)
|
|
var newList = newSeqOfCap[Callback](newLength)
|
|
|
|
var eventsExtinguished = false
|
|
for cb in curList:
|
|
if eventsExtinguished:
|
|
newList.add(cb)
|
|
continue
|
|
if not cb(fd):
|
|
# Callback wants to be called again.
|
|
newList.add(cb)
|
|
# This callback has returned with EAGAIN, so we don't need to
|
|
# call any other callbacks as they are all waiting for the same event
|
|
# on the same fd.
|
|
# We do need to ensure they are called again though.
|
|
eventsExtinguished = true
|
|
|
|
withData(selector, fd.int, fdData) do:
|
|
# Descriptor is still present in the queue.
|
|
case event
|
|
of Event.Read:
|
|
fdData.readList = newList & fdData.readList
|
|
of Event.Write:
|
|
fdData.writeList = newList & fdData.writeList
|
|
else:
|
|
assert false, "Cannot process callbacks for " & $event
|
|
|
|
result.readCbListCount = len(fdData.readList)
|
|
result.writeCbListCount = len(fdData.writeList)
|
|
do:
|
|
# Descriptor was unregistered in callback via `unregister()`.
|
|
result.readCbListCount = -1
|
|
result.writeCbListCount = -1
|
|
|
|
template processCustomCallbacks(ident: untyped) =
|
|
# Process pending custom event callbacks. Custom events are
|
|
# {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
|
|
# There can be only one callback registered with one descriptor,
|
|
# so there is no need to iterate over list.
|
|
var curList: seq[Callback]
|
|
|
|
withData(p.selector, ident.int, adata) do:
|
|
shallowCopy(curList, adata.readList)
|
|
adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
|
|
|
|
let newLength = len(curList)
|
|
var newList = newSeqOfCap[Callback](newLength)
|
|
|
|
var cb = curList[0]
|
|
if not cb(fd.AsyncFD):
|
|
newList.add(cb)
|
|
|
|
withData(p.selector, ident.int, adata) do:
|
|
# descriptor still present in queue.
|
|
adata.readList = newList & adata.readList
|
|
if len(adata.readList) == 0:
|
|
# if no callbacks registered with descriptor, unregister it.
|
|
p.selector.unregister(fd.int)
|
|
do:
|
|
# descriptor was unregistered in callback via `unregister()`.
|
|
discard
|
|
|
|
implementSetInheritable()
|
|
|
|
proc closeSocket*(sock: AsyncFD) =
|
|
let selector = getGlobalDispatcher().selector
|
|
if sock.SocketHandle notin selector:
|
|
raise newException(ValueError, "File descriptor not registered.")
|
|
|
|
let data = selector.getData(sock.SocketHandle)
|
|
sock.unregister()
|
|
sock.SocketHandle.close()
|
|
# We need to unblock the read and write callbacks which could still be
|
|
# waiting for the socket to become readable and/or writeable.
|
|
for cb in data.readList & data.writeList:
|
|
if not cb(sock):
|
|
raise newException(
|
|
ValueError, "Expecting async operations to stop when fd has closed."
|
|
)
|
|
|
|
|
|
proc runOnce(timeout = 500): bool =
|
|
let p = getGlobalDispatcher()
|
|
when ioselSupportedPlatform:
|
|
let customSet = {Event.Timer, Event.Signal, Event.Process,
|
|
Event.Vnode}
|
|
|
|
if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
|
|
raise newException(ValueError,
|
|
"No handles or timers registered in dispatcher.")
|
|
|
|
result = false
|
|
var keys: array[64, ReadyKey]
|
|
let nextTimer = processTimers(p, result)
|
|
var count =
|
|
p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
|
|
for i in 0..<count:
|
|
let fd = keys[i].fd.AsyncFD
|
|
let events = keys[i].events
|
|
var (readCbListCount, writeCbListCount) = (0, 0)
|
|
|
|
if Event.Read in events or events == {Event.Error}:
|
|
(readCbListCount, writeCbListCount) =
|
|
processBasicCallbacks(fd, Event.Read)
|
|
result = true
|
|
|
|
if Event.Write in events or events == {Event.Error}:
|
|
(readCbListCount, writeCbListCount) =
|
|
processBasicCallbacks(fd, Event.Write)
|
|
result = true
|
|
|
|
var isCustomEvent = false
|
|
if Event.User in events:
|
|
(readCbListCount, writeCbListCount) =
|
|
processBasicCallbacks(fd, Event.Read)
|
|
isCustomEvent = true
|
|
if readCbListCount == 0:
|
|
p.selector.unregister(fd.int)
|
|
result = true
|
|
|
|
when ioselSupportedPlatform:
|
|
if (customSet * events) != {}:
|
|
isCustomEvent = true
|
|
processCustomCallbacks(fd)
|
|
result = true
|
|
|
|
# because state `data` can be modified in callback we need to update
|
|
# descriptor events with currently registered callbacks.
|
|
if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
|
|
var newEvents: set[Event] = {}
|
|
if readCbListCount > 0: incl(newEvents, Event.Read)
|
|
if writeCbListCount > 0: incl(newEvents, Event.Write)
|
|
p.selector.updateHandle(SocketHandle(fd), newEvents)
|
|
|
|
# Timer processing.
|
|
discard processTimers(p, result)
|
|
# Callback queue processing
|
|
processPendingCallbacks(p, result)
|
|
|
|
proc recv*(socket: AsyncFD, size: int,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
|
|
var retFuture = newFuture[string]("recv")
|
|
|
|
var readBuffer = newString(size)
|
|
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
|
|
flags.toOSFlags())
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
if flags.isDisconnectionError(lastError):
|
|
retFuture.complete("")
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
else:
|
|
result = false # We still want this callback to be called.
|
|
elif res == 0:
|
|
# Disconnected
|
|
retFuture.complete("")
|
|
else:
|
|
readBuffer.setLen(res)
|
|
retFuture.complete(readBuffer)
|
|
# TODO: The following causes a massive slowdown.
|
|
#if not cb(socket):
|
|
addRead(socket, cb)
|
|
return retFuture
|
|
|
|
proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
|
|
var retFuture = newFuture[int]("recvInto")
|
|
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
let res = recv(sock.SocketHandle, buf, size.cint,
|
|
flags.toOSFlags())
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
if flags.isDisconnectionError(lastError):
|
|
retFuture.complete(0)
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
else:
|
|
result = false # We still want this callback to be called.
|
|
else:
|
|
retFuture.complete(res)
|
|
# TODO: The following causes a massive slowdown.
|
|
#if not cb(socket):
|
|
addRead(socket, cb)
|
|
return retFuture
|
|
|
|
proc send*(socket: AsyncFD, buf: pointer, size: int,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
|
|
var retFuture = newFuture[void]("send")
|
|
|
|
var written = 0
|
|
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
let netSize = size-written
|
|
var d = cast[cstring](buf)
|
|
let res = send(sock.SocketHandle, addr d[written], netSize.cint,
|
|
MSG_NOSIGNAL)
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and
|
|
lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
if flags.isDisconnectionError(lastError):
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.fail(newOSError(lastError))
|
|
else:
|
|
result = false # We still want this callback to be called.
|
|
else:
|
|
written.inc(res)
|
|
if res != netSize:
|
|
result = false # We still have data to send.
|
|
else:
|
|
retFuture.complete()
|
|
# TODO: The following causes crashes.
|
|
#if not cb(socket):
|
|
addWrite(socket, cb)
|
|
return retFuture
|
|
|
|
proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
|
|
saddrLen: SockLen,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
|
|
## Sends ``data`` of size ``size`` in bytes to specified destination
|
|
## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``.
|
|
## The returned future will complete once all data has been sent.
|
|
var retFuture = newFuture[void]("sendTo")
|
|
|
|
# we will preserve address in our stack
|
|
var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
|
|
var stalen = saddrLen
|
|
zeroMem(addr(staddr[0]), 128)
|
|
copyMem(addr(staddr[0]), saddr, saddrLen)
|
|
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
|
|
cast[ptr SockAddr](addr(staddr[0])), stalen)
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
else:
|
|
result = false # We still want this callback to be called.
|
|
else:
|
|
retFuture.complete()
|
|
|
|
addWrite(socket, cb)
|
|
return retFuture
|
|
|
|
proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
|
|
saddr: ptr SockAddr, saddrLen: ptr SockLen,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
|
|
## Receives a datagram data from ``socket`` into ``data``, which must
|
|
## be at least of size ``size`` in bytes, address of datagram's sender
|
|
## will be stored into ``saddr`` and ``saddrLen``. Returned future will
|
|
## complete once one datagram has been received, and will return size
|
|
## of packet received.
|
|
var retFuture = newFuture[int]("recvFromInto")
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
|
|
saddr, saddrLen)
|
|
if res < 0:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
|
|
lastError.int32 != EAGAIN:
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
else:
|
|
result = false
|
|
else:
|
|
retFuture.complete(res)
|
|
addRead(socket, cb)
|
|
return retFuture
|
|
|
|
proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
|
|
inheritable = defined(nimInheritHandles)):
|
|
owned(Future[tuple[address: string, client: AsyncFD]]) =
|
|
var retFuture = newFuture[tuple[address: string,
|
|
client: AsyncFD]]("acceptAddr")
|
|
proc cb(sock: AsyncFD): bool =
|
|
result = true
|
|
var sockAddress: Sockaddr_storage
|
|
var addrLen = sizeof(sockAddress).SockLen
|
|
var client =
|
|
when declared(accept4):
|
|
accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
|
|
addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
|
|
else:
|
|
accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
|
|
addr(addrLen))
|
|
when declared(setInheritable) and not declared(accept4):
|
|
if client != osInvalidSocket and not setInheritable(client, inheritable):
|
|
# Set failure first because close() itself can fail,
|
|
# altering osLastError().
|
|
retFuture.fail(newOSError(osLastError()))
|
|
close client
|
|
return false
|
|
|
|
if client == osInvalidSocket:
|
|
let lastError = osLastError()
|
|
assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
|
|
if lastError.int32 == EINTR:
|
|
return false
|
|
else:
|
|
if flags.isDisconnectionError(lastError):
|
|
return false
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
else:
|
|
try:
|
|
let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
|
|
register(client.AsyncFD)
|
|
retFuture.complete((address, client.AsyncFD))
|
|
except:
|
|
# getAddrString may raise
|
|
client.close()
|
|
retFuture.fail(getCurrentException())
|
|
addRead(socket, cb)
|
|
return retFuture
|
|
|
|
when ioselSupportedPlatform:
|
|
|
|
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
|
|
## Start watching for timeout expiration, and then call the
|
|
## callback ``cb``.
|
|
## ``timeout`` - time in milliseconds,
|
|
## ``oneshot`` - if ``true`` only one event will be dispatched,
|
|
## if ``false`` continuous events every ``timeout`` milliseconds.
|
|
let p = getGlobalDispatcher()
|
|
var data = newAsyncData()
|
|
data.readList.add(cb)
|
|
p.selector.registerTimer(timeout, oneshot, data)
|
|
|
|
proc addSignal*(signal: int, cb: Callback) =
|
|
## Start watching signal ``signal``, and when signal appears, call the
|
|
## callback ``cb``.
|
|
let p = getGlobalDispatcher()
|
|
var data = newAsyncData()
|
|
data.readList.add(cb)
|
|
p.selector.registerSignal(signal, data)
|
|
|
|
proc addProcess*(pid: int, cb: Callback) =
|
|
## Start watching for process exit with pid ``pid``, and then call
|
|
## the callback ``cb``.
|
|
let p = getGlobalDispatcher()
|
|
var data = newAsyncData()
|
|
data.readList.add(cb)
|
|
p.selector.registerProcess(pid, data)
|
|
|
|
proc newAsyncEvent*(): AsyncEvent =
|
|
## Creates new ``AsyncEvent``.
|
|
result = AsyncEvent(newSelectEvent())
|
|
|
|
proc trigger*(ev: AsyncEvent) =
|
|
## Sets new ``AsyncEvent`` to signaled state.
|
|
trigger(SelectEvent(ev))
|
|
|
|
proc close*(ev: AsyncEvent) =
|
|
## Closes ``AsyncEvent``
|
|
close(SelectEvent(ev))
|
|
|
|
proc addEvent*(ev: AsyncEvent, cb: Callback) =
|
|
## Start watching for event ``ev``, and call callback ``cb``, when
|
|
## ev will be set to signaled state.
|
|
let p = getGlobalDispatcher()
|
|
var data = newAsyncData()
|
|
data.readList.add(cb)
|
|
p.selector.registerEvent(SelectEvent(ev), data)
|
|
|
|
proc drain*(timeout = 500) =
|
|
## Waits for completion of **all** events and processes them. Raises ``ValueError``
|
|
## if there are no pending operations. In contrast to ``poll`` this
|
|
## processes as many events as are available until the timeout has elapsed.
|
|
var curTimeout = timeout
|
|
let start = now()
|
|
while hasPendingOperations():
|
|
discard runOnce(curTimeout)
|
|
curTimeout -= (now() - start).inMilliseconds.int
|
|
if curTimeout < 0:
|
|
break
|
|
|
|
proc poll*(timeout = 500) =
|
|
## Waits for completion events and processes them. Raises ``ValueError``
|
|
## if there are no pending operations. This runs the underlying OS
|
|
## `epoll`:idx: or `kqueue`:idx: primitive only once.
|
|
discard runOnce(timeout)
|
|
|
|
template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
|
|
inheritable = defined(nimInheritHandles)) =
|
|
let handle = createNativeSocket(domain, sockType, protocol, inheritable)
|
|
if handle == osInvalidSocket:
|
|
return osInvalidSocket.AsyncFD
|
|
handle.setBlocking(false)
|
|
when defined(macosx) and not defined(nimdoc):
|
|
handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
|
|
result = handle.AsyncFD
|
|
register(result)
|
|
|
|
proc createAsyncNativeSocket*(domain: cint, sockType: cint,
|
|
protocol: cint,
|
|
inheritable = defined(nimInheritHandles)): AsyncFD =
|
|
createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
|
|
|
|
proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
|
|
sockType: SockType = SOCK_STREAM,
|
|
protocol: Protocol = IPPROTO_TCP,
|
|
inheritable = defined(nimInheritHandles)): AsyncFD =
|
|
createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
|
|
|
|
when defined(windows) or defined(nimdoc):
|
|
proc bindToDomain(handle: SocketHandle, domain: Domain) =
|
|
# Extracted into a separate proc, because connect() on Windows requires
|
|
# the socket to be initially bound.
|
|
template doBind(saddr) =
|
|
if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
|
|
sizeof(saddr).SockLen) < 0'i32:
|
|
raiseOSError(osLastError())
|
|
|
|
if domain == Domain.AF_INET6:
|
|
var saddr: Sockaddr_in6
|
|
saddr.sin6_family = uint16(toInt(domain))
|
|
doBind(saddr)
|
|
else:
|
|
var saddr: Sockaddr_in
|
|
saddr.sin_family = uint16(toInt(domain))
|
|
doBind(saddr)
|
|
|
|
proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
|
|
let retFuture = newFuture[void]("doConnect")
|
|
result = retFuture
|
|
|
|
var ol = newCustom()
|
|
ol.data = CompletionData(fd: socket, cb:
|
|
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
|
if not retFuture.finished:
|
|
if errcode == OSErrorCode(-1):
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
|
)
|
|
|
|
let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
|
|
cint(addrInfo.ai_addrlen), nil, 0, nil,
|
|
cast[POVERLAPPED](ol))
|
|
if ret:
|
|
# Request to connect completed immediately.
|
|
retFuture.complete()
|
|
# We don't deallocate ``ol`` here because even though this completed
|
|
# immediately poll will still be notified about its completion and it
|
|
# will free ``ol``.
|
|
else:
|
|
let lastError = osLastError()
|
|
if lastError.int32 != ERROR_IO_PENDING:
|
|
# With ERROR_IO_PENDING ``ol`` will be deallocated in ``poll``,
|
|
# and the future will be completed/failed there, too.
|
|
GC_unref(ol)
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
else:
|
|
proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
|
|
let retFuture = newFuture[void]("doConnect")
|
|
result = retFuture
|
|
|
|
proc cb(fd: AsyncFD): bool =
|
|
let ret = SocketHandle(fd).getSockOptInt(
|
|
cint(SOL_SOCKET), cint(SO_ERROR))
|
|
if ret == 0:
|
|
# We have connected.
|
|
retFuture.complete()
|
|
return true
|
|
elif ret == EINTR:
|
|
# interrupted, keep waiting
|
|
return false
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
|
|
return true
|
|
|
|
let ret = connect(socket.SocketHandle,
|
|
addrInfo.ai_addr,
|
|
addrInfo.ai_addrlen.SockLen)
|
|
if ret == 0:
|
|
# Request to connect completed immediately.
|
|
retFuture.complete()
|
|
else:
|
|
let lastError = osLastError()
|
|
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
|
|
addWrite(socket, cb)
|
|
else:
|
|
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
|
|
|
template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
|
|
protocol: Protocol = IPPROTO_RAW) =
|
|
## Iterates through the AddrInfo linked list asynchronously
|
|
## until the connection can be established.
|
|
const shouldCreateFd = not declared(fd)
|
|
|
|
when shouldCreateFd:
|
|
let sockType = protocol.toSockType()
|
|
|
|
var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
|
|
for i in low(fdPerDomain)..high(fdPerDomain):
|
|
fdPerDomain[i] = osInvalidSocket.AsyncFD
|
|
template closeUnusedFds(domainToKeep = -1) {.dirty.} =
|
|
for i, fd in fdPerDomain:
|
|
if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
|
|
fd.closeSocket()
|
|
|
|
var lastException: ref Exception
|
|
var curAddrInfo = addrInfo
|
|
var domain: Domain
|
|
when shouldCreateFd:
|
|
var curFd: AsyncFD
|
|
else:
|
|
var curFd = fd
|
|
proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
|
|
if fut == nil or fut.failed:
|
|
if fut != nil:
|
|
lastException = fut.readError()
|
|
|
|
while curAddrInfo != nil:
|
|
let domainOpt = curAddrInfo.ai_family.toKnownDomain()
|
|
if domainOpt.isSome:
|
|
domain = domainOpt.unsafeGet()
|
|
break
|
|
curAddrInfo = curAddrInfo.ai_next
|
|
|
|
if curAddrInfo == nil:
|
|
freeaddrinfo(addrInfo)
|
|
when shouldCreateFd:
|
|
closeUnusedFds()
|
|
if lastException != nil:
|
|
retFuture.fail(lastException)
|
|
else:
|
|
retFuture.fail(newException(
|
|
IOError, "Couldn't resolve address: " & address))
|
|
return
|
|
|
|
when shouldCreateFd:
|
|
curFd = fdPerDomain[ord(domain)]
|
|
if curFd == osInvalidSocket.AsyncFD:
|
|
try:
|
|
curFd = createAsyncNativeSocket(domain, sockType, protocol)
|
|
except:
|
|
freeaddrinfo(addrInfo)
|
|
closeUnusedFds()
|
|
raise getCurrentException()
|
|
when defined(windows):
|
|
curFd.SocketHandle.bindToDomain(domain)
|
|
fdPerDomain[ord(domain)] = curFd
|
|
|
|
doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
|
|
curAddrInfo = curAddrInfo.ai_next
|
|
else:
|
|
freeaddrinfo(addrInfo)
|
|
when shouldCreateFd:
|
|
closeUnusedFds(ord(domain))
|
|
retFuture.complete(curFd)
|
|
else:
|
|
retFuture.complete()
|
|
|
|
tryNextAddrInfo(nil)
|
|
|
|
proc dial*(address: string, port: Port,
|
|
protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
|
|
## Establishes connection to the specified ``address``:``port`` pair via the
|
|
## specified protocol. The procedure iterates through possible
|
|
## resolutions of the ``address`` until it succeeds, meaning that it
|
|
## seamlessly works with both IPv4 and IPv6.
|
|
## Returns the async file descriptor, registered in the dispatcher of
|
|
## the current thread, ready to send or receive data.
|
|
let retFuture = newFuture[AsyncFD]("dial")
|
|
result = retFuture
|
|
let sockType = protocol.toSockType()
|
|
|
|
let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
|
|
asyncAddrInfoLoop(aiList, noFD, protocol)
|
|
|
|
proc connect*(socket: AsyncFD, address: string, port: Port,
|
|
domain = Domain.AF_INET): owned(Future[void]) =
|
|
let retFuture = newFuture[void]("connect")
|
|
result = retFuture
|
|
|
|
when defined(windows):
|
|
verifyPresence(socket)
|
|
else:
|
|
assert getSockDomain(socket.SocketHandle) == domain
|
|
|
|
let aiList = getAddrInfo(address, port, domain)
|
|
when defined(windows):
|
|
socket.SocketHandle.bindToDomain(domain)
|
|
asyncAddrInfoLoop(aiList, socket)
|
|
|
|
proc sleepAsync*(ms: int | float): owned(Future[void]) =
|
|
## Suspends the execution of the current async procedure for the next
|
|
## ``ms`` milliseconds.
|
|
var retFuture = newFuture[void]("sleepAsync")
|
|
let p = getGlobalDispatcher()
|
|
when ms is int:
|
|
p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
|
|
elif ms is float:
|
|
let ns = (ms * 1_000_000).int64
|
|
p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
|
|
return retFuture
|
|
|
|
proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
|
|
## Returns a future which will complete once ``fut`` completes or after
|
|
## ``timeout`` milliseconds has elapsed.
|
|
##
|
|
## If ``fut`` completes first the returned future will hold true,
|
|
## otherwise, if ``timeout`` milliseconds has elapsed first, the returned
|
|
## future will hold false.
|
|
|
|
var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
|
|
var timeoutFuture = sleepAsync(timeout)
|
|
fut.callback =
|
|
proc () =
|
|
if not retFuture.finished:
|
|
if fut.failed:
|
|
retFuture.fail(fut.error)
|
|
else:
|
|
retFuture.complete(true)
|
|
timeoutFuture.callback =
|
|
proc () =
|
|
if not retFuture.finished: retFuture.complete(false)
|
|
return retFuture
|
|
|
|
proc accept*(socket: AsyncFD,
|
|
flags = {SocketFlag.SafeDisconn},
|
|
inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
|
|
## Accepts a new connection. Returns a future containing the client socket
|
|
## corresponding to that connection.
|
|
##
|
|
## If ``inheritable`` is false (the default), the resulting client socket
|
|
## will not be inheritable by child processes.
|
|
##
|
|
## The future will complete when the connection is successfully accepted.
|
|
var retFut = newFuture[AsyncFD]("accept")
|
|
var fut = acceptAddr(socket, flags, inheritable)
|
|
fut.callback =
|
|
proc (future: Future[tuple[address: string, client: AsyncFD]]) =
|
|
assert future.finished
|
|
if future.failed:
|
|
retFut.fail(future.error)
|
|
else:
|
|
retFut.complete(future.read.client)
|
|
return retFut
|
|
|
|
proc keepAlive(x: string) =
|
|
discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
|
|
|
|
proc send*(socket: AsyncFD, data: string,
|
|
flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
|
|
## Sends ``data`` to ``socket``. The returned future will complete once all
|
|
## data has been sent.
|
|
var retFuture = newFuture[void]("send")
|
|
if data.len > 0:
|
|
let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
|
|
sendFut.callback =
|
|
proc () =
|
|
keepAlive(data)
|
|
if sendFut.failed:
|
|
retFuture.fail(sendFut.error)
|
|
else:
|
|
retFuture.complete()
|
|
else:
|
|
retFuture.complete()
|
|
|
|
return retFuture
|
|
|
|
# -- Await Macro
|
|
include asyncmacro
|
|
|
|
proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
|
|
## Returns a future that will complete when all the string data from the
|
|
## specified future stream is retrieved.
|
|
result = ""
|
|
while true:
|
|
let (hasValue, value) = await future.read()
|
|
if hasValue:
|
|
result.add(value)
|
|
else:
|
|
break
|
|
|
|
proc callSoon(cbproc: proc () {.gcsafe.}) =
|
|
getGlobalDispatcher().callbacks.addLast(cbproc)
|
|
|
|
proc runForever*() =
|
|
## Begins a never ending global dispatcher poll loop.
|
|
while true:
|
|
poll()
|
|
|
|
proc waitFor*[T](fut: Future[T]): T =
|
|
## **Blocks** the current thread until the specified future completes.
|
|
while not fut.finished:
|
|
poll()
|
|
|
|
fut.read
|