Async upcoming (#6585)

* Merge upcoming async with current.
* Various improvements to selectors (mostly docs).

Two changes to highlight:

* Renamed ``setEvent`` to ``trigger``
* Reused setBlocking from nativesockets.
* Various changes/fixes to asyncdispatch after upcoming merge.
* Make some attempts to be compatible with older selectors.
* Reuse epoll module in ioselectors_epoll.
This commit is contained in:
Dominik Picheta
2017-11-22 14:43:10 +00:00
committed by Andreas Rumpf
parent 2c584cdb3d
commit d3394be555
10 changed files with 766 additions and 925 deletions

View File

@@ -35,18 +35,13 @@ const
EPOLL_CTL_MOD* = 3 # Change file descriptor epoll_event structure.
type
epoll_data* {.importc: "union epoll_data",
EpollData* {.importc: "union epoll_data",
header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union.
#thePtr* {.importc: "ptr".}: pointer
fd* {.importc: "fd".}: cint
when defined(linux) and defined(amd64):
u32: uint32 # this field ensures that binary size is right - it cannot be
# used because its offset is wrong
#u64*: uint64
u64* {.importc: "u64".}: uint64
epoll_event* {.importc: "struct epoll_event", header: "<sys/epoll.h>", pure, final.} = object
EpollEvent* {.importc: "struct epoll_event", header: "<sys/epoll.h>", pure, final.} = object
events*: uint32 # Epoll events
data*: epoll_data # User data variable
data*: EpollData # User data variable
proc epoll_create*(size: cint): cint {.importc: "epoll_create",
header: "<sys/epoll.h>".}
@@ -60,7 +55,7 @@ proc epoll_create1*(flags: cint): cint {.importc: "epoll_create1",
## Same as epoll_create but with an FLAGS parameter. The unused SIZE
## parameter has been dropped.
proc epoll_ctl*(epfd: cint; op: cint; fd: cint | SocketHandle; event: ptr epoll_event): cint {.
proc epoll_ctl*(epfd: cint; op: cint; fd: cint | SocketHandle; event: ptr EpollEvent): cint {.
importc: "epoll_ctl", header: "<sys/epoll.h>".}
## Manipulate an epoll instance "epfd". Returns 0 in case of success,
## -1 in case of error ( the "errno" variable will contain the
@@ -69,7 +64,7 @@ proc epoll_ctl*(epfd: cint; op: cint; fd: cint | SocketHandle; event: ptr epoll_
## operation. The "event" parameter describes which events the caller
## is interested in and any associated user data.
proc epoll_wait*(epfd: cint; events: ptr epoll_event; maxevents: cint;
proc epoll_wait*(epfd: cint; events: ptr EpollEvent; maxevents: cint;
timeout: cint): cint {.importc: "epoll_wait",
header: "<sys/epoll.h>".}
## Wait for events on an epoll instance "epfd". Returns the number of
@@ -84,7 +79,7 @@ proc epoll_wait*(epfd: cint; events: ptr epoll_event; maxevents: cint;
## __THROW.
#proc epoll_pwait*(epfd: cint; events: ptr epoll_event; maxevents: cint;
#proc epoll_pwait*(epfd: cint; events: ptr EpollEvent; maxevents: cint;
# timeout: cint; ss: ptr sigset_t): cint {.
# importc: "epoll_pwait", header: "<sys/epoll.h>".}
# Same as epoll_wait, but the thread's signal mask is temporarily

View File

@@ -9,8 +9,9 @@
include "system/inclrtl"
import os, tables, strutils, times, heapqueue, options, asyncstreams
import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
import asyncfutures except callSoon
import nativesockets, net, deques
export Port, SocketFlag
@@ -136,6 +137,7 @@ export asyncfutures, asyncstreams
## and occasionally the compilation may fail altogether.
## As such it is better to use the former style when possible.
##
##
## Discarding futures
## ------------------
##
@@ -226,6 +228,12 @@ when defined(windows) or defined(nimdoc):
ovl: PCustomOverlapped
PostCallbackDataPtr = ptr PostCallbackData
AsyncEventImpl = object
hEvent: Handle
hWaiter: Handle
pcd: PostCallbackDataPtr
AsyncEvent* = ptr AsyncEventImpl
Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
{.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
@@ -332,9 +340,9 @@ when defined(windows) or defined(nimdoc):
# Callback queue processing
processPendingCallbacks(p)
var connectExPtr: pointer = nil
var acceptExPtr: pointer = nil
var getAcceptExSockAddrsPtr: pointer = nil
var acceptEx: WSAPROC_ACCEPTEX
var connectEx: WSAPROC_CONNECTEX
var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
# Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
@@ -346,56 +354,19 @@ when defined(windows) or defined(nimdoc):
proc initAll() =
let dummySock = newNativeSocket()
if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
if dummySock == INVALID_SOCKET:
raiseOSError(osLastError())
if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
var fun: pointer = nil
if not initPointer(dummySock, fun, WSAID_CONNECTEX):
raiseOSError(osLastError())
if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS):
connectEx = cast[WSAPROC_CONNECTEX](fun)
if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
raiseOSError(osLastError())
proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint,
lpSendBuffer: pointer, dwSendDataLength: Dword,
lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool =
if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().")
let fun =
cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint,
lpSendBuffer: pointer, dwSendDataLength: Dword,
lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr)
result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
lpOverlapped)
proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
lpOverlapped: POVERLAPPED): bool =
if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().")
let fun =
cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr)
result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
lpOverlapped)
proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword,
LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt,
RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) =
if getAcceptExSockAddrsPtr.isNil:
raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().")
let fun =
cast[proc (lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr,
LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr,
RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr)
fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
RemoteSockaddr, RemoteSockaddrLength)
acceptEx = cast[WSAPROC_ACCEPTEX](fun)
if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
raiseOSError(osLastError())
getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
close(dummySock)
proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): Future[string] =
@@ -506,10 +477,7 @@ when defined(windows) or defined(nimdoc):
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete(0)
else:
retFuture.complete(bytesCount)
retFuture.complete(bytesCount)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete(0)
@@ -543,10 +511,11 @@ when defined(windows) or defined(nimdoc):
proc send*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
## data has been sent.
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
## to avoid early freeing of the buffer
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
## will complete once all data has been sent.
##
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
verifyPresence(socket)
var retFuture = newFuture[void]("send")
@@ -793,7 +762,7 @@ when defined(windows) or defined(nimdoc):
cast[pointer](p.ovl))
{.pop.}
template registerWaitableEvent(mask) =
proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) =
let p = getGlobalDispatcher()
var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword
var hEvent = wsaCreateEvent()
@@ -843,8 +812,8 @@ when defined(windows) or defined(nimdoc):
cast[pointer](pcd), INFINITE, flags):
# pcd.ovl will be unrefed in poll()
let err = osLastError()
discard wsaCloseEvent(hEvent)
deallocShared(cast[pointer](pcd))
discard wsaCloseEvent(hEvent)
raiseOSError(err)
else:
# we incref `pcd.ovl` and `protect` callback one more time,
@@ -883,16 +852,17 @@ when defined(windows) or defined(nimdoc):
##
## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
## so if you can avoid it, please do it. Use `addRead` only if really
## need it (main usecase is adaptation of `unix like` libraries to be
## need it (main usecase is adaptation of unix-like libraries to be
## asynchronous on Windows).
## If you use this function, you dont need to use asyncdispatch.recv()
##
## If you use this function, you don't need to use asyncdispatch.recv()
## or asyncdispatch.accept(), because they are using IOCP, please use
## nativesockets.recv() and nativesockets.accept() instead.
##
## Be sure your callback ``cb`` returns ``true``, if you want to remove
## watch of `read` notifications, and ``false``, if you want to continue
## receiving notifies.
registerWaitableEvent(FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
## receiving notifications.
registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
proc addWrite*(fd: AsyncFD, cb: Callback) =
## Start watching the file descriptor for write availability and then call
@@ -900,43 +870,213 @@ when defined(windows) or defined(nimdoc):
##
## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
## so if you can avoid it, please do it. Use `addWrite` only if really
## need it (main usecase is adaptation of `unix like` libraries to be
## need it (main usecase is adaptation of unix-like libraries to be
## asynchronous on Windows).
## If you use this function, you dont need to use asyncdispatch.send()
##
## If you use this function, you don't need to use asyncdispatch.send()
## or asyncdispatch.connect(), because they are using IOCP, please use
## nativesockets.send() and nativesockets.connect() instead.
##
## Be sure your callback ``cb`` returns ``true``, if you want to remove
## watch of `write` notifications, and ``false``, if you want to continue
## receiving notifies.
registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE)
## receiving notifications.
registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
handleCallback) =
let handleFD = AsyncFD(hEvent)
pcd.ioPort = p.ioPort
pcd.handleFd = handleFD
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data.fd = handleFD
ol.data.cb = handleCallback
# We need to protect our callback environment value, so GC will not free it
# accidentally.
ol.data.cell = system.protect(rawEnv(ol.data.cb))
pcd.ovl = ol
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](pcd), timeout.Dword, flags):
let err = osLastError()
GC_unref(ol)
deallocShared(cast[pointer](pcd))
discard closeHandle(hEvent)
raiseOSError(err)
p.handles.incl(handleFD)
template closeWaitable(handle: untyped) =
let waitFd = pcd.waitFd
deallocShared(cast[pointer](pcd))
p.handles.excl(fd)
if unregisterWait(waitFd) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
discard closeHandle(handle)
raiseOSError(err)
if closeHandle(handle) == 0:
raiseOSError(osLastError())
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
## Registers callback ``cb`` to be called when timer expired.
##
## Parameters:
##
## * ``timeout`` - timeout value in milliseconds.
## * ``oneshot``
## * `true` - generate only one timeout event
## * `false` - generate timeout events periodically
doAssert(timeout > 0)
let p = getGlobalDispatcher()
var hEvent = createEvent(nil, 1, 0, nil)
if hEvent == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
var flags = WT_EXECUTEINWAITTHREAD.Dword
if oneshot: flags = flags or WT_EXECUTEONLYONCE
proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
let res = cb(fd)
if res or oneshot:
closeWaitable(hEvent)
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
proc addProcess*(pid: int, cb: Callback) =
## Registers callback ``cb`` to be called when process with process ID
## ``pid`` exited.
let p = getGlobalDispatcher()
let procFlags = SYNCHRONIZE
var hProcess = openProcess(procFlags, 0, pid.Dword)
if hProcess == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
var flags = WT_EXECUTEINWAITTHREAD.Dword
proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
closeWaitable(hProcess)
discard cb(fd)
registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
proc newAsyncEvent*(): AsyncEvent =
## Creates a new thread-safe ``AsyncEvent`` object.
##
## New ``AsyncEvent`` object is not automatically registered with # TODO: Why? -- DP
## dispatcher like ``AsyncSocket``.
var sa = SECURITY_ATTRIBUTES(
nLength: sizeof(SECURITY_ATTRIBUTES).cint,
bInheritHandle: 1
)
var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
if event == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
result.hEvent = event
proc trigger*(ev: AsyncEvent) =
## Set event ``ev`` to signaled state.
if setEvent(ev.hEvent) == 0:
raiseOSError(osLastError())
proc unregister*(ev: AsyncEvent) =
## Unregisters event ``ev``.
doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
let p = getGlobalDispatcher()
p.handles.excl(AsyncFD(ev.hEvent))
if unregisterWait(ev.hWaiter) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(err)
ev.hWaiter = 0
proc close*(ev: AsyncEvent) =
## Closes event ``ev``.
let res = closeHandle(ev.hEvent)
deallocShared(cast[pointer](ev))
if res == 0:
raiseOSError(osLastError())
proc addEvent*(ev: AsyncEvent, cb: Callback) =
## Registers callback ``cb`` to be called when ``ev`` will be signaled
doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
let p = getGlobalDispatcher()
let hEvent = ev.hEvent
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
var flags = WT_EXECUTEINWAITTHREAD.Dword
proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if ev.hWaiter != 0:
if cb(fd):
# we need this check to avoid exception, if `unregister(event)` was
# called in callback.
deallocShared(cast[pointer](pcd))
if ev.hWaiter != 0:
unregister(ev)
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
else:
# if ev.hWaiter == 0, then event was unregistered before `poll()` call.
deallocShared(cast[pointer](pcd))
registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
ev.hWaiter = pcd.waitFd
initAll()
else:
import selectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
const
InitCallbackListSize = 4 # initial size of callbacks sequence,
# associated with file/socket descriptor.
InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
# queue.
type
AsyncFD* = distinct cint
Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
PData* = ref object of RootRef
fd: AsyncFD
readCBs: seq[Callback]
writeCBs: seq[Callback]
AsyncData = object
readList: seq[Callback]
writeList: seq[Callback]
AsyncEvent* = distinct SelectEvent
PDispatcher* = ref object of PDispatcherBase
selector: Selector
selector: Selector[AsyncData]
{.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
proc `==`*(x, y: AsyncFD): bool {.borrow.}
proc `==`*(x, y: AsyncEvent): bool {.borrow.}
template newAsyncData(): AsyncData =
AsyncData(
readList: newSeqOfCap[Callback](InitCallbackListSize),
writeList: newSeqOfCap[Callback](InitCallbackListSize)
)
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector()
result.selector = newSelector[AsyncData]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
@@ -951,15 +1091,10 @@ else:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc update(fd: AsyncFD, events: set[Event]) =
let p = getGlobalDispatcher()
assert fd.SocketHandle in p.selector
p.selector.update(fd.SocketHandle, events)
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()
var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
p.selector.register(fd.SocketHandle, {}, data.RootRef)
var data = newAsyncData()
p.selector.registerHandle(fd.SocketHandle, {}, data)
proc closeSocket*(sock: AsyncFD) =
let disp = getGlobalDispatcher()
@@ -969,75 +1104,148 @@ else:
proc unregister*(fd: AsyncFD) =
getGlobalDispatcher().selector.unregister(fd.SocketHandle)
proc unregister*(ev: AsyncEvent) =
getGlobalDispatcher().selector.unregister(SelectEvent(ev))
proc addRead*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
if fd.SocketHandle notin p.selector:
var newEvents = {Event.Read}
withData(p.selector, fd.SocketHandle, adata) do:
adata.readList.add(cb)
newEvents.incl(Event.Read)
if len(adata.writeList) != 0: newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector[fd.SocketHandle].data.PData.readCBs.add(cb)
update(fd, p.selector[fd.SocketHandle].events + {EvRead})
p.selector.updateHandle(fd.SocketHandle, newEvents)
proc addWrite*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
if fd.SocketHandle notin p.selector:
var newEvents = {Event.Write}
withData(p.selector, fd.SocketHandle, adata) do:
adata.writeList.add(cb)
newEvents.incl(Event.Write)
if len(adata.readList) != 0: newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb)
update(fd, p.selector[fd.SocketHandle].events + {EvWrite})
template processCallbacks(callbacks: untyped) =
# Callback may add items to ``callbacks`` which causes issues if
# we are iterating over it at the same time. We therefore
# make a copy to iterate over.
let currentCBs = callbacks
callbacks = @[]
# Using another sequence because callbacks themselves can add
# other callbacks.
var newCBs: seq[Callback] = @[]
for cb in currentCBs:
if newCBs.len > 0:
# A callback has already returned with EAGAIN, don't call any
# others until next `poll`.
newCBs.add(cb)
else:
if not cb(data.fd):
# Callback wants to be called again.
newCBs.add(cb)
callbacks = newCBs & callbacks
p.selector.updateHandle(fd.SocketHandle, newEvents)
proc hasPendingOperations*(): bool =
let p = getGlobalDispatcher()
p.selector.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
template processBasicCallbacks(ident, rwlist: untyped) =
# Process pending descriptor and AsyncEvent callbacks.
#
# Invoke every callback stored in `rwlist`, until one
# returns `false` (which means callback wants to stay
# alive). In such case all remaining callbacks will be added
# to `rwlist` again, in the order they have been inserted.
#
# `rwlist` associated with file descriptor MUST BE emptied before
# dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
# or it can be possible to fall into endless cycle.
var curList: seq[Callback]
withData(p.selector, ident, adata) do:
shallowCopy(curList, adata.rwlist)
adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize)
let newLength = max(len(curList), InitCallbackListSize)
var newList = newSeqOfCap[Callback](newLength)
for cb in curList:
if len(newList) > 0:
# A callback has already returned with EAGAIN, don't call any others
# until next `poll`.
newList.add(cb)
else:
if not cb(fd.AsyncFD):
# Callback wants to be called again.
newList.add(cb)
withData(p.selector, ident, adata) do:
# descriptor still present in queue.
adata.rwlist = newList & adata.rwlist
rLength = len(adata.readList)
wLength = len(adata.writeList)
do:
# descriptor was unregistered in callback via `unregister()`.
rLength = -1
wLength = -1
template processCustomCallbacks(ident: untyped) =
# Process pending custom event callbacks. Custom events are
# {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
# There can be only one callback registered with one descriptor,
# so there is no need to iterate over list.
var curList: seq[Callback]
withData(p.selector, ident, adata) do:
shallowCopy(curList, adata.readList)
adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
let newLength = len(curList)
var newList = newSeqOfCap[Callback](newLength)
var cb = curList[0]
if not cb(fd.AsyncFD):
newList.add(cb)
withData(p.selector, ident, adata) do:
# descriptor still present in queue.
adata.readList = newList & adata.readList
if len(adata.readList) == 0:
# if no callbacks registered with descriptor, unregister it.
p.selector.unregister(fd)
do:
# descriptor was unregistered in callback via `unregister()`.
discard
proc poll*(timeout = 500) =
let p = getGlobalDispatcher()
if p.selector.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
raise newException(ValueError,
"No handles or timers registered in dispatcher.")
if p.selector.len > 0:
for info in p.selector.select(p.adjustedTimeout(timeout)):
let data = PData(info.key.data)
assert data.fd == info.key.fd.AsyncFD
#echo("In poll ", data.fd.cint)
# There may be EvError here, but we handle them in callbacks,
# so that exceptions can be raised from `send(...)` and
# `recv(...)` routines.
if not p.selector.isEmpty():
var keys: array[64, ReadyKey]
var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
for i in 0..<count:
var custom = false
let fd = keys[i].fd
let events = keys[i].events
var rLength = 0 # len(data.readList) after callback
var wLength = 0 # len(data.writeList) after callback
if EvRead in info.events or info.events == {EvError}:
processCallbacks(data.readCBs)
if Event.Read in events or events == {Event.Error}:
processBasicCallbacks(fd, readList)
if EvWrite in info.events or info.events == {EvError}:
processCallbacks(data.writeCBs)
if Event.Write in events or events == {Event.Error}:
processBasicCallbacks(fd, writeList)
if info.key in p.selector:
var newEvents: set[Event]
if data.readCBs.len != 0: newEvents = {EvRead}
if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
if newEvents != info.key.events:
update(data.fd, newEvents)
else:
# FD no longer a part of the selector. Likely been closed
# (e.g. socket disconnected).
discard
if Event.User in events or events == {Event.Error}:
processBasicCallbacks(fd, readList)
custom = true
if rLength == 0:
p.selector.unregister(fd)
when ioselSupportedPlatform:
if (customSet * events) != {}:
custom = true
processCustomCallbacks(fd)
# because state `data` can be modified in callback we need to update
# descriptor events with currently registered callbacks.
if not custom:
var newEvents: set[Event] = {}
if rLength != -1 and wLength != -1:
if rLength > 0: incl(newEvents, Event.Read)
if wLength > 0: incl(newEvents, Event.Write)
p.selector.updateHandle(SocketHandle(fd), newEvents)
# Timer processing.
processTimers(p)
@@ -1075,7 +1283,7 @@ else:
return retFuture
proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
flags = {SocketFlag.SafeDisconn}): Future[int] =
var retFuture = newFuture[int]("recvInto")
proc cb(sock: AsyncFD): bool =
@@ -1216,6 +1424,55 @@ else:
addRead(socket, cb)
return retFuture
when ioselSupportedPlatform:
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
## Start watching for timeout expiration, and then call the
## callback ``cb``.
## ``timeout`` - time in milliseconds,
## ``oneshot`` - if ``true`` only one event will be dispatched,
## if ``false`` continuous events every ``timeout`` milliseconds.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerTimer(timeout, oneshot, data)
proc addSignal*(signal: int, cb: Callback) =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb``.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerSignal(signal, data)
proc addProcess*(pid: int, cb: Callback) =
## Start watching for process exit with pid ``pid``, and then call
## the callback ``cb``.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerProcess(pid, data)
proc newAsyncEvent*(): AsyncEvent =
## Creates new ``AsyncEvent``.
result = AsyncEvent(newSelectEvent())
proc trigger*(ev: AsyncEvent) =
## Sets new ``AsyncEvent`` to signaled state.
trigger(SelectEvent(ev))
proc close*(ev: AsyncEvent) =
## Closes ``AsyncEvent``
close(SelectEvent(ev))
proc addEvent*(ev: AsyncEvent, cb: Callback) =
## Start watching for event ``ev``, and call callback ``cb``, when
## ev will be set to signaled state.
let p = getGlobalDispatcher()
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerEvent(SelectEvent(ev), data)
# Common procedures between current and upcoming asyncdispatch
include includes.asynccommon
@@ -1269,7 +1526,7 @@ proc send*(socket: AsyncFD, data: string,
var copiedData = data
GC_ref(copiedData) # we need to protect data until send operation is completed
# or failed.
# or failed.
let sendFut = socket.send(addr copiedData[0], data.len, flags)
sendFut.callback =
@@ -1317,7 +1574,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
##
## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead.
template addNLIfEmpty(): untyped =
template addNLIfEmpty(): typed =
if result.len == 0:
result.add("\c\L")
@@ -1353,3 +1610,5 @@ proc waitFor*[T](fut: Future[T]): T =
poll()
fut.read
{.deprecated: [setEvent: trigger].}

View File

@@ -1,294 +0,0 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module allows high-level and efficient I/O multiplexing.
##
## Supported OS primitives: ``epoll``, ``kqueue``, ``poll`` and
## Windows ``select``.
##
## To use threadsafe version of this module, it needs to be compiled
## with both ``-d:threadsafe`` and ``--threads:on`` options.
##
## Supported features: files, sockets, pipes, timers, processes, signals
## and user events.
##
## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux (except
## for Android).
##
## Partially supported OS: Windows (only sockets and user events),
## Solaris (files, sockets, handles and user events).
## Android (files, sockets, handles and user events).
##
## TODO: ``/dev/poll``, ``event ports`` and filesystem events.
import os
const hasThreadSupport = compileOption("threads") and defined(threadsafe)
const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly) or
(defined(linux) and not defined(android))
## This constant is used to determine whether the destination platform is
## fully supported by ``ioselectors`` module.
const bsdPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly)
when defined(nimdoc):
type SocketHandle = int
type
Selector*[T] = ref object
## An object which holds descriptors to be checked for read/write status
Event* {.pure.} = enum
## An enum which hold event types
Read, ## Descriptor is available for read
Write, ## Descriptor is available for write
Timer, ## Timer descriptor is completed
Signal, ## Signal is raised
Process, ## Process is finished
Vnode, ## BSD specific file change happens
User, ## User event is raised
Error, ## Error happens while waiting, for descriptor
VnodeWrite, ## NOTE_WRITE (BSD specific, write to file occurred)
VnodeDelete, ## NOTE_DELETE (BSD specific, unlink of file occurred)
VnodeExtend, ## NOTE_EXTEND (BSD specific, file extended)
VnodeAttrib, ## NOTE_ATTRIB (BSD specific, file attributes changed)
VnodeLink, ## NOTE_LINK (BSD specific, file link count changed)
VnodeRename, ## NOTE_RENAME (BSD specific, file renamed)
VnodeRevoke ## NOTE_REVOKE (BSD specific, file revoke occurred)
ReadyKey* = object
## An object which holds result for descriptor
fd* : int ## file/socket descriptor
events*: set[Event] ## set of events
SelectEvent* = object
## An object which holds user defined event
proc newSelector*[T](): Selector[T] =
## Creates a new selector
proc close*[T](s: Selector[T]) =
## Closes selector
proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event],
data: T) =
## Registers file/socket descriptor ``fd`` to selector ``s``
## with events set in ``events``. The ``data`` is application-defined
## data, which to be passed when event happens.
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
## Update file/socket descriptor ``fd``, registered in selector
## ``s`` with new events set ``event``.
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
## Registers timer notification with ``timeout`` in milliseconds
## to selector ``s``.
## If ``oneshot`` is ``true`` timer will be notified only once.
## Set ``oneshot`` to ``false`` if your want periodic notifications.
## The ``data`` is application-defined data, which to be passed, when
## time limit expired.
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
## Registers Unix signal notification with ``signal`` to selector
## ``s``. The ``data`` is application-defined data, which to be
## passed, when signal raises.
##
## This function is not supported for ``Windows``.
proc registerProcess*[T](s: Selector[T], pid: int,
data: T): int {.discardable.} =
## Registers process id (pid) notification when process has
## exited to selector ``s``.
## The ``data`` is application-defined data, which to be passed, when
## process with ``pid`` has exited.
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
## Registers selector event ``ev`` to selector ``s``.
## ``data`` application-defined data, which to be passed, when
## ``ev`` happens.
proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event],
data: T) =
## Registers selector BSD/MacOSX specific vnode events for file
## descriptor ``fd`` and events ``events``.
## ``data`` application-defined data, which to be passed, when
## vnode event happens.
##
## This function is supported only by BSD and MacOSX.
proc newSelectEvent*(): SelectEvent =
## Creates new event ``SelectEvent``.
proc setEvent*(ev: SelectEvent) =
## Trigger event ``ev``.
proc close*(ev: SelectEvent) =
## Closes selector event ``ev``.
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
## Unregisters event ``ev`` from selector ``s``.
proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) =
## Unregisters file/socket descriptor ``fd`` from selector ``s``.
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey]): int =
## Process call waiting for events registered in selector ``s``.
## The ``timeout`` argument specifies the minimum number of milliseconds
## the function will be blocked, if no events are not ready. Specifying a
## timeout of ``-1`` causes function to block indefinitely.
## All available events will be stored in ``results`` array.
##
## Function returns number of triggered events.
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
## Process call waiting for events registered in selector ``s``.
## The ``timeout`` argument specifies the minimum number of milliseconds
## the function will be blocked, if no events are not ready. Specifying a
## timeout of -1 causes function to block indefinitely.
##
## Function returns sequence of triggered events.
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
## Retrieves application-defined ``data`` associated with descriptor ``fd``.
## If specified descriptor ``fd`` is not registered, empty/default value
## will be returned.
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: var T): bool =
## Associate application-defined ``data`` with descriptor ``fd``.
##
## Returns ``true``, if data was succesfully updated, ``false`` otherwise.
template isEmpty*[T](s: Selector[T]): bool =
## Returns ``true``, if there no registered events or descriptors
## in selector.
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
## Retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
## .. code-block:: nim
##
## s.withData(fd, value) do:
## # block is executed only if ``fd`` registered in selector ``s``
## value.uid = 1000
##
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body1, body2: untyped) =
## Retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
## .. code-block:: nim
##
## s.withData(fd, value) do:
## # block is executed only if ``fd`` registered in selector ``s``.
## value.uid = 1000
## do:
## # block is executed if ``fd`` not registered in selector ``s``.
## raise
##
else:
when hasThreadSupport:
import locks
type
SharedArray[T] = UncheckedArray[T]
proc allocSharedArray[T](nsize: int): ptr SharedArray[T] =
result = cast[ptr SharedArray[T]](allocShared0(sizeof(T) * nsize))
proc deallocSharedArray[T](sa: ptr SharedArray[T]) =
deallocShared(cast[pointer](sa))
type
Event* {.pure.} = enum
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot,
Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink,
VnodeRename, VnodeRevoke
type
IOSelectorsException* = object of Exception
ReadyKey* = object
fd* : int
events*: set[Event]
SelectorKey[T] = object
ident: int
events: set[Event]
param: int
data: T
proc raiseIOSelectorsError[T](message: T) =
var msg = ""
when T is string:
msg.add(message)
elif T is OSErrorCode:
msg.add(osErrorMsg(message) & " (code: " & $int(message) & ")")
else:
msg.add("Internal Error\n")
var err = newException(IOSelectorsException, msg)
raise err
when not defined(windows):
import posix
proc setNonBlocking(fd: cint) {.inline.} =
var x = fcntl(fd, F_GETFL, 0)
if x == -1:
raiseIOSelectorsError(osLastError())
else:
var mode = x or O_NONBLOCK
if fcntl(fd, F_SETFL, mode) == -1:
raiseIOSelectorsError(osLastError())
template setKey(s, pident, pevents, pparam, pdata: untyped) =
var skey = addr(s.fds[pident])
skey.ident = pident
skey.events = pevents
skey.param = pparam
skey.data = data
when ioselSupportedPlatform:
template blockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
else:
if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
template unblockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
else:
if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
when defined(linux):
include ioselects/ioselectors_epoll
elif bsdPlatform:
include ioselects/ioselectors_kqueue
elif defined(windows):
include ioselects/ioselectors_select
elif defined(solaris):
include ioselects/ioselectors_poll # need to replace it with event ports
else:
include ioselects/ioselectors_poll

View File

@@ -9,7 +9,7 @@
# This module implements Linux epoll().
import posix, times
import posix, times, epoll
# Maximum number of events that can be returned
const MAX_EPOLL_EVENTS = 64
@@ -36,35 +36,6 @@ when not defined(android):
ssi_addr*: uint64
pad* {.importc: "__pad".}: array[0..47, uint8]
type
eventFdData {.importc: "eventfd_t",
header: "<sys/eventfd.h>", pure, final.} = uint64
epoll_data {.importc: "union epoll_data", header: "<sys/epoll.h>",
pure, final.} = object
u64 {.importc: "u64".}: uint64
epoll_event {.importc: "struct epoll_event",
header: "<sys/epoll.h>", pure, final.} = object
events: uint32 # Epoll events
data: epoll_data # User data variable
const
EPOLL_CTL_ADD = 1 # Add a file descriptor to the interface.
EPOLL_CTL_DEL = 2 # Remove a file descriptor from the interface.
EPOLL_CTL_MOD = 3 # Change file descriptor epoll_event structure.
EPOLLIN = 0x00000001
EPOLLOUT = 0x00000004
EPOLLERR = 0x00000008
EPOLLHUP = 0x00000010
EPOLLRDHUP = 0x00002000
EPOLLONESHOT = 1 shl 30
proc epoll_create(size: cint): cint
{.importc: "epoll_create", header: "<sys/epoll.h>".}
proc epoll_ctl(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint
{.importc: "epoll_ctl", header: "<sys/epoll.h>".}
proc epoll_wait(epfd: cint; events: ptr epoll_event; maxevents: cint;
timeout: cint): cint
{.importc: "epoll_wait", header: "<sys/epoll.h>".}
proc timerfd_create(clock_id: ClockId, flags: cint): cint
{.cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
proc timerfd_settime(ufd: cint, flags: cint,
@@ -80,26 +51,26 @@ when not defined(android):
var RLIMIT_NOFILE {.importc: "RLIMIT_NOFILE",
header: "<sys/resource.h>".}: cint
type
rlimit {.importc: "struct rlimit",
RLimit {.importc: "struct rlimit",
header: "<sys/resource.h>", pure, final.} = object
rlim_cur: int
rlim_max: int
proc getrlimit(resource: cint, rlp: var rlimit): cint
proc getrlimit(resource: cint, rlp: var RLimit): cint
{.importc: "getrlimit",header: "<sys/resource.h>".}
when hasThreadSupport:
type
SelectorImpl[T] = object
epollFD : cint
maxFD : int
epollFD: cint
maxFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
epollFD : cint
maxFD : int
epollFD: cint
maxFD: int
fds: seq[SelectorKey[T]]
count: int
Selector*[T] = ref SelectorImpl[T]
@@ -109,7 +80,8 @@ type
SelectEvent* = ptr SelectEventImpl
proc newSelector*[T](): Selector[T] =
var a = rlimit()
# Retrieve the maximum fd count (for current OS) via getrlimit()
var a = RLimit()
if getrlimit(RLIMIT_NOFILE, a) != 0:
raiseOsError(osLastError())
var maxFD = int(a.rlim_max)
@@ -152,8 +124,8 @@ proc newSelectEvent*(): SelectEvent =
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.efd = fdci
proc setEvent*(ev: SelectEvent) =
var data : uint64 = 1
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.efd, addr data, sizeof(uint64)) == -1:
raiseIOSelectorsError(osLastError())
@@ -164,6 +136,8 @@ proc close*(ev: SelectEvent) =
raiseIOSelectorsError(osLastError())
template checkFd(s, f) =
# TODO: I don't see how this can ever happen. You won't be able to create an
# FD if there is too many. -- DP
if f >= s.maxFD:
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
@@ -171,10 +145,10 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == 0, "Descriptor $# already registered" % $fdi)
s.setKey(fdi, events, 0, data)
if events != {}:
var epv = epoll_event(events: EPOLLRDHUP)
var epv = EpollEvent(events: EPOLLRDHUP)
epv.data.u64 = fdi.uint
if Event.Read in events: epv.events = epv.events or EPOLLIN
if Event.Write in events: epv.events = epv.events or EPOLLOUT
@@ -189,10 +163,10 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
"Descriptor $# is not registered in the selector!" % $fdi)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
var epv = epoll_event(events: EPOLLRDHUP)
var epv = EpollEvent(events: EPOLLRDHUP)
epv.data.u64 = fdi.uint
if Event.Read in events: epv.events = epv.events or EPOLLIN
@@ -217,24 +191,25 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
"Descriptor $# is not registered in the selector!" % $fdi)
if pkey.events != {}:
when not defined(android):
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
var epv = EpollEvent()
# TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc.
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
if Event.Finished notin pkey.events:
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
dec(s.count)
if posix.close(cint(fdi)) != 0:
raiseIOSelectorsError(osLastError())
elif Event.Signal in pkey.events:
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
var nmask, omask: Sigset
@@ -247,7 +222,7 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
raiseIOSelectorsError(osLastError())
elif Event.Process in pkey.events:
if Event.Finished notin pkey.events:
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
var nmask, omask: Sigset
@@ -260,13 +235,13 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
raiseIOSelectorsError(osLastError())
else:
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
if Event.Finished notin pkey.events:
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
dec(s.count)
@@ -280,7 +255,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
dec(s.count)
@@ -300,7 +275,7 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
doAssert(s.fds[fdi].ident == 0)
var events = {Event.Timer}
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if oneshot:
new_ts.it_interval.tv_sec = 0.Time
@@ -343,7 +318,7 @@ when not defined(android):
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
raiseIOSelectorsError(osLastError())
@@ -370,7 +345,7 @@ when not defined(android):
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
epv.events = EPOLLIN or EPOLLRDHUP
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
@@ -383,7 +358,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = int(ev.efd)
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
s.setKey(fdi, {Event.User}, 0, data)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = ev.efd.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0:
raiseIOSelectorsError(osLastError())
@@ -392,7 +367,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey]): int =
var
resTable: array[MAX_EPOLL_EVENTS, epoll_event]
resTable: array[MAX_EPOLL_EVENTS, EpollEvent]
maxres = MAX_EPOLL_EVENTS
i, k: int
@@ -482,7 +457,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
rkey.events.incl(Event.User)
if Event.Oneshot in pkey.events:
var epv = epoll_event()
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0:
raiseIOSelectorsError(osLastError())
# we will not clear key until it will be unregistered, so
@@ -505,16 +480,19 @@ proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd].ident != 0
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
if fdi in s:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
if fdi in s:
s.fds[fdi].data = data
result = true
@@ -523,8 +501,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].data)
if fdi in s:
var value = addr(s.getData(fdi))
body
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
@@ -532,8 +510,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].data)
if fdi in s:
var value = addr(s.getData(fdi))
body1
else:
body2

View File

@@ -144,7 +144,7 @@ proc newSelectEvent*(): SelectEvent =
result.rfd = fds[0]
result.wfd = fds[1]
proc setEvent*(ev: SelectEvent) =
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
@@ -243,7 +243,7 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
"Descriptor $# is not registered in the queue!" % $fdi)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
@@ -584,16 +584,19 @@ proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd].ident != 0
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
if fdi in s:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
if fdi in s:
s.fds[fdi].data = data
result = true
@@ -602,8 +605,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].data)
if fdi in s:
var value = addr(s.getData(fdi))
body
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
@@ -611,8 +614,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].data)
if fdi in s:
var value = addr(s.getData(fdi))
body1
else:
body2

View File

@@ -208,7 +208,7 @@ proc newSelectEvent*(): SelectEvent =
result.rfd = fds[0]
result.wfd = fds[1]
proc setEvent*(ev: SelectEvent) =
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
@@ -279,16 +279,19 @@ proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd].ident != 0
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
if fdi in s:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
if fdi in s:
s.fds[fdi].data = data
result = true
@@ -297,8 +300,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].data)
if fdi in s:
var value = addr(s.getData(fdi))
body
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
@@ -306,8 +309,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].data)
if fdi in s:
var value = addr(s.getData(fdi))
body1
else:
body2

View File

@@ -154,7 +154,7 @@ when defined(windows):
result.rsock = rsock
result.wsock = wsock
proc setEvent*(ev: SelectEvent) =
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if winlean.send(ev.wsock, cast[pointer](addr data),
cint(sizeof(uint64)), 0) != sizeof(uint64):
@@ -178,7 +178,7 @@ else:
result.rsock = SocketHandle(fds[0])
result.wsock = SocketHandle(fds[1])
proc setEvent*(ev: SelectEvent) =
proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
@@ -379,6 +379,16 @@ proc flush*[T](s: Selector[T]) = discard
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
s.withSelectLock():
result = false
let fdi = int(fd)
for i in 0..<FD_SETSIZE:
if s.fds[i].ident == fdi:
return true
inc(i)
when hasThreadSupport:
template withSelectLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
@@ -391,15 +401,12 @@ else:
template withSelectLock[T](s: Selector[T], body: untyped) =
body
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
s.withSelectLock():
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
for i in 0..<FD_SETSIZE:
if s.fds[i].ident == fdi:
result = s.fds[i].data
break
inc(i)
return s.fds[i].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
s.withSelectLock():

View File

@@ -1,420 +1,311 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2015 Dominik Picheta
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# TODO: Docs.
## This module allows high-level and efficient I/O multiplexing.
##
## Supported OS primitives: ``epoll``, ``kqueue``, ``poll`` and
## Windows ``select``.
##
## To use threadsafe version of this module, it needs to be compiled
## with both ``-d:threadsafe`` and ``--threads:on`` options.
##
## Supported features: files, sockets, pipes, timers, processes, signals
## and user events.
##
## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux (except
## for Android).
##
## Partially supported OS: Windows (only sockets and user events),
## Solaris (files, sockets, handles and user events).
## Android (files, sockets, handles and user events).
##
## TODO: ``/dev/poll``, ``event ports`` and filesystem events.
import os, hashes
import os, strutils, nativesockets
when defined(linux):
import posix, epoll
elif defined(macosx) or defined(freebsd) or defined(openbsd) or defined(netbsd):
import posix, kqueue, times
elif defined(windows):
import winlean
else:
import posix
const hasThreadSupport = compileOption("threads") and defined(threadsafe)
const MultiThreaded = defined(useStdlibThreading)
const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly) or
(defined(linux) and not defined(android))
## This constant is used to determine whether the destination platform is
## fully supported by ``ioselectors`` module.
when MultiThreaded:
import sharedtables
type SelectorData = pointer
else:
import tables
type SelectorData = RootRef
proc hash*(x: SocketHandle): Hash {.borrow.}
proc `$`*(x: SocketHandle): string {.borrow.}
type
Event* = enum
EvRead, EvWrite, EvError
SelectorKey* = object
fd*: SocketHandle
events*: set[Event] ## The events which ``fd`` listens for.
data*: SelectorData ## User object.
ReadyInfo* = tuple[key: SelectorKey, events: set[Event]]
const bsdPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(dragonfly)
when defined(nimdoc):
type
Selector* = ref object
## An object which holds file descriptors to be checked for read/write
## status.
Selector*[T] = ref object
## An object which holds descriptors to be checked for read/write status
proc register*(s: Selector, fd: SocketHandle, events: set[Event],
data: SelectorData): SelectorKey {.discardable.} =
## Registers file descriptor ``fd`` to selector ``s`` with a set of Event
## ``events``.
Event* {.pure.} = enum
## An enum which hold event types
Read, ## Descriptor is available for read
Write, ## Descriptor is available for write
Timer, ## Timer descriptor is completed
Signal, ## Signal is raised
Process, ## Process is finished
Vnode, ## BSD specific file change happens
User, ## User event is raised
Error, ## Error happens while waiting, for descriptor
VnodeWrite, ## NOTE_WRITE (BSD specific, write to file occurred)
VnodeDelete, ## NOTE_DELETE (BSD specific, unlink of file occurred)
VnodeExtend, ## NOTE_EXTEND (BSD specific, file extended)
VnodeAttrib, ## NOTE_ATTRIB (BSD specific, file attributes changed)
VnodeLink, ## NOTE_LINK (BSD specific, file link count changed)
VnodeRename, ## NOTE_RENAME (BSD specific, file renamed)
VnodeRevoke ## NOTE_REVOKE (BSD specific, file revoke occurred)
proc update*(s: Selector, fd: SocketHandle,
events: set[Event]): SelectorKey {.discardable.} =
## Updates the events which ``fd`` wants notifications for.
ReadyKey* = object
## An object which holds result for descriptor
fd* : int ## file/socket descriptor
events*: set[Event] ## set of events
proc unregister*(s: Selector, fd: SocketHandle): SelectorKey {.discardable.} =
## Unregisters file descriptor ``fd`` from selector ``s``.
SelectEvent* = object
## An object which holds user defined event
proc close*(s: Selector) =
## Closes the selector
proc select*(s: Selector, timeout: int): seq[ReadyInfo] =
## The ``events`` field of the returned ``key`` contains the original events
## for which the ``fd`` was bound. This is contrary to the ``events`` field
## of the ``ReadyInfo`` tuple which determines which events are ready
## on the ``fd``.
proc newSelector*(): Selector =
proc newSelector*[T](): Selector[T] =
## Creates a new selector
proc contains*(s: Selector, fd: SocketHandle): bool =
proc close*[T](s: Selector[T]) =
## Closes the selector.
proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event],
data: T) =
## Registers file/socket descriptor ``fd`` to selector ``s``
## with events set in ``events``. The ``data`` is application-defined
## data, which will be passed when an event is triggered.
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
## Update file/socket descriptor ``fd``, registered in selector
## ``s`` with new events set ``event``.
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
## Registers timer notification with ``timeout`` (in milliseconds)
## to selector ``s``.
##
## If ``oneshot`` is ``true``, timer will be notified only once.
##
## Set ``oneshot`` to ``false`` if you want periodic notifications.
##
## The ``data`` is application-defined data, which will be passed, when
## the timer is triggered.
##
## Returns the file descriptor for the registered timer.
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
## Registers Unix signal notification with ``signal`` to selector
## ``s``.
##
## The ``data`` is application-defined data, which will be
## passed when signal raises.
##
## Returns the file descriptor for the registered signal.
##
## **Note:** This function is not supported on ``Windows``.
proc registerProcess*[T](s: Selector[T], pid: int,
data: T): int {.discardable.} =
## Registers a process id (pid) notification (when process has
## exited) in selector ``s``.
##
## The ``data`` is application-defined data, which will be passed when
## process with ``pid`` has exited.
##
## Returns the file descriptor for the registered signal.
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
## Registers selector event ``ev`` in selector ``s``.
##
## The ``data`` is application-defined data, which will be passed when
## ``ev`` happens.
proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event],
data: T) =
## Registers selector BSD/MacOSX specific vnode events for file
## descriptor ``fd`` and events ``events``.
## ``data`` application-defined data, which to be passed, when
## vnode event happens.
##
## **Note:** This function is supported only by BSD and MacOSX.
proc newSelectEvent*(): SelectEvent =
## Creates a new user-defined event.
proc trigger*(ev: SelectEvent) =
## Trigger event ``ev``.
proc close*(ev: SelectEvent) =
## Closes user-defined event ``ev``.
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
## Unregisters user-defined event ``ev`` from selector ``s``.
proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) =
## Unregisters file/socket descriptor ``fd`` from selector ``s``.
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey]): int =
## Waits for events registered in selector ``s``.
##
## The ``timeout`` argument specifies the maximum number of milliseconds
## the function will be blocked for if no events are ready. Specifying a
## timeout of ``-1`` causes the function to block indefinitely.
## All available events will be stored in ``results`` array.
##
## Returns number of triggered events.
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
## Waits for events registered in selector ``s``.
##
## The ``timeout`` argument specifies the maximum number of milliseconds
## the function will be blocked for if no events are ready. Specifying a
## timeout of ``-1`` causes the function to block indefinitely.
##
## Returns a list of triggered events.
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
## Retrieves application-defined ``data`` associated with descriptor ``fd``.
## If specified descriptor ``fd`` is not registered, empty/default value
## will be returned.
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: var T): bool =
## Associate application-defined ``data`` with descriptor ``fd``.
##
## Returns ``true``, if data was succesfully updated, ``false`` otherwise.
template isEmpty*[T](s: Selector[T]): bool = # TODO: Why is this a template?
## Returns ``true``, if there are no registered events or descriptors
## in selector.
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
## Retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
## .. code-block:: nim
##
## s.withData(fd, value) do:
## # block is executed only if ``fd`` registered in selector ``s``
## value.uid = 1000
##
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body1, body2: untyped) =
## Retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
## .. code-block:: nim
##
## s.withData(fd, value) do:
## # block is executed only if ``fd`` registered in selector ``s``.
## value.uid = 1000
## do:
## # block is executed if ``fd`` not registered in selector ``s``.
## raise
##
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
## Determines whether selector contains a file descriptor.
proc `[]`*(s: Selector, fd: SocketHandle): SelectorKey =
## Retrieves the selector key for ``fd``.
elif defined(linux):
type
Selector* = object
epollFD: cint
events: array[64, epoll_event]
when MultiThreaded:
fds: SharedTable[SocketHandle, SelectorKey]
else:
fds: Table[SocketHandle, SelectorKey]
proc createEventStruct(events: set[Event], fd: SocketHandle): epoll_event =
if EvRead in events:
result.events = EPOLLIN
if EvWrite in events:
result.events = result.events or EPOLLOUT
result.events = result.events or EPOLLRDHUP
result.data.fd = fd.cint
proc register*(s: var Selector, fd: SocketHandle, events: set[Event],
data: SelectorData) =
var event = createEventStruct(events, fd)
if events != {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
raiseOSError(osLastError())
s.fds[fd] = SelectorKey(fd: fd, events: events, data: data)
proc update*(s: var Selector, fd: SocketHandle, events: set[Event]) =
if s.fds[fd].events != events:
if events == {}:
# This fd is idle -- it should not be registered to epoll.
# But it should remain a part of this selector instance.
# This is to prevent epoll_wait from returning immediately
# because its got fds which are waiting for no events and
# are therefore constantly ready. (leading to 100% CPU usage).
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
raiseOSError(osLastError())
s.fds[fd].events = events
else:
var event = createEventStruct(events, fd)
if s.fds[fd].events == {}:
# This fd is idle. It's not a member of this epoll instance and must
# be re-registered.
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
raiseOSError(osLastError())
else:
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
raiseOSError(osLastError())
s.fds[fd].events = events
proc unregister*(s: var Selector, fd: SocketHandle) =
if s.fds[fd].events != {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
let err = osLastError()
if err.cint notin {ENOENT, EBADF}:
# TODO: Why do we sometimes get an EBADF? Is this normal?
raiseOSError(err)
s.fds.del(fd)
proc close*(s: var Selector) =
when MultiThreaded: deinitSharedTable(s.fds)
if s.epollFD.close() != 0: raiseOSError(osLastError())
proc epollHasFd(s: Selector, fd: SocketHandle): bool =
result = true
var event = createEventStruct(s.fds[fd].events, fd)
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
let err = osLastError()
if err.cint in {ENOENT, EBADF}:
return false
raiseOSError(err)
proc select*(s: var Selector, timeout: int): seq[ReadyInfo] =
result = @[]
let evNum = epoll_wait(s.epollFD, addr s.events[0], 64.cint, timeout.cint)
if evNum < 0:
let err = osLastError()
if err.cint == EINTR:
return @[]
raiseOSError(err)
if evNum == 0: return @[]
for i in 0 ..< evNum:
let fd = s.events[i].data.fd.SocketHandle
var evSet: set[Event] = {}
if (s.events[i].events and EPOLLERR) != 0 or (s.events[i].events and EPOLLHUP) != 0: evSet = evSet + {EvError}
if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
if (s.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
let selectorKey = s.fds[fd]
assert selectorKey.fd != 0.SocketHandle
result.add((selectorKey, evSet))
#echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events)
proc newSelector*(): Selector =
result.epollFD = epoll_create(64)
if result.epollFD < 0:
raiseOSError(osLastError())
when MultiThreaded:
result.fds = initSharedTable[SocketHandle, SelectorKey]()
else:
result.fds = initTable[SocketHandle, SelectorKey]()
proc contains*(s: Selector, fd: SocketHandle): bool =
## Determines whether selector contains a file descriptor.
if s.fds.hasKey(fd):
# Ensure the underlying epoll instance still contains this fd.
if s.fds[fd].events != {}:
result = epollHasFd(s, fd)
else:
result = true
else:
return false
proc `[]`*(s: Selector, fd: SocketHandle): SelectorKey =
## Retrieves the selector key for ``fd``.
return s.fds[fd]
elif defined(macosx) or defined(freebsd) or defined(openbsd) or defined(netbsd):
type
Selector* = object
kqFD: cint
events: array[64, KEvent]
when MultiThreaded:
fds: SharedTable[SocketHandle, SelectorKey]
else:
fds: Table[SocketHandle, SelectorKey]
template modifyKQueue(kqFD: cint, fd: SocketHandle, event: Event,
op: cushort) =
var kev = KEvent(ident: fd.cuint,
filter: if event == EvRead: EVFILT_READ else: EVFILT_WRITE,
flags: op)
if kevent(kqFD, addr kev, 1, nil, 0, nil) == -1:
raiseOSError(osLastError())
proc register*(s: var Selector, fd: SocketHandle, events: set[Event],
data: SelectorData) =
for event in events:
modifyKQueue(s.kqFD, fd, event, EV_ADD)
s.fds[fd] = SelectorKey(fd: fd, events: events, data: data)
proc update*(s: var Selector, fd: SocketHandle, events: set[Event]) =
let previousEvents = s.fds[fd].events
if previousEvents != events:
for event in events-previousEvents:
modifyKQueue(s.kqFD, fd, event, EV_ADD)
for event in previousEvents-events:
modifyKQueue(s.kqFD, fd, event, EV_DELETE)
s.fds[fd].events = events
proc unregister*(s: var Selector, fd: SocketHandle) =
for event in s.fds[fd].events:
modifyKQueue(s.kqFD, fd, event, EV_DELETE)
s.fds.del(fd)
proc close*(s: var Selector) =
when MultiThreaded: deinitSharedTable(s.fds)
if s.kqFD.close() != 0: raiseOSError(osLastError())
proc select*(s: var Selector, timeout: int): seq[ReadyInfo] =
result = @[]
var tv =
if timeout >= 1000: Timespec(tv_sec: (timeout div 1000).Time, tv_nsec: 0)
else: Timespec(tv_sec: 0.Time, tv_nsec: timeout * 1000000)
let evNum = kevent(s.kqFD, nil, 0, addr s.events[0], 64.cint, addr tv)
if evNum < 0:
let err = osLastError()
if err.cint == EINTR:
return @[]
raiseOSError(err)
if evNum == 0: return @[]
for i in 0 ..< evNum:
let fd = s.events[i].ident.SocketHandle
var evSet: set[Event] = {}
if (s.events[i].flags and EV_EOF) != 0: evSet = evSet + {EvError}
if s.events[i].filter == EVFILT_READ: evSet = evSet + {EvRead}
elif s.events[i].filter == EVFILT_WRITE: evSet = evSet + {EvWrite}
let selectorKey = s.fds[fd]
assert selectorKey.fd != 0.SocketHandle
result.add((selectorKey, evSet))
proc newSelector*(): Selector =
result.kqFD = kqueue()
if result.kqFD < 0:
raiseOSError(osLastError())
when MultiThreaded:
result.fds = initSharedTable[SocketHandle, SelectorKey]()
else:
result.fds = initTable[SocketHandle, SelectorKey]()
proc contains*(s: Selector, fd: SocketHandle): bool =
## Determines whether selector contains a file descriptor.
s.fds.hasKey(fd) # and s.fds[fd].events != {}
proc `[]`*(s: Selector, fd: SocketHandle): SelectorKey =
## Retrieves the selector key for ``fd``.
return s.fds[fd]
elif not defined(nimdoc):
# TODO: kqueue for bsd/mac os x.
type
Selector* = object
when MultiThreaded:
fds: SharedTable[SocketHandle, SelectorKey]
else:
fds: Table[SocketHandle, SelectorKey]
proc register*(s: var Selector, fd: SocketHandle, events: set[Event],
data: SelectorData) =
let result = SelectorKey(fd: fd, events: events, data: data)
if s.fds.hasKeyOrPut(fd, result):
raise newException(ValueError, "File descriptor already exists.")
proc update*(s: var Selector, fd: SocketHandle, events: set[Event]) =
#if not s.fds.hasKey(fd):
# raise newException(ValueError, "File descriptor not found.")
s.fds[fd].events = events
proc unregister*(s: var Selector, fd: SocketHandle) =
s.fds.del(fd)
proc close*(s: var Selector) =
when MultiThreaded: deinitSharedTable(s.fds)
proc timeValFromMilliseconds(timeout: int): TimeVal =
if timeout != -1:
var seconds = timeout div 1000
result.tv_sec = seconds.int32
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
proc createFdSet(rd, wr: var TFdSet, s: Selector, m: var int) =
FD_ZERO(rd); FD_ZERO(wr)
for k, v in pairs(s.fds):
if EvRead in v.events:
m = max(m, int(k))
FD_SET(k, rd)
if EvWrite in v.events:
m = max(m, int(k))
FD_SET(k, wr)
proc getReadyFDs(rd, wr: var TFdSet,
s: var Selector): seq[ReadyInfo] =
result = @[]
for k, v in pairs(s.fds):
var events: set[Event] = {}
if FD_ISSET(k, rd) != 0'i32:
events = events + {EvRead}
if FD_ISSET(k, wr) != 0'i32:
events = events + {EvWrite}
result.add((v, events))
proc select*(s: var Selector, timeout: int): seq[ReadyInfo] =
var tv {.noInit.}: TimeVal = timeValFromMilliseconds(timeout)
var rd, wr: TFdSet
var m = 0
createFdSet(rd, wr, s, m)
var retCode = 0
if timeout != -1:
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, addr(tv)))
else:
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, nil))
if retCode < 0:
raiseOSError(osLastError())
elif retCode == 0:
return @[]
else:
return getReadyFDs(rd, wr, s)
proc newSelector*(): Selector =
when MultiThreaded:
result.fds = initSharedTable[SocketHandle, SelectorKey]()
else:
result.fds = initTable[SocketHandle, SelectorKey]()
proc contains*(s: Selector, fd: SocketHandle): bool =
return s.fds.hasKey(fd)
proc `[]`*(s: Selector, fd: SocketHandle): SelectorKey =
return s.fds[fd]
proc contains*(s: Selector, key: SelectorKey): bool =
## Determines whether selector contains this selector key. More accurate
## than checking if the file descriptor is in the selector because it
## ensures that the keys are equal. File descriptors may not always be
## unique especially when an fd is closed and then a new one is opened,
## the new one may have the same value.
when not defined(nimdoc):
return key.fd in s and s.fds[key.fd] == key
proc len*(s: Selector): int =
## Retrieves the number of registered file descriptors in this Selector.
when not defined(nimdoc):
return s.fds.len
{.deprecated: [TEvent: Event, PSelectorKey: SelectorKey,
TReadyInfo: ReadyInfo, PSelector: Selector].}
else:
when hasThreadSupport:
import locks
when not defined(testing) and isMainModule and not defined(nimdoc):
# Select()
import sockets
when MultiThreaded:
type
SockWrapper = object
sock: Socket
else:
type
SockWrapper = ref object of RootObj
sock: Socket
SharedArray[T] = UncheckedArray[T]
var sock = socket()
if sock == sockets.invalidSocket: raiseOSError(osLastError())
#sock.setBlocking(false)
sock.connect("irc.freenode.net", Port(6667))
proc allocSharedArray[T](nsize: int): ptr SharedArray[T] =
result = cast[ptr SharedArray[T]](allocShared0(sizeof(T) * nsize))
var selector = newSelector()
var data = SockWrapper(sock: sock)
when MultiThreaded:
selector.register(sock.getFD, {EvWrite}, addr data)
proc deallocSharedArray[T](sa: ptr SharedArray[T]) =
deallocShared(cast[pointer](sa))
type
Event* {.pure.} = enum
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot,
Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink,
VnodeRename, VnodeRevoke
type
IOSelectorsException* = object of Exception
ReadyKey* = object
fd* : int
events*: set[Event]
SelectorKey[T] = object
ident: int
events: set[Event]
param: int
data: T
proc raiseIOSelectorsError[T](message: T) =
var msg = ""
when T is string:
msg.add(message)
elif T is OSErrorCode:
msg.add(osErrorMsg(message) & " (code: " & $int(message) & ")")
else:
msg.add("Internal Error\n")
var err = newException(IOSelectorsException, msg)
raise err
proc setNonBlocking(fd: cint) {.inline.} =
setBlocking(fd.SocketHandle, false)
when not defined(windows):
import posix
template setKey(s, pident, pevents, pparam, pdata: untyped) =
var skey = addr(s.fds[pident])
skey.ident = pident
skey.events = pevents
skey.param = pparam
skey.data = data
when ioselSupportedPlatform:
template blockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
else:
if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
template unblockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
else:
if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
when defined(linux):
include ioselects/ioselectors_epoll
elif bsdPlatform:
include ioselects/ioselectors_kqueue
elif defined(windows):
include ioselects/ioselectors_select
elif defined(solaris):
include ioselects/ioselectors_poll # need to replace it with event ports
else:
selector.register(sock.getFD, {EvWrite}, data)
var i = 0
while true:
let ready = selector.select(1000)
echo ready.len
if ready.len > 0: echo ready[0].events
i.inc
if i == 6:
selector.unregister(sock.getFD)
selector.close()
break
include ioselects/ioselectors_poll
{.deprecated: [setEvent: trigger].}
{.deprecated: [register: registerHandle].}
{.deprecated: [update: updateHandle].}

View File

@@ -2,7 +2,7 @@ discard """
file: "tioselectors.nim"
output: "All tests passed!"
"""
import ioselectors
import selectors
const hasThreadSupport = compileOption("threads")

View File

@@ -205,7 +205,6 @@ proc ioTests(r: var TResults, cat: Category, options: string) =
proc asyncTests(r: var TResults, cat: Category, options: string) =
template test(filename: untyped) =
testSpec r, makeTest(filename, options, cat)
testSpec r, makeTest(filename, options & " -d:upcoming", cat)
for t in os.walkFiles("tests/async/t*.nim"):
test(t)