From c857e32867f14265f11df585c9b50f01c43e2f39 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 23 Jun 2016 23:12:27 +0300 Subject: [PATCH 1/5] Small version of ioselectors.nim, without code duplication and some renames. --- lib/pure/ioselectors.nim | 1849 ++++++++++++++++++++++++++++++ tests/async/tioselectors.nim | 407 +++++++ tests/async/tioselectors.nim.cfg | 1 + 3 files changed, 2257 insertions(+) create mode 100644 lib/pure/ioselectors.nim create mode 100644 tests/async/tioselectors.nim create mode 100644 tests/async/tioselectors.nim.cfg diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim new file mode 100644 index 0000000000..b77ab9295a --- /dev/null +++ b/lib/pure/ioselectors.nim @@ -0,0 +1,1849 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module allows high-level and efficient I/O multiplexing. +## +## Supported OS primitives: ``epoll``, ``kqueue``, ``poll`` and +## Windows ``select``. +## +## To use threadsafe version of this module, it needs to be compiled +## with both ``-d:threadsafe`` and ``--threads:on`` options. +## +## Supported features: files, sockets, pipes, timers, processes, signals +## and user events. +## +## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux. +## +## Partially supported OS: Windows (only sockets and user events), +## Solaris (files, sockets, handles and user events). +## +## TODO: ``/dev/poll``, ``event ports`` and filesystem events. + +import os + +const hasThreadSupport = compileOption("threads") and defined(threadsafe) + +const supportedPlatform = defined(macosx) or defined(freebsd) or + defined(netbsd) or defined(openbsd) or + defined(linux) + +const bsdPlatform = defined(macosx) or defined(freebsd) or + defined(netbsd) or defined(openbsd) + +when defined(linux): + import posix, times +elif bsdPlatform: + import posix, kqueue, times +elif defined(windows): + import winlean +else: + import posix + +when defined(nimdoc): + type + Selector*[T] = ref object + ## An object which holds descriptors to be checked for read/write status + + Event* = enum + ## An enum which hold event types + eventRead, ## Descriptor is available for read + eventWrite, ## Descriptor is available for write + eventTimer, ## Timer descriptor is completed + eventSignal, ## Signal is raised + eventProcess, ## Process is finished + eventVnode, ## Currently not supported + eventUser, ## User event is raised + eventError ## Error happens while waiting, for descriptor + + ReadyKey*[T] = object + ## An object which holds result for descriptor + fd* : int ## file/socket descriptor + events*: set[Event] ## set of events + data*: T ## application-defined data + + SelectEvent* = object + ## An object which holds user defined event + + proc newSelector*[T](): Selector[T] = + ## Creates a new selector + + proc close*[T](s: Selector[T]) = + ## Closes selector + + proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], + data: T) = + ## Registers file/socket descriptor ``fd`` to selector ``s`` + ## with events set in ``events``. The ``data`` is application-defined + ## data, which to be passed when event happens. + + proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = + ## Update file/socket descriptor ``fd``, registered in selector + ## ``s`` with new events set ``event``. + + proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): int {.discardable.} = + ## Registers timer notification with ``timeout`` in milliseconds + ## to selector ``s``. + ## If ``oneshot`` is ``true`` timer will be notified only once. + ## Set ``oneshot`` to ``false`` if your want periodic notifications. + ## The ``data`` is application-defined data, which to be passed, when + ## time limit expired. + + proc registerSignal*[T](s: Selector[T], signal: int, + data: T): int {.discardable.} = + ## Registers Unix signal notification with ``signal`` to selector + ## ``s``. The ``data`` is application-defined data, which to be + ## passed, when signal raises. + ## + ## This function is not supported for ``Windows``. + + proc registerProcess*[T](s: Selector[T], pid: int, + data: T): int {.discardable.} = + ## Registers process id (pid) notification when process has + ## exited to selector ``s``. + ## The ``data`` is application-defined data, which to be passed, when + ## process with ``pid`` has exited. + + proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + ## Registers selector event ``ev`` to selector ``s``. + ## ``data`` application-defined data, which to be passed, when + ## ``ev`` happens. + + proc newEvent*(): SelectEvent = + ## Creates new event ``SelectEvent``. + + proc setEvent*(ev: SelectEvent) = + ## Trigger event ``ev``. + + proc close*(ev: SelectEvent) = + ## Closes selector event ``ev``. + + proc unregister*[T](s: Selector[T], ev: SelectEvent) = + ## Unregisters event ``ev`` from selector ``s``. + + proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = + ## Unregisters file/socket descriptor ``fd`` from selector ``s``. + + proc flush*[T](s: Selector[T]) = + ## Flushes all changes was made to kernel pool/queue. + ## This function is usefull only for BSD and MacOS, because + ## kqueue supports bulk changes to be made. + ## On Linux/Windows and other Posix compatible operation systems, + ## ``flush`` is alias for `discard`. + + proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey[T]]): int = + ## Process call waiting for events registered in selector ``s``. + ## The ``timeout`` argument specifies the minimum number of milliseconds + ## the function will be blocked, if no events are not ready. Specifying a + ## timeout of ``-1`` causes function to block indefinitely. + ## All available events will be stored in ``results`` array. + ## + ## Function returns number of triggered events. + + proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + ## Process call waiting for events registered in selector ``s``. + ## The ``timeout`` argument specifies the minimum number of milliseconds + ## the function will be blocked, if no events are not ready. Specifying a + ## timeout of -1 causes function to block indefinitely. + ## + ## Function returns sequence of triggered events. + + template isEmpty*[T](s: Selector[T]): bool = + ## Returns ``true``, if there no registered events or descriptors + ## in selector. + + template withData*[T](s: Selector[T], fd: SocketHandle, value, + body: untyped) = + ## retrieves the application-data assigned with descriptor ``fd`` + ## to ``value``. This ``value`` can be modified in the scope of + ## the ``withData`` call. + ## + ## .. code-block:: nim + ## + ## s.withData(fd, value) do: + ## # block is executed only if ``fd`` registered in selector ``s`` + ## value.uid = 1000 + ## + + template withData*[T](s: Selector[T], fd: SocketHandle, value, + body1, body2: untyped) = + ## retrieves the application-data assigned with descriptor ``fd`` + ## to ``value``. This ``value`` can be modified in the scope of + ## the ``withData`` call. + ## + ## .. code-block:: nim + ## + ## s.withData(fd, value) do: + ## # block is executed only if ``fd`` registered in selector ``s``. + ## value.uid = 1000 + ## do: + ## # block is executed if ``fd`` not registered in selector ``s``. + ## raise + ## + +else: + when not defined(windows): + when defined(macosx): + var + OPEN_MAX {.importc: "OPEN_MAX", header: "".}: cint + var + RLIMIT_NOFILE {.importc: "RLIMIT_NOFILE", + header: "".}: cint + type + rlimit {.importc: "struct rlimit", + header: "", pure, final.} = object + rlim_cur: int + rlim_max: int + proc getrlimit(resource: cint, rlp: var rlimit): cint {. + importc: "getrlimit",header: ""} + proc getMaxFds*(): int = + var a = rlimit() + if getrlimit(RLIMIT_NOFILE, a) != 0: + raiseOsError(osLastError()) + result = a.rlim_max + when defined(macosx): + if a.rlim_max > OPEN_MAX: + result = OPEN_MAX + + when hasThreadSupport: + import locks + + type + Event* = enum + eventRead, eventWrite, eventTimer, eventSignal, eventProcess, + eventVnode, eventUser, eventError, + flagHandle, flagTimer, flagSignal, flagProcess, flagVnode, flagUser, + flagOneshot + + ReadyKey*[T] = object + fd* : int + events*: set[Event] + data*: T + + SelectorKey[T] = object + ident : int + flags : set[Event] + param : int + key : ReadyKey[T] + + when not defined(windows): + when hasThreadSupport: + type + SharedArrayHolder[T] = object + part: array[16, T] + SharedArray {.unchecked.}[T] = array[0..100_000_000, T] + + proc allocSharedArray[T](nsize: int): ptr SharedArray[T] = + let holder = cast[ptr SharedArrayHolder[T]]( + allocShared0(sizeof(T) * nsize) + ) + result = cast[ptr SharedArray[T]](addr(holder.part[0])) + + proc deallocSharedArray[T](sa: ptr SharedArray[T]) = + deallocShared(cast[pointer](sa)) + + template setNonBlocking(fd) = + var x: int = fcntl(fd, F_GETFL, 0) + if x == -1: raiseOSError(osLastError()) + else: + var mode = x or O_NONBLOCK + if fcntl(fd, F_SETFL, mode) == -1: + raiseOSError(osLastError()) + + template setKey(s, f1, f2, e, p, d) = + s.fds[f1].ident = f1 + s.fds[f1].flags = e + s.fds[f1].param = p + s.fds[f1].key.fd = f2 + s.fds[f1].key.data = d + + template clearKey(s, f) = + s.fds[f].ident = 0 + s.fds[f].flags = {} + + when supportedPlatform: + template blockSignals(newmask: var Sigset, oldmask: var Sigset) = + when hasThreadSupport: + if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1: + raiseOSError(osLastError()) + else: + if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1: + raiseOSError(osLastError()) + + template unblockSignals(newmask: var Sigset, oldmask: var Sigset) = + when hasThreadSupport: + if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1: + raiseOSError(osLastError()) + else: + if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1: + raiseOSError(osLastError()) + # + # BSD kqueue + # + # I have tried to adopt kqueue's EVFILT_USER filter for user-events, but it + # looks not very usable, because of 2 cases: + # 1) EVFILT_USER does not supported by OpenBSD and NetBSD + # 2) You can't have one event, which you can use with many kqueue handles. + # So decision was made in favor of the pipes + # + when bsdPlatform: + const + MAX_KQUEUE_CHANGE_EVENTS = 64 + MAX_KQUEUE_RESULT_EVENTS = 64 + + when hasThreadSupport: + type + SelectorImpl[T] = object + kqFD : cint + maxFD : uint + changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] + changesCount: int + fds: ptr SharedArray[SelectorKey[T]] + count: int + changesLock: Lock + else: + type + SelectorImpl[T] = object + kqFD : cint + maxFD : uint + changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] + changesCount: int + fds: seq[SelectorKey[T]] + count: int + + when hasThreadSupport: + type Selector*[T] = ptr SelectorImpl[T] + else: + type Selector*[T] = ref SelectorImpl[T] + + type + SelectEventImpl = object + rfd: cint + wfd: cint + # SelectEvent is declared as `ptr` to be placed in `shared memory`, + # so you can share one SelectEvent handle between threads. + type SelectEvent* = ptr SelectEventImpl + + proc newSelector*[T](): Selector[T] = + var maxFD = getMaxFds() + var kqFD = kqueue() + if kqFD < 0: + raiseOsError(osLastError()) + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.kqFD = kqFD + result.maxFD = maxFD.uint + result.fds = allocSharedArray[SelectorKey[T]](maxFD) + initLock(result.changesLock) + else: + result = Selector[T](kqFD: kqFD, maxFD: maxFD.uint) + result.fds = newSeq[SelectorKey[T]](maxFD) + + proc close*[T](s: Selector[T]) = + if posix.close(s.kqFD) != 0: + raiseOSError(osLastError()) + when hasThreadSupport: + deinitLock(s.changesLock) + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) + + when hasThreadSupport: + template withChangeLock[T](s: Selector[T], body: untyped) = + acquire(s.changesLock) + {.locks: [s.changesLock].}: + try: + body + finally: + release(s.changesLock) + else: + template withChangeLock(s, body: untyped) = + body + + template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, + nflags: cushort, nfflags: cuint, ndata: int, + nudata: pointer) = + mixin withChangeLock + s.withChangeLock(): + s.changesTable[s.changesCount] = KEvent(ident: nident, + filter: nfilter, flags: nflags, + fflags: nfflags, data: ndata, + udata: nudata) + inc(s.changesCount) + if s.changesCount == MAX_KQUEUE_CHANGE_EVENTS: + if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount), + nil, 0, nil) == -1: + raiseOSError(osLastError()) + s.changesCount = 0 + + proc registerHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event], data: T) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setKey(s, fdi, fdi, {flagHandle} + events, 0, data) + if events != {}: + if eventRead in events: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if eventWrite in events: + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc updateHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event]) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0: + if flagHandle in s.fds[fdi].flags: + var ne = events + {flagHandle} + var oe = s.fds[fdi].flags + if oe != ne: + if (eventRead in oe) and (eventRead notin ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if (eventWrite in oe) and (eventWrite notin ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + if (eventRead notin oe) and (eventRead in ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if (eventWrite notin oe) and (eventWrite in ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + s.fds[fdi].flags = ne + else: + raise newException(ValueError, + "Could not update non-handle descriptor") + else: + raise newException(ValueError, + "Descriptor is not registered in queue") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): int {.discardable.} = + var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP).int + if fdi == -1: + raiseOsError(osLastError()) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + var mflags = if oneshot: {flagTimer, flagOneshot} + else: {flagTimer} + var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD + else: EV_ADD + setKey(s, fdi, fdi, mflags, 0, data) + # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, + # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds + # too + modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc registerSignal*[T](s: Selector[T], signal: int, + data: T): int {.discardable.} = + var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP).int + if fdi == -1: + raiseOsError(osLastError()) + + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setKey(s, fdi, signal, {flagSignal}, signal, data) + # block signal `signal` + var nmask: Sigset + var omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(signal)) + blockSignals(nmask, omask) + # to be compatible with linux semantic we need to "eat" signals + posix.signal(cint(signal), SIG_IGN) + modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc registerProcess*[T](s: Selector[T], pid: int, + data: T): int {.discardable.} = + var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP).int + if fdi == -1: + raiseOsError(osLastError()) + + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + var kflags: cushort = EV_ONESHOT or EV_ADD + setKey(s, fdi, pid, {flagProcess, flagOneshot}, pid, data) + modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + var flags = s.fds[fdi].flags + var filter: cshort = 0 + if s.fds[fdi].ident != 0 and flags != {}: + if flagHandle in flags: + # if events == 0, than descriptor was modified with + # updateHandle(fd, 0), so it was already deleted from kqueue. + if flags != {flagHandle}: + if eventRead in flags: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if eventWrite in flags: + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + elif flagTimer in flags: + filter = EVFILT_TIMER + discard posix.close(cint(s.fds[fdi].key.fd)) + modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) + dec(s.count) + elif flagSignal in flags: + filter = EVFILT_SIGNAL + # unblocking signal + var nmask = Sigset() + var omask = Sigset() + var signal = cint(s.fds[fdi].param) + discard sigaddset(nmask, signal) + unblockSignals(nmask, omask) + posix.signal(signal, SIG_DFL) + discard posix.close(cint(s.fds[fdi].key.fd)) + modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) + dec(s.count) + elif flagProcess in flags: + filter = EVFILT_PROC + discard posix.close(cint(s.fds[fdi].key.fd)) + modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) + dec(s.count) + elif flagUser in flags: + filter = EVFILT_READ + modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) + dec(s.count) + clearKey(s, fdi) + + proc flush*[T](s: Selector[T]) = + s.withChangeLock(): + var tv = Timespec() + if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount), + nil, 0, addr tv) == -1: + raiseOSError(osLastError()) + s.changesCount = 0 + + template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + + proc newEvent*(): SelectEvent = + var fds: array[2, cint] + + if posix.pipe(fds) == -1: + raiseOSError(osLastError()) + + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rfd = fds[0] + result.wfd = fds[1] + + proc setEvent*(ev: SelectEvent) = + var data: int = 1 + if posix.write(ev.wfd, addr data, sizeof(int)) != sizeof(int): + raiseOSError(osLastError()) + + proc close*(ev: SelectEvent) = + discard posix.close(cint(ev.rfd)) + discard posix.close(cint(ev.wfd)) + deallocShared(cast[pointer](ev)) + + proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + let fdi = ev.rfd.int + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0 and s.fds[fdi].flags == {}: + setKey(s, fdi, fdi, {flagUser}, 0, data) + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Event wait still pending!") + + proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fdi = ev.rfd.int + if fdi.uint < s.maxFD: + var flags = s.fds[fdi].flags + if s.fds[fdi].ident != 0 and flags != {}: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + clearKey(s, fdi) + + proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey[T]]): int = + var + tv: Timespec + resultsTable: array[MAX_KQUEUE_RESULT_EVENTS, KEvent] + ptv: ptr Timespec = addr tv + + if timeout != -1: + if timeout >= 1000: + tv.tv_sec = (timeout div 1_000).Time + tv.tv_nsec = (timeout %% 1_000) * 1_000_000 + else: + tv.tv_sec = 0.Time + tv.tv_nsec = timeout * 1_000_000 + else: + ptv = nil + + var maxResults = MAX_KQUEUE_RESULT_EVENTS + if maxResults > len(results): + maxResults = len(results) + + var count = 0 + s.withChangeLock(): + count = kevent(s.kqFD, + addr(s.changesTable[0]), cint(s.changesCount), + addr(resultsTable[0]), cint(maxResults), ptv) + s.changesCount = 0 + if count >= 0: + var skey: ptr SelectorKey[T] + var i = 0 + var k = 0 + while i < count: + var kevent = addr(resultsTable[i]) + if (kevent.flags and EV_ERROR) == 0: + var events: set[Event] = {} + case kevent.filter + of EVFILT_READ: + skey = addr(s.fds[kevent.ident.int]) + if flagHandle in skey.flags: + events = {eventRead} + elif flagUser in skey.flags: + var data: int = 0 + if posix.read(kevent.ident.cint, addr data, + sizeof(int)) != sizeof(int): + let err = osLastError() + if err == OSErrorCode(EAGAIN): + # someone already consumed event data + inc(i) + continue + else: + raiseOSError(osLastError()) + events = {eventUser} + else: + events = {eventRead} + of EVFILT_WRITE: + skey = addr(s.fds[kevent.ident.int]) + events = {eventWrite} + of EVFILT_TIMER: + skey = addr(s.fds[kevent.ident.int]) + if flagOneshot in skey.flags: + if posix.close(skey.ident.cint) == -1: + raiseOSError(osLastError()) + clearKey(s, skey.ident) + # no need to modify kqueue, because EV_ONESHOT is already made + # this for us + dec(s.count) + events = {eventTimer} + of EVFILT_VNODE: + skey = addr(s.fds[kevent.ident.int]) + events = {eventVnode} + of EVFILT_SIGNAL: + skey = addr(s.fds[cast[int](kevent.udata)]) + events = {eventSignal} + of EVFILT_PROC: + skey = addr(s.fds[cast[int](kevent.udata)]) + if posix.close(skey.ident.cint) == -1: + raiseOSError(osLastError()) + clearKey(s, skey.ident) + # no need to modify kqueue, because EV_ONESHOT is already made + # this for us + dec(s.count) + events = {eventProcess} + else: + raise newException(ValueError, + "Unsupported kqueue filter in queue") + + if (kevent.flags and EV_EOF) != 0: + events = events + {eventError} + results[k].fd = skey.key.fd + results[k].events = events + results[k].data = skey.key.data + inc(k) + inc(i) + result = k + else: + result = 0 + let err = osLastError() + if cint(err) != EINTR: + raiseOSError(err) + + proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + result = newSeq[ReadyKey[T]](MAX_KQUEUE_RESULT_EVENTS) + var count = selectInto(s, timeout, result) + result.setLen(count) + + # + # Linux epoll + # + + elif defined(linux): + const + MAX_EPOLL_RESULT_EVENTS = 64 + type + SignalFdInfo* {.importc: "struct signalfd_siginfo", + header: "", pure, final.} = object + ssi_signo*: uint32 + ssi_errno*: int32 + ssi_code*: int32 + ssi_pid*: uint32 + ssi_uid*: uint32 + ssi_fd*: int32 + ssi_tid*: uint32 + ssi_band*: uint32 + ssi_overrun*: uint32 + ssi_trapno*: uint32 + ssi_status*: int32 + ssi_int*: int32 + ssi_ptr*: uint64 + ssi_utime*: uint64 + ssi_stime*: uint64 + ssi_addr*: uint64 + pad* {.importc: "__pad".}: array[0..47, uint8] + type + eventFdData {.importc: "eventfd_t", + header: "", pure, final.} = uint64 + epoll_data {.importc: "union epoll_data", + header: "", + pure, final.} = object + u64 {.importc: "u64".}: uint64 + + epoll_event {.importc: "struct epoll_event", + header: "", pure, final.} = object + events: uint32 # Epoll events + data: epoll_data # User data variable + const + EPOLL_CTL_ADD = 1 # Add a file descriptor to the interface. + EPOLL_CTL_DEL = 2 # Remove a file descriptor from the interface. + EPOLL_CTL_MOD = 3 # Change file descriptor epoll_event structure. + const + EPOLLIN = 0x00000001 + EPOLLOUT = 0x00000004 + EPOLLERR = 0x00000008 + EPOLLHUP = 0x00000010 + EPOLLRDHUP = 0x00002000 + EPOLLONESHOT = 1 shl 30 + + proc epoll_create(size: cint): cint + {.importc: "epoll_create", header: "".} + proc epoll_ctl(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint + {.importc: "epoll_ctl", header: "".} + proc epoll_wait(epfd: cint; events: ptr epoll_event; maxevents: cint; + timeout: cint): cint + {.importc: "epoll_wait", header: "".} + proc timerfd_create(clock_id: ClockId, flags: cint): cint + {.cdecl, importc: "timerfd_create", header: "".} + proc timerfd_settime(ufd: cint, flags: cint, + utmr: var Itimerspec, otmr: var Itimerspec): cint + {.cdecl, importc: "timerfd_settime", header: "".} + proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint + {.cdecl, importc: "signalfd", header: "".} + proc eventfd(count: cuint, flags: cint): cint + {.cdecl, importc: "eventfd", header: "".} + + when hasThreadSupport: + type + SelectorImpl[T] = object + epollFD : cint + maxFD : uint + fds: ptr SharedArray[SelectorKey[T]] + count: int + else: + type + SelectorImpl[T] = object + epollFD : cint + maxFD : uint + fds: seq[SelectorKey[T]] + count: int + + when hasThreadSupport: + type Selector*[T] = ptr SelectorImpl[T] + else: + type Selector*[T] = ref SelectorImpl[T] + + type + SelectEventImpl = object + efd: cint + + type SelectEvent* = ptr SelectEventImpl + + proc newSelector*[T](): Selector[T] = + var maxFD = getMaxFds() + var epollFD = epoll_create(MAX_EPOLL_RESULT_EVENTS) + if epollFD < 0: + raiseOsError(osLastError()) + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.epollFD = epollFD + result.maxFD = maxFD.uint + result.fds = allocSharedArray[SelectorKey[T]](maxFD) + else: + result = Selector[T](epollFD: epollFD, maxFD: maxFD.uint) + result.fds = newSeq[SelectorKey[T]](maxFD) + + proc close*[T](s: Selector[T]) = + if posix.close(s.epollFD) != 0: + raiseOSError(osLastError()) + when hasThreadSupport: + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) + + proc registerHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event], data: T) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setKey(s, fdi, fdi, events + {flagHandle}, 0, data) + if events != {}: + var epv: epoll_event + epv.events = EPOLLRDHUP + epv.data.u64 = fdi.uint + if eventRead in events: + epv.events = epv.events or EPOLLIN + if eventWrite in events: + epv.events = epv.events or EPOLLOUT + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc updateHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event]) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0: + var oe = s.fds[fdi].flags + if flagHandle in oe: + var ne = events + {flagHandle} + if oe != ne: + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLRDHUP + + if eventRead in events: + epv.events = epv.events or EPOLLIN + if eventWrite in events: + epv.events = epv.events or EPOLLOUT + + if oe == {flagHandle}: + if ne != {flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) + else: + if events != {}: + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + else: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + dec(s.count) + s.fds[fdi].flags = ne + else: + raise newException(ValueError, + "Could not update non-handle descriptor") + else: + raise newException(ValueError, + "Descriptor is not registered in queue") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = + var epv: epoll_event + var fdi = int(fd) + if fdi.uint < s.maxFD: + var flags = s.fds[fdi].flags + if s.fds[fdi].ident != 0 and flags != {}: + if flagHandle in flags: + # if events == {flagHandle}, then descriptor was already + # unregistered from epoll with updateHandle() call. + # This check is done to omit EBADF error. + if flags != {flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + dec(s.count) + elif flagTimer in flags: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + discard posix.close(fdi.cint) + dec(s.count) + elif flagSignal in flags: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + var nmask: Sigset + var omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(s.fds[fdi].param)) + unblockSignals(nmask, omask) + discard posix.close(fdi.cint) + dec(s.count) + elif flagProcess in flags: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + var nmask: Sigset + var omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, SIGCHLD) + unblockSignals(nmask, omask) + discard posix.close(fdi.cint) + dec(s.count) + clearKey(s, fdi) + + proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fdi = int(ev.efd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0 and flagUser in s.fds[fdi].flags: + clearKey(s, fdi) + var epv: epoll_event + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + dec(s.count) + + proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): int {.discardable.} = + var + new_ts: Itimerspec + old_ts: Itimerspec + var fdi = timerfd_create(CLOCK_MONOTONIC, 0) + if fdi == -1: + raiseOSError(osLastError()) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + var flags = {flagTimer} + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + setNonBlocking(fdi.cint) + if oneshot: + new_ts.it_interval.tv_sec = 0.Time + new_ts.it_interval.tv_nsec = 0 + new_ts.it_value.tv_sec = (timeout div 1_000).Time + new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 + flags = flags + {flagOneshot} + epv.events = epv.events or EPOLLONESHOT + else: + new_ts.it_interval.tv_sec = (timeout div 1000).Time + new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 + new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec + new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec + if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: + raiseOSError(osLastError()) + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, fdi, flags, 0, data) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc registerSignal*[T](s: Selector[T], signal: int, + data: T): int {.discardable.} = + var + nmask: Sigset + omask: Sigset + fdi: cint + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, signal.cint) + blockSignals(nmask, omask) + fdi = signalfd(-1, nmask, 0) + if fdi == -1: + raiseOSError(osLastError()) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setNonBlocking(fdi.cint) + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, signal, {flagSignal}, signal, data) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc registerProcess*[T](s: Selector, pid: int, + data: T): int {.discardable.} = + var + nmask: Sigset + omask: Sigset + fd: int + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, posix.SIGCHLD) + blockSignals(nmask, omask) + try: + var fdi = signalfd(-1, nmask, 0) + if fd == -1: + raiseOSError(osLastError()) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setNonBlocking(fdi.cint) + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, pid, {flagProcess}, pid, data) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + except: + if fd != -1: discard posix.close(fd.cint) + unblockSignals(omask, nmask) + + proc flush*[T](s: Selector[T]) = + discard + + template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + + proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + let fdi = int(ev.efd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setKey(s, fdi, fdi, {flagUser}, 0, data) + var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) + epv.data.u64 = ev.efd.uint + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc setEvent*(ev: SelectEvent) = + var data : uint64 = 1 + if posix.write(ev.efd, addr data, sizeof(uint64)) == -1: + raiseOSError(osLastError()) + + proc close*(ev: SelectEvent) = + discard posix.close(ev.efd) + deallocShared(cast[pointer](ev)) + + proc newEvent*(): SelectEvent = + var fdi = eventfd(0, 0) + if fdi == -1: + raiseOSError(osLastError()) + setNonBlocking(fdi) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.efd = cint(fdi) + + proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey[T]]): int = + var + resultsTable: array[MAX_EPOLL_RESULT_EVENTS, epoll_event] + + var maxResults = MAX_EPOLL_RESULT_EVENTS + if maxResults > len(results): + maxResults = len(results) + + var count = epoll_wait(s.epollFD, addr(resultsTable[0]), maxResults.cint, + timeout.cint) + if count > 0: + var i = 0 + var k = 0 + while i < count: + var events: set[Event] = {} + let fdi = int(resultsTable[i].data.u64) + var skey = addr(s.fds[fdi]) + let pevents = resultsTable[i].events + var flags = s.fds[fdi].flags + + if skey.ident != 0 and flags != {}: + block processItem: + if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0: + events = events + {eventError} + if (pevents and EPOLLOUT) != 0: + events = events + {eventWrite} + if (pevents and EPOLLIN) != 0: + if flagHandle in flags: + events = events + {eventRead} + elif flagTimer in flags: + var data: uint64 = 0 + if posix.read(fdi.cint, addr data, + sizeof(uint64)) != sizeof(uint64): + raiseOSError(osLastError()) + events = events + {eventTimer} + elif flagSignal in flags: + var data: SignalFdInfo + if posix.read(fdi.cint, addr data, + sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): + raiseOsError(osLastError()) + events = events + {eventSignal} + elif flagProcess in flags: + var data: SignalFdInfo + if posix.read(fdi.cint, addr data, + sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): + raiseOsError(osLastError()) + if cast[int](data.ssi_pid) == skey.param: + events = events + {eventProcess} + # we want to free resources for this event + flags = flags + {flagOneshot} + else: + break processItem + elif flagUser in flags: + var data: uint = 0 + if posix.read(fdi.cint, addr data, + sizeof(uint)) != sizeof(uint): + let err = osLastError() + if err == OSErrorCode(EAGAIN): + # someone already consumed event data + inc(i) + continue + else: + raiseOSError(err) + events = events + {eventUser} + else: + raise newException(ValueError, + "Unsupported epoll event in queue") + results[k].fd = skey.key.fd + results[k].events = events + results[k].data = skey.key.data + + if flagOneshot in flags: + var epv: epoll_event + try: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + finally: + discard posix.close(fdi.cint) + s.fds[fdi].ident = 0 + s.fds[fdi].flags = {} + dec(s.count) + inc(k) + inc(i) + result = k + elif count == 0: + discard + else: + result = 0 + let err = osLastError() + if cint(err) != EINTR: + raiseOSError(err) + + proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + result = newSeq[ReadyKey[T]](MAX_EPOLL_RESULT_EVENTS) + var count = selectInto(s, timeout, result) + result.setLen(count) + + # + # Windows select + # + + elif defined(windows): + const FD_SETSIZE = 64 + + import hashes, nativesockets + + when hasThreadSupport: + import sharedtables + else: + import tables + + proc hash*(x: SocketHandle): Hash {.borrow.} + proc `$`*(x: SocketHandle): string {.borrow.} + + proc WSAFDIsSet(s: SocketHandle, fdSet: var TFdSet): bool {. + stdcall, importc: "__WSAFDIsSet", dynlib: "ws2_32.dll", noSideEffect.} + + template iFD_ISSET(s: SocketHandle, fdSet: var TFdSet): bool = + if WSAFDIsSet(s, fdSet): true else: false + + template iFD_SET(s: SocketHandle, fdSet: var TFdSet) = + block: + var i = 0 + while i < fdSet.fd_count: + if fdSet.fd_array[i] == s: + break + inc(i) + if i == fdSet.fd_count: + if fdSet.fd_count < ioselectors.FD_SETSIZE: + fdSet.fd_array[i] = s + inc(fdSet.fd_count) + + template iFD_CLR(s: SocketHandle, fdSet: var TFdSet) = + block: + var i = 0 + while i < fdSet.fd_count: + if fdSet.fd_array[i] == s: + if i == fdSet.fd_count - 1: + fdSet.fd_array[i] = 0.SocketHandle + else: + while i < (fdSet.fd_count - 1): + fdSet.fd_array[i] = fdSet.fd_array[i + 1] + inc(i) + dec(fdSet.fd_count) + break + inc(i) + + template iFD_ZERO(fdSet: var TFdSet) = + fdSet.fd_count = 0 + + when hasThreadSupport: + type + SelectorImpl[T] = object + rSet: TFdSet + wSet: TFdSet + eSet: TFdSet + maxFD: uint + fds: SharedTable[SocketHandle, SelectorKey[T]] + count: int + lock: Lock + else: + type + SelectorImpl[T] = object + rSet: TFdSet + wSet: TFdSet + eSet: TFdSet + maxFD: uint + fds: Table[SocketHandle, SelectorKey[T]] + count: int + + when hasThreadSupport: + type Selector*[T] = ptr SelectorImpl[T] + else: + type Selector*[T] = ref SelectorImpl[T] + + type + SelectEventImpl = object + rsock: SocketHandle + wsock: SocketHandle + + type SelectEvent* = ptr SelectEventImpl + + when hasThreadSupport: + template withSelectLock[T](s: Selector[T], body: untyped) = + acquire(s.lock) + {.locks: [s.lock].}: + try: + body + finally: + release(s.lock) + else: + template withSelectLock[T](s: Selector[T], body: untyped) = + body + + proc newSelector*[T](): Selector[T] = + var maxFD = FD_SETSIZE + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.maxFD = maxFD.uint + result.fds = initSharedTable[SocketHandle, SelectorKey[T]]() + initLock result.lock + else: + result = Selector[T](maxFD: FD_SETSIZE) + result.maxFD = maxFD.uint + result.fds = initTable[SocketHandle, SelectorKey[T]]() + + iFD_ZERO(result.rSet) + iFD_ZERO(result.wSet) + iFD_ZERO(result.eSet) + + proc close*(s: Selector) = + when hasThreadSupport: + deinitSharedTable(s.fds) + deallocShared(cast[pointer](s)) + + template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + + template selectAdd[T](s: Selector[T], fd: SocketHandle, + events: set[Event]) = + mixin withSelectLock + s.withSelectLock(): + if eventRead in events: + if s.rSet.fd_count == FD_SETSIZE: + raise newException(ValueError, "Maximum numbers of fds exceeded") + iFD_SET(fd, s.rSet) + inc(s.count) + if eventWrite in events: + if s.wSet.fd_count == FD_SETSIZE: + raise newException(ValueError, "Maximum numbers of fds exceeded") + iFD_SET(fd, s.wSet) + iFD_SET(fd, s.eSet) + inc(s.count) + + proc registerHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event], data: T) = + var fdi = int(fd) + var flags = {flagHandle} + events + var nkey = SelectorKey[T](ident: fdi, flags: flags) + nkey.key.fd = fdi + nkey.key.data = data + + if s.fds.hasKeyOrPut(fd, nkey): + raise newException(ValueError, "Re-use of non closed descriptor") + selectAdd(s, fd, flags) + + proc updateHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event]) = + s.withSelectLock(): + withValue(s.fds, fd, skey) do: + if flagHandle in skey.flags: + var oe = skey.flags + var ne = events + {flagHandle} + if oe != ne: + if (eventRead in oe) and (eventRead notin ne): + iFD_CLR(fd, s.rSet) + dec(s.count) + if (eventWrite in oe) and (eventWrite notin ne): + iFD_CLR(fd, s.wSet) + iFD_CLR(fd, s.eSet) + dec(s.count) + if (eventRead notin oe) and (eventRead in ne): + iFD_SET(fd, s.rSet) + inc(s.count) + if (eventWrite notin oe) and (eventWrite in ne): + iFD_SET(fd, s.wSet) + iFD_SET(fd, s.eSet) + inc(s.count) + skey.flags = ne + else: + raise newException(ValueError, + "Could not update non-handle descriptor") + do: + raise newException(ValueError, + "Descriptor is not registered in queue") + + proc registerTimer*[T](s: Selector, timeout: int, oneshot: bool, + data: T): int {.discardable.} = + raise newException(ValueError, "Not implemented") + + proc registerSignal*[T](s: Selector, signal: int, + data: T): int {.discardable.} = + raise newException(ValueError, "Not implemented") + + proc registerProcess*[T](s: Selector, pid: int, + data: T): int {.discardable.} = + raise newException(ValueError, "Not implemented") + + proc flush*[T](s: Selector[T]) = discard + + proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fd = ev.rsock + s.withSelectLock(): + iFD_CLR(fd, s.rSet) + dec(s.count) + s.fds.del(fd) + + + proc unregister*[T](s: Selector[T], fd: SocketHandle) = + s.withSelectLock(): + s.fds.withValue(fd, skey) do: + if eventRead in skey.flags: + iFD_CLR(fd, s.rSet) + dec(s.count) + if eventWrite in skey.flags: + iFD_CLR(fd, s.wSet) + iFD_CLR(fd, s.eSet) + dec(s.count) + s.fds.del(fd) + + proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + var flags = {flagUser, eventRead} + var nkey = SelectorKey[T](ident: ev.rsock.int, flags: flags) + nkey.key.fd = ev.rsock.int + nkey.key.data = data + if s.fds.hasKeyOrPut(ev.rsock, nkey): + raise newException(ValueError, "Re-use of non closed descriptor") + selectAdd(s, ev.rsock, flags) + + proc newEvent*(): SelectEvent = + var ssock = newNativeSocket() + var wsock = newNativeSocket() + var rsock: SocketHandle = INVALID_SOCKET + var saddr = Sockaddr_in() + try: + saddr.sin_family = winlean.AF_INET + saddr.sin_port = 0 + saddr.sin_addr.s_addr = INADDR_ANY + if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseOSError(osLastError()) + + if winlean.listen(ssock, 1) == -1: + raiseOSError(osLastError()) + + var namelen = sizeof(saddr).SockLen + if getsockname(ssock, cast[ptr SockAddr](addr(saddr)), + addr(namelen)) == -1'i32: + raiseOSError(osLastError()) + + saddr.sin_addr.s_addr = 0x0100007F + if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) == -1: + raiseOSError(osLastError()) + namelen = sizeof(saddr).SockLen + rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)), + cast[ptr SockLen](addr(namelen))) + if rsock == SocketHandle(-1): + raiseOSError(osLastError()) + + if winlean.closesocket(ssock) == -1: + raiseOSError(osLastError()) + + var mode = clong(1) + if ioctlsocket(rsock, FIONBIO, addr(mode)) == -1: + raiseOSError(osLastError()) + mode = clong(1) + if ioctlsocket(wsock, FIONBIO, addr(mode)) == -1: + raiseOSError(osLastError()) + + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rsock = rsock + result.wsock = wsock + except: + discard winlean.closesocket(ssock) + discard winlean.closesocket(wsock) + if rsock != INVALID_SOCKET: + discard winlean.closesocket(rsock) + + proc setEvent*(ev: SelectEvent) = + var data: int = 1 + if winlean.send(ev.wsock, cast[pointer](addr data), + cint(sizeof(int)), 0) != sizeof(int): + raiseOSError(osLastError()) + + proc close*(ev: SelectEvent) = + discard winlean.closesocket(ev.rsock) + discard winlean.closesocket(ev.wsock) + deallocShared(cast[pointer](ev)) + + proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey[T]]): int = + var tv = Timeval() + var ptv = addr tv + var rset, wset, eset: TFdSet + + if timeout != -1: + tv.tv_sec = timeout.int32 div 1_000 + tv.tv_usec = (timeout.int32 %% 1_000) * 1_000 + else: + ptv = nil + + s.withSelectLock(): + rset = s.rSet + wset = s.wSet + eset = s.eSet + + var count = select(cint(0), addr(rset), addr(wset), + addr(eset), ptv).int + if count > 0: + var rindex = 0 + var i = 0 + while i < rset.fd_count: + let fd = rset.fd_array[i] + if iFD_ISSET(fd, rset): + var events = {eventRead} + if iFD_ISSET(fd, eset): events = events + {eventError} + if iFD_ISSET(fd, wset): events = events + {eventWrite} + s.fds.withValue(fd, skey) do: + if flagHandle in skey.flags: + skey.key.events = events + elif flagUser in skey.flags: + var data: int = 0 + if winlean.recv(fd, cast[pointer](addr(data)), + sizeof(int).cint, 0) != sizeof(int): + let err = osLastError() + if err != OSErrorCode(WSAEWOULDBLOCK): + raiseOSError(err) + else: + # someone already consumed event data + inc(i) + continue + skey.key.events = {eventUser} + results[rindex].fd = skey.key.fd + results[rindex].data = skey.key.data + results[rindex].events = skey.key.events + inc(rindex) + inc(i) + + i = 0 + while i < wset.fd_count: + let fd = wset.fd_array[i] + if iFD_ISSET(fd, wset): + var events = {eventWrite} + if not iFD_ISSET(fd, rset): + if iFD_ISSET(fd, eset): events = events + {eventError} + s.fds.withValue(fd, skey) do: + skey.key.events = events + results[rindex].fd = skey.key.fd + results[rindex].data = skey.key.data + results[rindex].events = skey.key.events + inc(rindex) + inc(i) + count = rindex + elif count == 0: + discard + else: + raiseOSError(osLastError()) + result = count + + proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + result = newSeq[ReadyKey[T]](FD_SETSIZE) + var count = selectInto(s, timeout, result) + result.setLen(count) + + # + # Posix poll + # + + else: + const MAX_POLL_RESULT_EVENTS = 64 + + when hasThreadSupport: + type + SelectorImpl[T] = object + maxFD : uint + pollcnt: int + fds: ptr SharedArray[SelectorKey[T]] + pollfds: ptr SharedArray[TPollFd] + count: int + lock: Lock + else: + type + SelectorImpl[T] = object + maxFD : uint + pollcnt: int + fds: seq[SelectorKey[T]] + pollfds: seq[TPollFd] + count: int + + when hasThreadSupport: + type Selector*[T] = ptr SelectorImpl[T] + else: + type Selector*[T] = ref SelectorImpl[T] + + type + SelectEventImpl = object + rfd: cint + wfd: cint + + type SelectEvent* = ptr SelectEventImpl + + when hasThreadSupport: + template withPollLock[T](s: Selector[T], body: untyped) = + acquire(s.lock) + {.locks: [s.lock].}: + try: + body + finally: + release(s.lock) + else: + template withPollLock(s, body: untyped) = + body + + proc newSelector*[T](): Selector[T] = + var maxFD = getMaxFds() + + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.maxFD = maxFD.uint + result.fds = allocSharedArray[SelectorKey[T]](maxFD) + result.pollfds = allocSharedArray[TPollFd](maxFD) + initLock(result.lock) + else: + result = Selector[T](maxFD: maxFD.uint) + result.fds = newSeq[SelectorKey[T]](maxFD) + result.pollfds = newSeq[TPollFd](maxFD) + + proc close*[T](s: Selector[T]) = + when hasThreadSupport: + deinitLock(s.lock) + deallocSharedArray(s.fds) + deallocSharedArray(s.pollfds) + deallocShared(cast[pointer](s)) + + template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) = + withPollLock(s): + var pollev: cshort = 0 + if eventRead in events: pollev = pollev or POLLIN + if eventWrite in events: pollev = pollev or POLLOUT + s.pollfds[s.pollcnt].fd = cint(sock) + s.pollfds[s.pollcnt].events = pollev + inc(s.count) + inc(s.pollcnt) + + template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) = + withPollLock(s): + var i = 0 + var pollev: cshort = 0 + if eventRead in events: pollev = pollev or POLLIN + if eventWrite in events: pollev = pollev or POLLOUT + + while i < s.pollcnt: + if s.pollfds[i].fd == sock: + s.pollfds[i].events = pollev + break + inc(i) + + if i == s.pollcnt: + raise newException(ValueError, + "Descriptor is not registered in queue") + + template pollRemove[T](s: Selector[T], sock: cint) = + withPollLock(s): + var i = 0 + while i < s.pollcnt: + if s.pollfds[i].fd == sock: + if i == s.pollcnt - 1: + s.pollfds[i].fd = 0 + s.pollfds[i].events = 0 + s.pollfds[i].revents = 0 + else: + while i < (s.pollcnt - 1): + s.pollfds[i].fd = s.pollfds[i + 1].fd + s.pollfds[i].events = s.pollfds[i + 1].events + inc(i) + break + inc(i) + dec(s.pollcnt) + dec(s.count) + + proc registerHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event], data: T) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + setKey(s, fdi, fdi, {flagHandle} + events, 0, data) + s.pollAdd(fdi.cint, events) + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc updateHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event]) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0: + var oe = s.fds[fdi].flags + if flagHandle in oe: + var ne = events + {flagHandle} + if ne != oe: + if events != {}: + s.pollUpdate(fd.cint, events) + else: + s.pollRemove(fd.cint) + s.fds[fdi].flags = ne + else: + raise newException(ValueError, + "Could not update non-handle descriptor") + else: + raise newException(ValueError, "Re-use of non closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): int {.discardable.} = + raise newException(ValueError, "Not implemented") + + proc registerSignal*[T](s: Selector[T], signal: int, + data: T): int {.discardable.} = + raise newException(ValueError, "Not implemented") + + proc registerProcess*[T](s: Selector[T], pid: int, + data: T): int {.discardable.} = + raise newException(ValueError, "Not implemented") + + proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + var fdi = int(ev.rfd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident == 0: + var events = {flagUser, eventRead} + setKey(s, fdi, fdi, events, 0, data) + s.pollAdd(fdi.cint, events) + else: + raise newException(ValueError, "Re-use of non-closed descriptor") + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + proc flush*[T](s: Selector[T]) = discard + + template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + + proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0 and s.fds[fdi].flags != {}: + clearKey(s, fdi) + s.pollRemove(fdi.cint) + + proc unregister*[T](s: Selector[T], ev: SelectEvent) = + var fdi = int(ev.rfd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0 and (flagUser in s.fds[fdi].flags): + clearKey(s, fdi) + s.pollRemove(fdi.cint) + + proc newEvent*(): SelectEvent = + var fds: array[2, cint] + if posix.pipe(fds) == -1: + raiseOSError(osLastError()) + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rfd = fds[0] + result.wfd = fds[1] + + proc setEvent*(ev: SelectEvent) = + var data: int = 1 + if posix.write(ev.wfd, addr data, sizeof(int)) != sizeof(int): + raiseOSError(osLastError()) + + proc close*(ev: SelectEvent) = + discard posix.close(cint(ev.rfd)) + discard posix.close(cint(ev.wfd)) + deallocShared(cast[pointer](ev)) + + proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey[T]]): int = + var maxResults = MAX_POLL_RESULT_EVENTS + if maxResults > len(results): + maxResults = len(results) + + s.withPollLock(): + var count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout) + if count > 0: + var i = 0 + var k = 0 + var rindex = 0 + while (i < s.pollcnt) and (k < count) and (rindex < maxResults): + let revents = s.pollfds[i].revents + let fd = s.pollfds[i].fd + if revents != 0: + var events: set[Event] = {} + if (revents and POLLIN) != 0: + events = events + {eventRead} + if (revents and POLLOUT) != 0: + events = events + {eventWrite} + if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or + (revents and POLLNVAL) != 0: + events = events + {eventError} + + var skey = addr(s.fds[fd]) + if flagUser in skey.flags: + if eventRead in events: + var data: int = 0 + if posix.read(fd, addr data, sizeof(int)) != sizeof(int): + let err = osLastError() + if err != OSErrorCode(EAGAIN): + raiseOSError(osLastError()) + else: + # someone already consumed event data + inc(i) + continue + events = {eventUser} + + results[rindex].fd = fd + results[rindex].events = events + results[rindex].data = skey.key.data + s.pollfds[i].revents = 0 + inc(rindex) + inc(k) + inc(i) + result = k + elif count == 0: + discard + else: + let err = osLastError() + if err.cint == EINTR: + discard + else: + raiseOSError(osLastError()) + + proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + result = newSeq[ReadyKey[T]](MAX_POLL_RESULT_EVENTS) + var count = selectInto(s, timeout, result) + result.setLen(count) + + when not defined(windows): + template withData*[T](s: Selector[T], fd: SocketHandle, value, + body: untyped) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0: + var value = addr(s.fds[fdi].key.data) + body + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + + template withData*[T](s: Selector[T], fd: SocketHandle, value, body1, + body2: untyped) = + var fdi = int(fd) + if fdi.uint < s.maxFD: + if s.fds[fdi].ident != 0: + var value = addr(s.fds[fdi].key.data) + body1 + else: + body2 + else: + raise newException(ValueError, "Maximum file descriptors exceeded") + else: + template withData*(s: Selector, fd: SocketHandle, value, body: untyped) = + s.fds.withValue(fd, skey) do: + var value {.inject.} = addr(skey.key.data) + body + + template withData*(s: Selector, fd: SocketHandle, value, + body1, body2: untyped) = + s.fds.withValue(fd, skey) do: + var value {.inject.} = addr(skey.key.data) + body1 + do: + body2 diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim new file mode 100644 index 0000000000..2ba116ef1a --- /dev/null +++ b/tests/async/tioselectors.nim @@ -0,0 +1,407 @@ +discard """ + file: "tioselectors.nim" + output: "All tests passed!" +""" +import ioselectors + +const hasThreadSupport = compileOption("threads") + +template processTest(t, x: untyped) = + #stdout.write(t) + #stdout.flushFile() + if not x: echo(t & " FAILED\r\n") + +when not defined(windows): + import os, posix, osproc, nativesockets, times + + const supportedPlatform = defined(macosx) or defined(freebsd) or + defined(netbsd) or defined(openbsd) or + defined(linux) + + proc socket_notification_test(): bool = + proc create_test_socket(): SocketHandle = + var sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP) + var x: int = fcntl(sock, F_GETFL, 0) + if x == -1: raiseOSError(osLastError()) + else: + var mode = x or O_NONBLOCK + if fcntl(sock, F_SETFL, mode) == -1: + raiseOSError(osLastError()) + result = sock + + var client_message = "SERVER HELLO =>" + var server_message = "CLIENT HELLO" + var buffer : array[128, char] + + var selector = newSelector[int]() + var client_socket = create_test_socket() + var server_socket = create_test_socket() + + registerHandle(selector, server_socket, {eventRead}, 0) + registerHandle(selector, client_socket, {eventWrite}, 0) + + var option : int32 = 1 + if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), + addr(option), sizeof(option).SockLen) < 0: + raiseOSError(osLastError()) + + var aiList = getAddrInfo("0.0.0.0", Port(13337)) + if bindAddr(server_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) < 0'i32: + dealloc(aiList) + raiseOSError(osLastError()) + discard server_socket.listen() + dealloc(aiList) + + aiList = getAddrInfo("127.0.0.1", Port(13337)) + discard posix.connect(client_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) + dealloc(aiList) + var rc1 = selector.select(100) + assert(len(rc1) == 2) + + var sockAddress: SockAddr + var addrLen = sizeof(sockAddress).Socklen + var server2_socket = accept(server_socket, + cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + assert(server2_socket != osInvalidSocket) + selector.registerHandle(server2_socket, {eventRead}, 0) + + if posix.send(client_socket, addr(client_message[0]), + len(client_message), 0) == -1: + raiseOSError(osLastError()) + + selector.updateHandle(client_socket, {eventRead}) + + var rc2 = selector.select(100) + assert(len(rc2) == 1) + + var read_count = posix.recv(server2_socket, addr (buffer[0]), 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(client_message)) + var test1 = true + for i in 0.. Date: Thu, 23 Jun 2016 23:16:00 +0300 Subject: [PATCH 2/5] Forgot about newlines at the end --- tests/async/tioselectors.nim | 2 +- tests/async/tioselectors.nim.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim index 2ba116ef1a..f3f38be7fe 100644 --- a/tests/async/tioselectors.nim +++ b/tests/async/tioselectors.nim @@ -404,4 +404,4 @@ else: when hasThreadSupport: processTest("Multithreaded user event notification test...", mt_event_test()) - echo("All tests passed!") \ No newline at end of file + echo("All tests passed!") diff --git a/tests/async/tioselectors.nim.cfg b/tests/async/tioselectors.nim.cfg index 73752f0420..b1b896858b 100644 --- a/tests/async/tioselectors.nim.cfg +++ b/tests/async/tioselectors.nim.cfg @@ -1 +1 @@ -threads:on -d:threadsafe \ No newline at end of file +threads:on -d:threadsafe From 2eb34a8129aa82ab74035886881c84b8f0489c56 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sun, 26 Jun 2016 09:51:01 +0300 Subject: [PATCH 3/5] Event enum is now .pure. Modified getMaxFds() to work more properly. Removed seqs from code Some exceptions replaced with doAssert --- lib/pure/ioselectors.nim | 801 ++++++++++++++++------------------- tests/async/tioselectors.nim | 22 +- 2 files changed, 377 insertions(+), 446 deletions(-) diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim index b77ab9295a..65e549b193 100644 --- a/lib/pure/ioselectors.nim +++ b/lib/pure/ioselectors.nim @@ -50,16 +50,16 @@ when defined(nimdoc): Selector*[T] = ref object ## An object which holds descriptors to be checked for read/write status - Event* = enum + Event* {.pure.} = enum ## An enum which hold event types - eventRead, ## Descriptor is available for read - eventWrite, ## Descriptor is available for write - eventTimer, ## Timer descriptor is completed - eventSignal, ## Signal is raised - eventProcess, ## Process is finished - eventVnode, ## Currently not supported - eventUser, ## User event is raised - eventError ## Error happens while waiting, for descriptor + Read, ## Descriptor is available for read + Write, ## Descriptor is available for write + Timer, ## Timer descriptor is completed + Signal, ## Signal is raised + Process, ## Process is finished + Vnode, ## Currently not supported + User, ## User event is raised + Error ## Error happens while waiting, for descriptor ReadyKey*[T] = object ## An object which holds result for descriptor @@ -189,36 +189,68 @@ when defined(nimdoc): ## else: - when not defined(windows): + when defined(macosx) or defined(freebsd): when defined(macosx): - var - OPEN_MAX {.importc: "OPEN_MAX", header: "".}: cint + const maxDescriptors = 29 # KERN_MAXFILESPERPROC (MacOS) + else: + const maxDescriptors = 27 # KERN_MAXFILESPERPROC (FreeBSD) + proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr int, + newp: pointer, newplen: int): cint + {.importc: "sysctl",header: """#include + #include """} + elif defined(netbsd) or defined(openbsd): + # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using + # KERN_MAXFILES, because KERN_MAXFILES is always bigger, + # than KERN_MAXFILESPERPROC + const maxDescriptors = 7 # KERN_MAXFILES + proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr int, + newp: pointer, newplen: int): cint + {.importc: "sysctl",header: """#include + #include """} + elif defined(linux) or defined(solaris): + proc ulimit(cmd: cint): clong + {.importc: "ulimit", header: "", varargs.} + elif defined(windows): + discard + else: var RLIMIT_NOFILE {.importc: "RLIMIT_NOFILE", header: "".}: cint type rlimit {.importc: "struct rlimit", - header: "", pure, final.} = object + header: "", pure, final.} = object rlim_cur: int rlim_max: int - proc getrlimit(resource: cint, rlp: var rlimit): cint {. - importc: "getrlimit",header: ""} - proc getMaxFds*(): int = + proc getrlimit(resource: cint, rlp: var rlimit): cint + {.importc: "getrlimit",header: "".} + + proc getMaxFds*(): int = + when defined(macosx) or defined(freebsd) or defined(netbsd) or + defined(openbsd): + var count = cint(0) + var size = sizeof(count) + var namearr = [cint(1), cint(maxDescriptors)] + + if sysctl(addr namearr[0], 2, cast[pointer](addr count), addr size, + nil, 0) != 0: + raiseOsError(osLastError()) + result = count + elif defined(linux) or defined(solaris): + result = int(ulimit(4, 0)) + elif defined(windows): + result = FD_SETSIZE + else: var a = rlimit() if getrlimit(RLIMIT_NOFILE, a) != 0: raiseOsError(osLastError()) result = a.rlim_max - when defined(macosx): - if a.rlim_max > OPEN_MAX: - result = OPEN_MAX when hasThreadSupport: import locks type - Event* = enum - eventRead, eventWrite, eventTimer, eventSignal, eventProcess, - eventVnode, eventUser, eventError, + Event* {.pure.} = enum + Read, Write, Timer, Signal, Process, Vnode, User, Error, flagHandle, flagTimer, flagSignal, flagProcess, flagVnode, flagUser, flagOneshot @@ -234,20 +266,19 @@ else: key : ReadyKey[T] when not defined(windows): - when hasThreadSupport: - type - SharedArrayHolder[T] = object - part: array[16, T] - SharedArray {.unchecked.}[T] = array[0..100_000_000, T] + type + SharedArrayHolder[T] = object + part: array[16, T] + SharedArray {.unchecked.}[T] = array[0..100_000_000, T] - proc allocSharedArray[T](nsize: int): ptr SharedArray[T] = - let holder = cast[ptr SharedArrayHolder[T]]( - allocShared0(sizeof(T) * nsize) - ) - result = cast[ptr SharedArray[T]](addr(holder.part[0])) + proc allocSharedArray[T](nsize: int): ptr SharedArray[T] = + let holder = cast[ptr SharedArrayHolder[T]]( + allocShared0(sizeof(T) * nsize) + ) + result = cast[ptr SharedArray[T]](addr(holder.part[0])) - proc deallocSharedArray[T](sa: ptr SharedArray[T]) = - deallocShared(cast[pointer](sa)) + proc deallocSharedArray[T](sa: ptr SharedArray[T]) = + deallocShared(cast[pointer](sa)) template setNonBlocking(fd) = var x: int = fcntl(fd, F_GETFL, 0) @@ -298,30 +329,17 @@ else: MAX_KQUEUE_CHANGE_EVENTS = 64 MAX_KQUEUE_RESULT_EVENTS = 64 - when hasThreadSupport: - type - SelectorImpl[T] = object - kqFD : cint - maxFD : uint - changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] - changesCount: int - fds: ptr SharedArray[SelectorKey[T]] - count: int + type + SelectorImpl[T] = object + kqFD : cint + maxFD : uint + changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] + changesCount: int + fds: ptr SharedArray[SelectorKey[T]] + count: int + when hasThreadSupport: changesLock: Lock - else: - type - SelectorImpl[T] = object - kqFD : cint - maxFD : uint - changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] - changesCount: int - fds: seq[SelectorKey[T]] - count: int - - when hasThreadSupport: - type Selector*[T] = ptr SelectorImpl[T] - else: - type Selector*[T] = ref SelectorImpl[T] + Selector*[T] = ptr SelectorImpl[T] type SelectEventImpl = object @@ -336,23 +354,21 @@ else: var kqFD = kqueue() if kqFD < 0: raiseOsError(osLastError()) + + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.kqFD = kqFD + result.maxFD = maxFD.uint + result.fds = allocSharedArray[SelectorKey[T]](maxFD) when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.kqFD = kqFD - result.maxFD = maxFD.uint - result.fds = allocSharedArray[SelectorKey[T]](maxFD) initLock(result.changesLock) - else: - result = Selector[T](kqFD: kqFD, maxFD: maxFD.uint) - result.fds = newSeq[SelectorKey[T]](maxFD) proc close*[T](s: Selector[T]) = if posix.close(s.kqFD) != 0: raiseOSError(osLastError()) when hasThreadSupport: deinitLock(s.changesLock) - deallocSharedArray(s.fds) - deallocShared(cast[pointer](s)) + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) when hasThreadSupport: template withChangeLock[T](s: Selector[T], body: untyped) = @@ -386,17 +402,15 @@ else: events: set[Event], data: T) = var fdi = int(fd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setKey(s, fdi, fdi, {flagHandle} + events, 0, data) - if events != {}: - if eventRead in events: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if eventWrite in events: - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) + if events != {}: + if Event.Read in events: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if Event.Write in events: + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -404,30 +418,24 @@ else: events: set[Event]) = var fdi = int(fd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0: - if flagHandle in s.fds[fdi].flags: - var ne = events + {flagHandle} - var oe = s.fds[fdi].flags - if oe != ne: - if (eventRead in oe) and (eventRead notin ne): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) - if (eventWrite in oe) and (eventWrite notin ne): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) - dec(s.count) - if (eventRead notin oe) and (eventRead in ne): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if (eventWrite notin oe) and (eventWrite in ne): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, - "Could not update non-handle descriptor") - else: - raise newException(ValueError, - "Descriptor is not registered in queue") + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in s.fds[fdi].flags) + var ne = events + {Event.flagHandle} + var oe = s.fds[fdi].flags + if oe != ne: + if (Event.Read in oe) and (Event.Read notin ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Write in oe) and (Event.Write notin ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Read notin oe) and (Event.Read in ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if (Event.Write notin oe) and (Event.Write in ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + s.fds[fdi].flags = ne else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -438,20 +446,18 @@ else: if fdi == -1: raiseOsError(osLastError()) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - var mflags = if oneshot: {flagTimer, flagOneshot} - else: {flagTimer} - var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD - else: EV_ADD - setKey(s, fdi, fdi, mflags, 0, data) - # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, - # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds - # too - modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + var mflags = if oneshot: {Event.flagTimer, Event.flagOneshot} + else: {Event.flagTimer} + var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD + else: EV_ADD + setKey(s, fdi, fdi, mflags, 0, data) + # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, + # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds + # too + modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) + inc(s.count) + result = fdi else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -463,23 +469,21 @@ else: raiseOsError(osLastError()) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setKey(s, fdi, signal, {flagSignal}, signal, data) - # block signal `signal` - var nmask: Sigset - var omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, cint(signal)) - blockSignals(nmask, omask) - # to be compatible with linux semantic we need to "eat" signals - posix.signal(cint(signal), SIG_IGN) - modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, - cast[pointer](fdi)) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, signal, {Event.flagSignal}, signal, data) + # block signal `signal` + var nmask: Sigset + var omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(signal)) + blockSignals(nmask, omask) + # to be compatible with linux semantic we need to "eat" signals + posix.signal(cint(signal), SIG_IGN) + modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -491,15 +495,13 @@ else: raiseOsError(osLastError()) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - var kflags: cushort = EV_ONESHOT or EV_ADD - setKey(s, fdi, pid, {flagProcess, flagOneshot}, pid, data) - modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, - cast[pointer](fdi)) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + var kflags: cushort = EV_ONESHOT or EV_ADD + setKey(s, fdi, pid, {Event.flagProcess, Event.flagOneshot}, pid, data) + modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -509,22 +511,22 @@ else: var flags = s.fds[fdi].flags var filter: cshort = 0 if s.fds[fdi].ident != 0 and flags != {}: - if flagHandle in flags: + if Event.flagHandle in flags: # if events == 0, than descriptor was modified with # updateHandle(fd, 0), so it was already deleted from kqueue. - if flags != {flagHandle}: - if eventRead in flags: + if flags != {Event.flagHandle}: + if Event.Read in flags: modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) dec(s.count) - if eventWrite in flags: + if Event.Write in flags: modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) dec(s.count) - elif flagTimer in flags: + elif Event.flagTimer in flags: filter = EVFILT_TIMER discard posix.close(cint(s.fds[fdi].key.fd)) modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) dec(s.count) - elif flagSignal in flags: + elif Event.flagSignal in flags: filter = EVFILT_SIGNAL # unblocking signal var nmask = Sigset() @@ -536,12 +538,12 @@ else: discard posix.close(cint(s.fds[fdi].key.fd)) modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) dec(s.count) - elif flagProcess in flags: + elif Event.flagProcess in flags: filter = EVFILT_PROC discard posix.close(cint(s.fds[fdi].key.fd)) modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) dec(s.count) - elif flagUser in flags: + elif Event.flagUser in flags: filter = EVFILT_READ modifyKQueue(s, fdi.uint, filter, EV_DELETE, 0, 0, nil) dec(s.count) @@ -583,15 +585,10 @@ else: proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = ev.rfd.int - if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0 and s.fds[fdi].flags == {}: - setKey(s, fdi, fdi, {flagUser}, 0, data) - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - else: - raise newException(ValueError, "Re-use of non-closed descriptor") - else: - raise newException(ValueError, "Event wait still pending!") + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagUser}, 0, data) + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = ev.rfd.int @@ -640,9 +637,9 @@ else: case kevent.filter of EVFILT_READ: skey = addr(s.fds[kevent.ident.int]) - if flagHandle in skey.flags: - events = {eventRead} - elif flagUser in skey.flags: + if Event.flagHandle in skey.flags: + events = {Event.Read} + elif Event.flagUser in skey.flags: var data: int = 0 if posix.read(kevent.ident.cint, addr data, sizeof(int)) != sizeof(int): @@ -653,28 +650,28 @@ else: continue else: raiseOSError(osLastError()) - events = {eventUser} + events = {Event.User} else: - events = {eventRead} + events = {Event.Read} of EVFILT_WRITE: skey = addr(s.fds[kevent.ident.int]) - events = {eventWrite} + events = {Event.Write} of EVFILT_TIMER: skey = addr(s.fds[kevent.ident.int]) - if flagOneshot in skey.flags: + if Event.flagOneshot in skey.flags: if posix.close(skey.ident.cint) == -1: raiseOSError(osLastError()) clearKey(s, skey.ident) # no need to modify kqueue, because EV_ONESHOT is already made # this for us dec(s.count) - events = {eventTimer} + events = {Event.Timer} of EVFILT_VNODE: skey = addr(s.fds[kevent.ident.int]) - events = {eventVnode} + events = {Event.Vnode} of EVFILT_SIGNAL: skey = addr(s.fds[cast[int](kevent.udata)]) - events = {eventSignal} + events = {Event.Signal} of EVFILT_PROC: skey = addr(s.fds[cast[int](kevent.udata)]) if posix.close(skey.ident.cint) == -1: @@ -683,13 +680,13 @@ else: # no need to modify kqueue, because EV_ONESHOT is already made # this for us dec(s.count) - events = {eventProcess} + events = {Event.Process} else: raise newException(ValueError, "Unsupported kqueue filter in queue") if (kevent.flags and EV_EOF) != 0: - events = events + {eventError} + events = events + {Event.Error} results[k].fd = skey.key.fd results[k].events = events results[k].data = skey.key.data @@ -775,72 +772,55 @@ else: proc eventfd(count: cuint, flags: cint): cint {.cdecl, importc: "eventfd", header: "".} - when hasThreadSupport: - type - SelectorImpl[T] = object - epollFD : cint - maxFD : uint - fds: ptr SharedArray[SelectorKey[T]] - count: int - else: - type - SelectorImpl[T] = object - epollFD : cint - maxFD : uint - fds: seq[SelectorKey[T]] - count: int - - when hasThreadSupport: - type Selector*[T] = ptr SelectorImpl[T] - else: - type Selector*[T] = ref SelectorImpl[T] type + SelectorImpl[T] = object + epollFD : cint + maxFD : uint + fds: ptr SharedArray[SelectorKey[T]] + count: int + + Selector*[T] = ptr SelectorImpl[T] + SelectEventImpl = object efd: cint - type SelectEvent* = ptr SelectEventImpl + SelectEvent* = ptr SelectEventImpl proc newSelector*[T](): Selector[T] = var maxFD = getMaxFds() var epollFD = epoll_create(MAX_EPOLL_RESULT_EVENTS) if epollFD < 0: raiseOsError(osLastError()) - when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.epollFD = epollFD - result.maxFD = maxFD.uint - result.fds = allocSharedArray[SelectorKey[T]](maxFD) - else: - result = Selector[T](epollFD: epollFD, maxFD: maxFD.uint) - result.fds = newSeq[SelectorKey[T]](maxFD) + + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.epollFD = epollFD + result.maxFD = maxFD.uint + result.fds = allocSharedArray[SelectorKey[T]](maxFD) proc close*[T](s: Selector[T]) = if posix.close(s.epollFD) != 0: raiseOSError(osLastError()) - when hasThreadSupport: - deallocSharedArray(s.fds) - deallocShared(cast[pointer](s)) + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setKey(s, fdi, fdi, events + {flagHandle}, 0, data) - if events != {}: - var epv: epoll_event - epv.events = EPOLLRDHUP - epv.data.u64 = fdi.uint - if eventRead in events: - epv.events = epv.events or EPOLLIN - if eventWrite in events: - epv.events = epv.events or EPOLLOUT - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, events + {Event.flagHandle}, 0, data) + if events != {}: + var epv: epoll_event + epv.events = EPOLLRDHUP + epv.data.u64 = fdi.uint + if Event.Read in events: + epv.events = epv.events or EPOLLIN + if Event.Write in events: + epv.events = epv.events or EPOLLOUT + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -848,43 +828,37 @@ else: events: set[Event]) = var fdi = int(fd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0: - var oe = s.fds[fdi].flags - if flagHandle in oe: - var ne = events + {flagHandle} - if oe != ne: - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLRDHUP + var oe = s.fds[fdi].flags + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in oe) + var ne = events + {Event.flagHandle} + if oe != ne: + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLRDHUP - if eventRead in events: - epv.events = epv.events or EPOLLIN - if eventWrite in events: - epv.events = epv.events or EPOLLOUT + if Event.Read in events: + epv.events = epv.events or EPOLLIN + if Event.Write in events: + epv.events = epv.events or EPOLLOUT - if oe == {flagHandle}: - if ne != {flagHandle}: - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) - else: - if events != {}: - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - else: - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - dec(s.count) - s.fds[fdi].flags = ne + if oe == {Event.flagHandle}: + if ne != {Event.flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) else: - raise newException(ValueError, - "Could not update non-handle descriptor") - else: - raise newException(ValueError, - "Descriptor is not registered in queue") + if events != {}: + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + else: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + dec(s.count) + s.fds[fdi].flags = ne else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -894,21 +868,21 @@ else: if fdi.uint < s.maxFD: var flags = s.fds[fdi].flags if s.fds[fdi].ident != 0 and flags != {}: - if flagHandle in flags: + if Event.flagHandle in flags: # if events == {flagHandle}, then descriptor was already # unregistered from epoll with updateHandle() call. # This check is done to omit EBADF error. - if flags != {flagHandle}: + if flags != {Event.flagHandle}: if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: raiseOSError(osLastError()) dec(s.count) - elif flagTimer in flags: + elif Event.flagTimer in flags: if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: raiseOSError(osLastError()) discard posix.close(fdi.cint) dec(s.count) - elif flagSignal in flags: + elif Event.flagSignal in flags: if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: raiseOSError(osLastError()) var nmask: Sigset @@ -919,7 +893,7 @@ else: unblockSignals(nmask, omask) discard posix.close(fdi.cint) dec(s.count) - elif flagProcess in flags: + elif Event.flagProcess in flags: if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: raiseOSError(osLastError()) var nmask: Sigset @@ -935,7 +909,7 @@ else: proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.efd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0 and flagUser in s.fds[fdi].flags: + if s.fds[fdi].ident != 0 and (Event.flagUser in s.fds[fdi].flags): clearKey(s, fdi) var epv: epoll_event if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: @@ -951,33 +925,31 @@ else: if fdi == -1: raiseOSError(osLastError()) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - var flags = {flagTimer} - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - setNonBlocking(fdi.cint) - if oneshot: - new_ts.it_interval.tv_sec = 0.Time - new_ts.it_interval.tv_nsec = 0 - new_ts.it_value.tv_sec = (timeout div 1_000).Time - new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 - flags = flags + {flagOneshot} - epv.events = epv.events or EPOLLONESHOT - else: - new_ts.it_interval.tv_sec = (timeout div 1000).Time - new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 - new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec - new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec - if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: - raiseOSError(osLastError()) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, fdi, flags, 0, data) - inc(s.count) - result = fdi + doAssert(s.fds[fdi].ident == 0) + var flags = {Event.flagTimer} + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + setNonBlocking(fdi.cint) + if oneshot: + new_ts.it_interval.tv_sec = 0.Time + new_ts.it_interval.tv_nsec = 0 + new_ts.it_value.tv_sec = (timeout div 1_000).Time + new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 + flags = flags + {Event.flagOneshot} + epv.events = epv.events or EPOLLONESHOT else: - raise newException(ValueError, "Re-use of non-closed descriptor") + new_ts.it_interval.tv_sec = (timeout div 1000).Time + new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 + new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec + new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec + if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: + raiseOSError(osLastError()) + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, fdi, flags, 0, data) + inc(s.count) + result = fdi else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -995,18 +967,16 @@ else: if fdi == -1: raiseOSError(osLastError()) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setNonBlocking(fdi.cint) - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, signal, {flagSignal}, signal, data) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + setNonBlocking(fdi.cint) + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, signal, {Event.flagSignal}, signal, data) + inc(s.count) + result = fdi else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -1020,28 +990,22 @@ else: discard sigemptyset(omask) discard sigaddset(nmask, posix.SIGCHLD) blockSignals(nmask, omask) - try: - var fdi = signalfd(-1, nmask, 0) - if fd == -1: + var fdi = signalfd(-1, nmask, 0) + if fd == -1: + raiseOSError(osLastError()) + if fdi.uint < s.maxFD: + doAssert(s.fds[fdi].ident == 0) + setNonBlocking(fdi.cint) + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setNonBlocking(fdi.cint) - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, pid, {flagProcess}, pid, data) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Re-use of non-closed descriptor") - else: - raise newException(ValueError, "Maximum file descriptors exceeded") - except: - if fd != -1: discard posix.close(fd.cint) - unblockSignals(omask, nmask) + setKey(s, fdi, pid, {Event.flagProcess}, pid, data) + inc(s.count) + result = fdi + else: + raise newException(ValueError, "Maximum file descriptors exceeded") proc flush*[T](s: Selector[T]) = discard @@ -1051,18 +1015,13 @@ else: proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = int(ev.efd) - if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setKey(s, fdi, fdi, {flagUser}, 0, data) - var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) - epv.data.u64 = ev.efd.uint - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) - else: - raise newException(ValueError, "Re-use of non-closed descriptor") - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagUser}, 0, data) + var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) + epv.data.u64 = ev.efd.uint + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) proc setEvent*(ev: SelectEvent) = var data : uint64 = 1 @@ -1105,36 +1064,36 @@ else: if skey.ident != 0 and flags != {}: block processItem: if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0: - events = events + {eventError} + events = events + {Event.Error} if (pevents and EPOLLOUT) != 0: - events = events + {eventWrite} + events = events + {Event.Write} if (pevents and EPOLLIN) != 0: - if flagHandle in flags: - events = events + {eventRead} - elif flagTimer in flags: + if Event.flagHandle in flags: + events = events + {Event.Read} + elif Event.flagTimer in flags: var data: uint64 = 0 if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64): raiseOSError(osLastError()) - events = events + {eventTimer} - elif flagSignal in flags: + events = events + {Event.Timer} + elif Event.flagSignal in flags: var data: SignalFdInfo if posix.read(fdi.cint, addr data, sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): raiseOsError(osLastError()) - events = events + {eventSignal} - elif flagProcess in flags: + events = events + {Event.Signal} + elif Event.flagProcess in flags: var data: SignalFdInfo if posix.read(fdi.cint, addr data, sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): raiseOsError(osLastError()) if cast[int](data.ssi_pid) == skey.param: - events = events + {eventProcess} + events = events + {Event.Process} # we want to free resources for this event - flags = flags + {flagOneshot} + flags = flags + {Event.flagOneshot} else: break processItem - elif flagUser in flags: + elif Event.flagUser in flags: var data: uint = 0 if posix.read(fdi.cint, addr data, sizeof(uint)) != sizeof(uint): @@ -1145,7 +1104,7 @@ else: continue else: raiseOSError(err) - events = events + {eventUser} + events = events + {Event.User} else: raise newException(ValueError, "Unsupported epoll event in queue") @@ -1153,7 +1112,7 @@ else: results[k].events = events results[k].data = skey.key.data - if flagOneshot in flags: + if Event.flagOneshot in flags: var epv: epoll_event try: if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, @@ -1305,12 +1264,12 @@ else: events: set[Event]) = mixin withSelectLock s.withSelectLock(): - if eventRead in events: + if Event.Read in events: if s.rSet.fd_count == FD_SETSIZE: raise newException(ValueError, "Maximum numbers of fds exceeded") iFD_SET(fd, s.rSet) inc(s.count) - if eventWrite in events: + if Event.Write in events: if s.wSet.fd_count == FD_SETSIZE: raise newException(ValueError, "Maximum numbers of fds exceeded") iFD_SET(fd, s.wSet) @@ -1320,7 +1279,7 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - var flags = {flagHandle} + events + var flags = {Event.flagHandle} + events var nkey = SelectorKey[T](ident: fdi, flags: flags) nkey.key.fd = fdi nkey.key.data = data @@ -1333,21 +1292,21 @@ else: events: set[Event]) = s.withSelectLock(): withValue(s.fds, fd, skey) do: - if flagHandle in skey.flags: + if Event.flagHandle in skey.flags: var oe = skey.flags - var ne = events + {flagHandle} + var ne = events + {Event.flagHandle} if oe != ne: - if (eventRead in oe) and (eventRead notin ne): + if (Event.Read in oe) and (Event.Read notin ne): iFD_CLR(fd, s.rSet) dec(s.count) - if (eventWrite in oe) and (eventWrite notin ne): + if (Event.Write in oe) and (Event.Write notin ne): iFD_CLR(fd, s.wSet) iFD_CLR(fd, s.eSet) dec(s.count) - if (eventRead notin oe) and (eventRead in ne): + if (Event.Read notin oe) and (Event.Read in ne): iFD_SET(fd, s.rSet) inc(s.count) - if (eventWrite notin oe) and (eventWrite in ne): + if (Event.Write notin oe) and (Event.Write in ne): iFD_SET(fd, s.wSet) iFD_SET(fd, s.eSet) inc(s.count) @@ -1384,17 +1343,17 @@ else: proc unregister*[T](s: Selector[T], fd: SocketHandle) = s.withSelectLock(): s.fds.withValue(fd, skey) do: - if eventRead in skey.flags: + if Event.Read in skey.flags: iFD_CLR(fd, s.rSet) dec(s.count) - if eventWrite in skey.flags: + if Event.Write in skey.flags: iFD_CLR(fd, s.wSet) iFD_CLR(fd, s.eSet) dec(s.count) s.fds.del(fd) proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = - var flags = {flagUser, eventRead} + var flags = {Event.flagUser, Event.Read} var nkey = SelectorKey[T](ident: ev.rsock.int, flags: flags) nkey.key.fd = ev.rsock.int nkey.key.data = data @@ -1488,13 +1447,13 @@ else: while i < rset.fd_count: let fd = rset.fd_array[i] if iFD_ISSET(fd, rset): - var events = {eventRead} - if iFD_ISSET(fd, eset): events = events + {eventError} - if iFD_ISSET(fd, wset): events = events + {eventWrite} + var events = {Event.Read} + if iFD_ISSET(fd, eset): events = events + {Event.Error} + if iFD_ISSET(fd, wset): events = events + {Event.Write} s.fds.withValue(fd, skey) do: - if flagHandle in skey.flags: + if Event.flagHandle in skey.flags: skey.key.events = events - elif flagUser in skey.flags: + elif Event.flagUser in skey.flags: var data: int = 0 if winlean.recv(fd, cast[pointer](addr(data)), sizeof(int).cint, 0) != sizeof(int): @@ -1505,7 +1464,7 @@ else: # someone already consumed event data inc(i) continue - skey.key.events = {eventUser} + skey.key.events = {Event.User} results[rindex].fd = skey.key.fd results[rindex].data = skey.key.data results[rindex].events = skey.key.events @@ -1516,9 +1475,9 @@ else: while i < wset.fd_count: let fd = wset.fd_array[i] if iFD_ISSET(fd, wset): - var events = {eventWrite} + var events = {Event.Write} if not iFD_ISSET(fd, rset): - if iFD_ISSET(fd, eset): events = events + {eventError} + if iFD_ISSET(fd, eset): events = events + {Event.Error} s.fds.withValue(fd, skey) do: skey.key.events = events results[rindex].fd = skey.key.fd @@ -1545,35 +1504,23 @@ else: else: const MAX_POLL_RESULT_EVENTS = 64 - when hasThreadSupport: - type - SelectorImpl[T] = object - maxFD : uint - pollcnt: int - fds: ptr SharedArray[SelectorKey[T]] - pollfds: ptr SharedArray[TPollFd] - count: int - lock: Lock - else: - type - SelectorImpl[T] = object - maxFD : uint - pollcnt: int - fds: seq[SelectorKey[T]] - pollfds: seq[TPollFd] - count: int - - when hasThreadSupport: - type Selector*[T] = ptr SelectorImpl[T] - else: - type Selector*[T] = ref SelectorImpl[T] - type + SelectorImpl[T] = object + maxFD : uint + pollcnt: int + fds: ptr SharedArray[SelectorKey[T]] + pollfds: ptr SharedArray[TPollFd] + count: int + when hasThreadSupport: + lock: Lock + + Selector*[T] = ptr SelectorImpl[T] + SelectEventImpl = object rfd: cint wfd: cint - type SelectEvent* = ptr SelectEventImpl + SelectEvent* = ptr SelectEventImpl when hasThreadSupport: template withPollLock[T](s: Selector[T], body: untyped) = @@ -1590,29 +1537,25 @@ else: proc newSelector*[T](): Selector[T] = var maxFD = getMaxFds() + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.maxFD = maxFD.uint + result.fds = allocSharedArray[SelectorKey[T]](maxFD) + result.pollfds = allocSharedArray[TPollFd](maxFD) when hasThreadSupport: - result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.maxFD = maxFD.uint - result.fds = allocSharedArray[SelectorKey[T]](maxFD) - result.pollfds = allocSharedArray[TPollFd](maxFD) initLock(result.lock) - else: - result = Selector[T](maxFD: maxFD.uint) - result.fds = newSeq[SelectorKey[T]](maxFD) - result.pollfds = newSeq[TPollFd](maxFD) proc close*[T](s: Selector[T]) = when hasThreadSupport: deinitLock(s.lock) - deallocSharedArray(s.fds) - deallocSharedArray(s.pollfds) - deallocShared(cast[pointer](s)) + deallocSharedArray(s.fds) + deallocSharedArray(s.pollfds) + deallocShared(cast[pointer](s)) template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) = withPollLock(s): var pollev: cshort = 0 - if eventRead in events: pollev = pollev or POLLIN - if eventWrite in events: pollev = pollev or POLLOUT + if Event.Read in events: pollev = pollev or POLLIN + if Event.Write in events: pollev = pollev or POLLOUT s.pollfds[s.pollcnt].fd = cint(sock) s.pollfds[s.pollcnt].events = pollev inc(s.count) @@ -1622,8 +1565,8 @@ else: withPollLock(s): var i = 0 var pollev: cshort = 0 - if eventRead in events: pollev = pollev or POLLIN - if eventWrite in events: pollev = pollev or POLLOUT + if Event.Read in events: pollev = pollev or POLLIN + if Event.Write in events: pollev = pollev or POLLOUT while i < s.pollcnt: if s.pollfds[i].fd == sock: @@ -1658,11 +1601,9 @@ else: events: set[Event], data: T) = var fdi = int(fd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - setKey(s, fdi, fdi, {flagHandle} + events, 0, data) - s.pollAdd(fdi.cint, events) - else: - raise newException(ValueError, "Re-use of non-closed descriptor") + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) + s.pollAdd(fdi.cint, events) else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -1670,21 +1611,16 @@ else: events: set[Event]) = var fdi = int(fd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0: - var oe = s.fds[fdi].flags - if flagHandle in oe: - var ne = events + {flagHandle} - if ne != oe: - if events != {}: - s.pollUpdate(fd.cint, events) - else: - s.pollRemove(fd.cint) - s.fds[fdi].flags = ne + var oe = s.fds[fdi].flags + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in oe) + var ne = events + {Event.flagHandle} + if ne != oe: + if events != {}: + s.pollUpdate(fd.cint, events) else: - raise newException(ValueError, - "Could not update non-handle descriptor") - else: - raise newException(ValueError, "Re-use of non closed descriptor") + s.pollRemove(fd.cint) + s.fds[fdi].flags = ne else: raise newException(ValueError, "Maximum file descriptors exceeded") @@ -1702,15 +1638,10 @@ else: proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = var fdi = int(ev.rfd) - if fdi.uint < s.maxFD: - if s.fds[fdi].ident == 0: - var events = {flagUser, eventRead} - setKey(s, fdi, fdi, events, 0, data) - s.pollAdd(fdi.cint, events) - else: - raise newException(ValueError, "Re-use of non-closed descriptor") - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + doAssert(s.fds[fdi].ident == 0) + var events = {Event.flagUser, Event.Read} + setKey(s, fdi, fdi, events, 0, data) + s.pollAdd(fdi.cint, events) proc flush*[T](s: Selector[T]) = discard @@ -1727,7 +1658,7 @@ else: proc unregister*[T](s: Selector[T], ev: SelectEvent) = var fdi = int(ev.rfd) if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0 and (flagUser in s.fds[fdi].flags): + if s.fds[fdi].ident != 0 and (Event.flagUser in s.fds[fdi].flags): clearKey(s, fdi) s.pollRemove(fdi.cint) @@ -1769,16 +1700,16 @@ else: if revents != 0: var events: set[Event] = {} if (revents and POLLIN) != 0: - events = events + {eventRead} + events = events + {Event.Read} if (revents and POLLOUT) != 0: - events = events + {eventWrite} + events = events + {Event.Write} if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or (revents and POLLNVAL) != 0: - events = events + {eventError} + events = events + {Event.Error} var skey = addr(s.fds[fd]) - if flagUser in skey.flags: - if eventRead in events: + if Event.flagUser in skey.flags: + if Event.Read in events: var data: int = 0 if posix.read(fd, addr data, sizeof(int)) != sizeof(int): let err = osLastError() @@ -1788,7 +1719,7 @@ else: # someone already consumed event data inc(i) continue - events = {eventUser} + events = {Event.User} results[rindex].fd = fd results[rindex].events = events diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim index f3f38be7fe..ebfe10fd6e 100644 --- a/tests/async/tioselectors.nim +++ b/tests/async/tioselectors.nim @@ -38,8 +38,8 @@ when not defined(windows): var client_socket = create_test_socket() var server_socket = create_test_socket() - registerHandle(selector, server_socket, {eventRead}, 0) - registerHandle(selector, client_socket, {eventWrite}, 0) + registerHandle(selector, server_socket, {Event.Read}, 0) + registerHandle(selector, client_socket, {Event.Write}, 0) var option : int32 = 1 if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), @@ -67,13 +67,13 @@ when not defined(windows): cast[ptr SockAddr](addr(sockAddress)), addr(addrLen)) assert(server2_socket != osInvalidSocket) - selector.registerHandle(server2_socket, {eventRead}, 0) + selector.registerHandle(server2_socket, {Event.Read}, 0) if posix.send(client_socket, addr(client_message[0]), len(client_message), 0) == -1: raiseOSError(osLastError()) - selector.updateHandle(client_socket, {eventRead}) + selector.updateHandle(client_socket, {Event.Read}) var rc2 = selector.select(100) assert(len(rc2) == 1) @@ -90,13 +90,13 @@ when not defined(windows): break assert(test1) - selector.updateHandle(server2_socket, {eventWrite}) + selector.updateHandle(server2_socket, {Event.Write}) var rc3 = selector.select(0) assert(len(rc3) == 1) if posix.send(server2_socket, addr(server_message[0]), len(server_message), 0) == -1: raiseOSError(osLastError()) - selector.updateHandle(server2_socket, {eventRead}) + selector.updateHandle(server2_socket, {Event.Read}) var rc4 = selector.select(100) assert(len(rc4) == 1) @@ -238,7 +238,7 @@ when not defined(windows): var event = newEvent() for i in 0..high(thr): createThread(thr[i], event_wait_thread, event) - selector.registerHandle(sock, {eventRead}, 1) + selector.registerHandle(sock, {Event.Read}, 1) discard selector.select(500) selector.unregister(sock) event.setEvent() @@ -273,8 +273,8 @@ else: var client_socket = create_test_socket() var server_socket = create_test_socket() - selector.registerHandle(server_socket, {eventRead}, 0) - selector.registerHandle(client_socket, {eventWrite}, 0) + selector.registerHandle(server_socket, {Event.Read}, 0) + selector.registerHandle(client_socket, {Event.Write}, 0) var option : int32 = 1 if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), @@ -305,13 +305,13 @@ else: cast[ptr SockAddr](addr(sockAddress)), addr(addrLen)) assert(server2_socket != osInvalidSocket) - selector.registerHandle(server2_socket, {eventRead}, 0) + selector.registerHandle(server2_socket, {Event.Read}, 0) if send(client_socket, cast[pointer](addr(client_message[0])), cint(len(client_message)), 0) == -1: raiseOSError(osLastError()) - selector.updateHandle(client_socket, {eventRead}) + selector.updateHandle(client_socket, {Event.Read}) var rc2 = selector.select(100) assert(len(rc2) == 1) From fb0ef44864edd6d660c0de24c1943a830c461883 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sun, 26 Jun 2016 12:05:44 +0300 Subject: [PATCH 4/5] Some cosmetic changes, and comments --- lib/pure/ioselectors.nim | 408 +++++++++++++++++++-------------------- 1 file changed, 198 insertions(+), 210 deletions(-) diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim index 65e549b193..4ea7ee539e 100644 --- a/lib/pure/ioselectors.nim +++ b/lib/pure/ioselectors.nim @@ -50,7 +50,7 @@ when defined(nimdoc): Selector*[T] = ref object ## An object which holds descriptors to be checked for read/write status - Event* {.pure.} = enum + Event* {.pure.} = enum ## An enum which hold event types Read, ## Descriptor is available for read Write, ## Descriptor is available for write @@ -249,7 +249,7 @@ else: import locks type - Event* {.pure.} = enum + Event* {.pure.} = enum Read, Write, Timer, Signal, Process, Vnode, User, Error, flagHandle, flagTimer, flagSignal, flagProcess, flagVnode, flagUser, flagOneshot @@ -299,6 +299,10 @@ else: s.fds[f].ident = 0 s.fds[f].flags = {} + template checkMaxFd(s, fd) = + if fd.uint >= s.maxFD: + raise newException(ValueError, "Maximum file descriptors exceeded") + when supportedPlatform: template blockSignals(newmask: var Sigset, oldmask: var Sigset) = when hasThreadSupport: @@ -326,7 +330,9 @@ else: # when bsdPlatform: const + # Maximum number of cached changes MAX_KQUEUE_CHANGE_EVENTS = 64 + # Maximum number of events that can be returned MAX_KQUEUE_RESULT_EVENTS = 64 type @@ -401,43 +407,39 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) - if events != {}: - if Event.Read in events: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if Event.Write in events: - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) + if events != {}: + if Event.Read in events: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if Event.Write in events: + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident != 0) - doAssert(Event.flagHandle in s.fds[fdi].flags) - var ne = events + {Event.flagHandle} - var oe = s.fds[fdi].flags - if oe != ne: - if (Event.Read in oe) and (Event.Read notin ne): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) - if (Event.Write in oe) and (Event.Write notin ne): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) - dec(s.count) - if (Event.Read notin oe) and (Event.Read in ne): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if (Event.Write notin oe) and (Event.Write in ne): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in s.fds[fdi].flags) + var ne = events + {Event.flagHandle} + var oe = s.fds[fdi].flags + if oe != ne: + if (Event.Read in oe) and (Event.Read notin ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Write in oe) and (Event.Write notin ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Read notin oe) and (Event.Read in ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if (Event.Write notin oe) and (Event.Write in ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + s.fds[fdi].flags = ne proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = @@ -445,21 +447,19 @@ else: posix.IPPROTO_TCP).int if fdi == -1: raiseOsError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - var mflags = if oneshot: {Event.flagTimer, Event.flagOneshot} - else: {Event.flagTimer} - var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD - else: EV_ADD - setKey(s, fdi, fdi, mflags, 0, data) - # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, - # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds - # too - modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + var mflags = if oneshot: {Event.flagTimer, Event.flagOneshot} + else: {Event.flagTimer} + var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD + else: EV_ADD + setKey(s, fdi, fdi, mflags, 0, data) + # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, + # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds + # too + modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) + inc(s.count) + result = fdi proc registerSignal*[T](s: Selector[T], signal: int, data: T): int {.discardable.} = @@ -468,24 +468,22 @@ else: if fdi == -1: raiseOsError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, signal, {Event.flagSignal}, signal, data) - # block signal `signal` - var nmask: Sigset - var omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, cint(signal)) - blockSignals(nmask, omask) - # to be compatible with linux semantic we need to "eat" signals - posix.signal(cint(signal), SIG_IGN) - modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, - cast[pointer](fdi)) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, signal, {Event.flagSignal}, signal, data) + # block signal `signal` + var nmask: Sigset + var omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(signal)) + blockSignals(nmask, omask) + # to be compatible with linux semantic we need to "eat" signals + posix.signal(cint(signal), SIG_IGN) + modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi proc registerProcess*[T](s: Selector[T], pid: int, data: T): int {.discardable.} = @@ -494,16 +492,14 @@ else: if fdi == -1: raiseOsError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - var kflags: cushort = EV_ONESHOT or EV_ADD - setKey(s, fdi, pid, {Event.flagProcess, Event.flagOneshot}, pid, data) - modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, - cast[pointer](fdi)) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + var kflags: cushort = EV_ONESHOT or EV_ADD + setKey(s, fdi, pid, {Event.flagProcess, Event.flagOneshot}, pid, data) + modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = var fdi = int(fd) @@ -592,12 +588,11 @@ else: proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = ev.rfd.int - if fdi.uint < s.maxFD: - var flags = s.fds[fdi].flags - if s.fds[fdi].ident != 0 and flags != {}: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) - clearKey(s, fdi) + var flags = s.fds[fdi].flags + if s.fds[fdi].ident != 0 and flags != {}: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + clearKey(s, fdi) proc selectInto*[T](s: Selector[T], timeout: int, results: var openarray[ReadyKey[T]]): int = @@ -710,6 +705,7 @@ else: elif defined(linux): const + # Maximum number of events that can be returned MAX_EPOLL_RESULT_EVENTS = 64 type SignalFdInfo* {.importc: "struct signalfd_siginfo", @@ -772,7 +768,6 @@ else: proc eventfd(count: cuint, flags: cint): cint {.cdecl, importc: "eventfd", header: "".} - type SelectorImpl[T] = object epollFD : cint @@ -807,60 +802,56 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, events + {Event.flagHandle}, 0, data) - if events != {}: - var epv: epoll_event - epv.events = EPOLLRDHUP - epv.data.u64 = fdi.uint - if Event.Read in events: - epv.events = epv.events or EPOLLIN - if Event.Write in events: - epv.events = epv.events or EPOLLOUT - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, events + {Event.flagHandle}, 0, data) + if events != {}: + var epv: epoll_event + epv.events = EPOLLRDHUP + epv.data.u64 = fdi.uint + if Event.Read in events: + epv.events = epv.events or EPOLLIN + if Event.Write in events: + epv.events = epv.events or EPOLLOUT + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = var fdi = int(fd) - if fdi.uint < s.maxFD: - var oe = s.fds[fdi].flags - doAssert(s.fds[fdi].ident != 0) - doAssert(Event.flagHandle in oe) - var ne = events + {Event.flagHandle} - if oe != ne: - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLRDHUP + s.checkMaxFd(fdi) + var oe = s.fds[fdi].flags + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in oe) + var ne = events + {Event.flagHandle} + if oe != ne: + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLRDHUP - if Event.Read in events: - epv.events = epv.events or EPOLLIN - if Event.Write in events: - epv.events = epv.events or EPOLLOUT + if Event.Read in events: + epv.events = epv.events or EPOLLIN + if Event.Write in events: + epv.events = epv.events or EPOLLOUT - if oe == {Event.flagHandle}: - if ne != {Event.flagHandle}: - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) + if oe == {Event.flagHandle}: + if ne != {Event.flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) + else: + if ne != {Event.flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) else: - if events != {}: - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - else: - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - dec(s.count) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + dec(s.count) + s.fds[fdi].flags = ne proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = var epv: epoll_event @@ -924,88 +915,88 @@ else: var fdi = timerfd_create(CLOCK_MONOTONIC, 0) if fdi == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - var flags = {Event.flagTimer} - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - setNonBlocking(fdi.cint) - if oneshot: - new_ts.it_interval.tv_sec = 0.Time - new_ts.it_interval.tv_nsec = 0 - new_ts.it_value.tv_sec = (timeout div 1_000).Time - new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 - flags = flags + {Event.flagOneshot} - epv.events = epv.events or EPOLLONESHOT - else: - new_ts.it_interval.tv_sec = (timeout div 1000).Time - new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 - new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec - new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec - if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: - raiseOSError(osLastError()) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, fdi, flags, 0, data) - inc(s.count) - result = fdi + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + var flags = {Event.flagTimer} + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + setNonBlocking(fdi.cint) + if oneshot: + new_ts.it_interval.tv_sec = 0.Time + new_ts.it_interval.tv_nsec = 0 + new_ts.it_value.tv_sec = (timeout div 1_000).Time + new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 + flags = flags + {Event.flagOneshot} + epv.events = epv.events or EPOLLONESHOT else: - raise newException(ValueError, "Maximum file descriptors exceeded") + new_ts.it_interval.tv_sec = (timeout div 1000).Time + new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 + new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec + new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec + if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: + raiseOSError(osLastError()) + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, fdi, flags, 0, data) + inc(s.count) + result = fdi proc registerSignal*[T](s: Selector[T], signal: int, data: T): int {.discardable.} = var nmask: Sigset omask: Sigset - fdi: cint + discard sigemptyset(nmask) discard sigemptyset(omask) - discard sigaddset(nmask, signal.cint) + discard sigaddset(nmask, cint(signal)) blockSignals(nmask, omask) - fdi = signalfd(-1, nmask, 0) + + var fdi = signalfd(-1, nmask, 0).int if fdi == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setNonBlocking(fdi.cint) - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, signal, {Event.flagSignal}, signal, data) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setNonBlocking(fdi.cint) + + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, signal, {Event.flagSignal}, signal, data) + inc(s.count) + result = fdi proc registerProcess*[T](s: Selector, pid: int, data: T): int {.discardable.} = var nmask: Sigset omask: Sigset - fd: int + discard sigemptyset(nmask) discard sigemptyset(omask) discard sigaddset(nmask, posix.SIGCHLD) blockSignals(nmask, omask) - var fdi = signalfd(-1, nmask, 0) - if fd == -1: + + var fdi = signalfd(-1, nmask, 0).int + if fdi == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setNonBlocking(fdi.cint) - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, pid, {Event.flagProcess}, pid, data) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setNonBlocking(fdi.cint) + + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, pid, {Event.flagProcess}, pid, data) + inc(s.count) + result = fdi proc flush*[T](s: Selector[T]) = discard @@ -1502,6 +1493,7 @@ else: # else: + # Maximum number of events that can be returned const MAX_POLL_RESULT_EVENTS = 64 type @@ -1600,29 +1592,25 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) - s.pollAdd(fdi.cint, events) - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) + s.pollAdd(fdi.cint, events) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = var fdi = int(fd) - if fdi.uint < s.maxFD: - var oe = s.fds[fdi].flags - doAssert(s.fds[fdi].ident != 0) - doAssert(Event.flagHandle in oe) - var ne = events + {Event.flagHandle} - if ne != oe: - if events != {}: - s.pollUpdate(fd.cint, events) - else: - s.pollRemove(fd.cint) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + var oe = s.fds[fdi].flags + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in oe) + var ne = events + {Event.flagHandle} + if ne != oe: + if events != {}: + s.pollUpdate(fd.cint, events) + else: + s.pollRemove(fd.cint) + s.fds[fdi].flags = ne proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = From b8151e09f428a8f2b44b2bedb362bfc6a4510a77 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sun, 26 Jun 2016 12:14:21 +0300 Subject: [PATCH 5/5] one more cosmetic change --- lib/pure/ioselectors.nim | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim index 4ea7ee539e..034b182ab6 100644 --- a/lib/pure/ioselectors.nim +++ b/lib/pure/ioselectors.nim @@ -1735,24 +1735,20 @@ else: template withData*[T](s: Selector[T], fd: SocketHandle, value, body: untyped) = var fdi = int(fd) - if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) - body - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + if s.fds[fdi].ident != 0: + var value = addr(s.fds[fdi].key.data) + body template withData*[T](s: Selector[T], fd: SocketHandle, value, body1, body2: untyped) = var fdi = int(fd) - if fdi.uint < s.maxFD: - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) - body1 - else: - body2 + s.checkMaxFd(fdi) + if s.fds[fdi].ident != 0: + var value = addr(s.fds[fdi].key.data) + body1 else: - raise newException(ValueError, "Maximum file descriptors exceeded") + body2 else: template withData*(s: Selector, fd: SocketHandle, value, body: untyped) = s.fds.withValue(fd, skey) do: