ioselectors separated and refactored version.

adopted asyncdispatch version
This commit is contained in:
cheatfate
2016-07-05 13:18:26 +03:00
parent 5f9da6b2ae
commit 835ff4a2f8
8 changed files with 4030 additions and 1768 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,261 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module allows high-level and efficient I/O multiplexing.
##
## Supported OS primitives: ``epoll``, ``kqueue``, ``poll`` and
## Windows ``select``.
##
## To use threadsafe version of this module, it needs to be compiled
## with both ``-d:threadsafe`` and ``--threads:on`` options.
##
## Supported features: files, sockets, pipes, timers, processes, signals
## and user events.
##
## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux.
##
## Partially supported OS: Windows (only sockets and user events),
## Solaris (files, sockets, handles and user events).
##
## TODO: ``/dev/poll``, ``event ports`` and filesystem events.
import os
const hasThreadSupport = compileOption("threads") and defined(threadsafe)
const supportedPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(linux)
const bsdPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd)
when defined(nimdoc):
type
Selector*[T] = ref object
## An object which holds descriptors to be checked for read/write status
Event* {.pure.} = enum
## An enum which hold event types
Read, ## Descriptor is available for read
Write, ## Descriptor is available for write
Timer, ## Timer descriptor is completed
Signal, ## Signal is raised
Process, ## Process is finished
Vnode, ## Currently not supported
User, ## User event is raised
Error ## Error happens while waiting, for descriptor
ReadyKey*[T] = object
## An object which holds result for descriptor
fd* : int ## file/socket descriptor
events*: set[Event] ## set of events
data*: T ## application-defined data
SelectEvent* = object
## An object which holds user defined event
proc newSelector*[T](): Selector[T] =
## Creates a new selector
proc close*[T](s: Selector[T]) =
## Closes selector
proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event],
data: T) =
## Registers file/socket descriptor ``fd`` to selector ``s``
## with events set in ``events``. The ``data`` is application-defined
## data, which to be passed when event happens.
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
## Update file/socket descriptor ``fd``, registered in selector
## ``s`` with new events set ``event``.
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
## Registers timer notification with ``timeout`` in milliseconds
## to selector ``s``.
## If ``oneshot`` is ``true`` timer will be notified only once.
## Set ``oneshot`` to ``false`` if your want periodic notifications.
## The ``data`` is application-defined data, which to be passed, when
## time limit expired.
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
## Registers Unix signal notification with ``signal`` to selector
## ``s``. The ``data`` is application-defined data, which to be
## passed, when signal raises.
##
## This function is not supported for ``Windows``.
proc registerProcess*[T](s: Selector[T], pid: int,
data: T): int {.discardable.} =
## Registers process id (pid) notification when process has
## exited to selector ``s``.
## The ``data`` is application-defined data, which to be passed, when
## process with ``pid`` has exited.
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
## Registers selector event ``ev`` to selector ``s``.
## ``data`` application-defined data, which to be passed, when
## ``ev`` happens.
proc newSelectEvent*(): SelectEvent =
## Creates new event ``SelectEvent``.
proc setEvent*(ev: SelectEvent) =
## Trigger event ``ev``.
proc close*(ev: SelectEvent) =
## Closes selector event ``ev``.
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
## Unregisters event ``ev`` from selector ``s``.
proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) =
## Unregisters file/socket descriptor ``fd`` from selector ``s``.
proc flush*[T](s: Selector[T]) =
## Flushes all changes was made to kernel pool/queue.
## This function is usefull only for BSD and MacOS, because
## kqueue supports bulk changes to be made.
## On Linux/Windows and other Posix compatible operation systems,
## ``flush`` is alias for `discard`.
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
## Process call waiting for events registered in selector ``s``.
## The ``timeout`` argument specifies the minimum number of milliseconds
## the function will be blocked, if no events are not ready. Specifying a
## timeout of ``-1`` causes function to block indefinitely.
## All available events will be stored in ``results`` array.
##
## Function returns number of triggered events.
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
## Process call waiting for events registered in selector ``s``.
## The ``timeout`` argument specifies the minimum number of milliseconds
## the function will be blocked, if no events are not ready. Specifying a
## timeout of -1 causes function to block indefinitely.
##
## Function returns sequence of triggered events.
template isEmpty*[T](s: Selector[T]): bool =
## Returns ``true``, if there no registered events or descriptors
## in selector.
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body: untyped) =
## retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
## .. code-block:: nim
##
## s.withData(fd, value) do:
## # block is executed only if ``fd`` registered in selector ``s``
## value.uid = 1000
##
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body1, body2: untyped) =
## retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
## .. code-block:: nim
##
## s.withData(fd, value) do:
## # block is executed only if ``fd`` registered in selector ``s``.
## value.uid = 1000
## do:
## # block is executed if ``fd`` not registered in selector ``s``.
## raise
##
else:
when hasThreadSupport:
import locks
type
SharedArrayHolder[T] = object
part: array[1, T]
SharedArray {.unchecked.}[T] = array[0..100_000_000, T]
proc allocSharedArray[T](nsize: int): ptr SharedArray[T] =
let holder = cast[ptr SharedArrayHolder[T]](
allocShared0(sizeof(T) * nsize)
)
result = cast[ptr SharedArray[T]](addr(holder.part[0]))
proc deallocSharedArray[T](sa: ptr SharedArray[T]) =
deallocShared(cast[pointer](sa))
type
Event* {.pure.} = enum
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot
ReadyKey*[T] = object
fd* : int
events*: set[Event]
data*: T
SelectorKey[T] = object
ident: int
events: set[Event]
param: int
key: ReadyKey[T]
when not defined(windows):
import posix
proc setNonBlocking(fd: cint) {.inline.} =
var x = fcntl(fd, F_GETFL, 0)
if x == -1:
raiseOSError(osLastError())
else:
var mode = x or O_NONBLOCK
if fcntl(fd, F_SETFL, mode) == -1:
raiseOSError(osLastError())
template setKey(s, pident, pkeyfd, pevents, pparam, pdata) =
var skey = addr(s.fds[pident])
skey.ident = pident
skey.events = pevents
skey.param = pparam
skey.key.fd = pkeyfd
skey.key.data = pdata
when supportedPlatform:
template blockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
else:
if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
template unblockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
else:
if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
when defined(linux):
include ioselectors_epoll
elif bsdPlatform:
include ioselectors_kqueue
elif defined(windows):
include ioselectors_select
elif defined(solaris):
include ioselectors_poll # need to replace it with event ports
else:
include ioselectors_poll

View File

@@ -0,0 +1,461 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This module implements Linux epoll().
import posix, times
# Maximum number of events that can be returned
const MAX_EPOLL_RESULT_EVENTS = 64
type
SignalFdInfo* {.importc: "struct signalfd_siginfo",
header: "<sys/signalfd.h>", pure, final.} = object
ssi_signo*: uint32
ssi_errno*: int32
ssi_code*: int32
ssi_pid*: uint32
ssi_uid*: uint32
ssi_fd*: int32
ssi_tid*: uint32
ssi_band*: uint32
ssi_overrun*: uint32
ssi_trapno*: uint32
ssi_status*: int32
ssi_int*: int32
ssi_ptr*: uint64
ssi_utime*: uint64
ssi_stime*: uint64
ssi_addr*: uint64
pad* {.importc: "__pad".}: array[0..47, uint8]
eventFdData {.importc: "eventfd_t",
header: "<sys/eventfd.h>", pure, final.} = uint64
epoll_data {.importc: "union epoll_data", header: "<sys/epoll.h>",
pure, final.} = object
u64 {.importc: "u64".}: uint64
epoll_event {.importc: "struct epoll_event",
header: "<sys/epoll.h>", pure, final.} = object
events: uint32 # Epoll events
data: epoll_data # User data variable
const
EPOLL_CTL_ADD = 1 # Add a file descriptor to the interface.
EPOLL_CTL_DEL = 2 # Remove a file descriptor from the interface.
EPOLL_CTL_MOD = 3 # Change file descriptor epoll_event structure.
EPOLLIN = 0x00000001
EPOLLOUT = 0x00000004
EPOLLERR = 0x00000008
EPOLLHUP = 0x00000010
EPOLLRDHUP = 0x00002000
EPOLLONESHOT = 1 shl 30
proc epoll_create(size: cint): cint
{.importc: "epoll_create", header: "<sys/epoll.h>".}
proc epoll_ctl(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint
{.importc: "epoll_ctl", header: "<sys/epoll.h>".}
proc epoll_wait(epfd: cint; events: ptr epoll_event; maxevents: cint;
timeout: cint): cint
{.importc: "epoll_wait", header: "<sys/epoll.h>".}
proc timerfd_create(clock_id: ClockId, flags: cint): cint
{.cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
proc timerfd_settime(ufd: cint, flags: cint,
utmr: var Itimerspec, otmr: var Itimerspec): cint
{.cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint
{.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
proc eventfd(count: cuint, flags: cint): cint
{.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
proc ulimit(cmd: cint): clong
{.importc: "ulimit", header: "<ulimit.h>", varargs.}
when hasThreadSupport:
type
SelectorImpl[T] = object
epollFD : cint
maxFD : int
fds: ptr SharedArray[SelectorKey[T]]
count: int
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
epollFD : cint
maxFD : int
fds: seq[SelectorKey[T]]
count: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
efd: cint
SelectEvent* = ptr SelectEventImpl
proc newSelector*[T](): Selector[T] =
var maxFD = int(ulimit(4, 0))
doAssert(maxFD > 0)
var epollFD = epoll_create(MAX_EPOLL_RESULT_EVENTS)
if epollFD < 0:
raiseOsError(osLastError())
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.epollFD = epollFD
result.maxFD = maxFD
result.fds = allocSharedArray[SelectorKey[T]](maxFD)
else:
result = Selector[T]()
result.epollFD = epollFD
result.maxFD = maxFD
result.fds = newSeq[SelectorKey[T]](maxFD)
proc close*[T](s: Selector[T]) =
if posix.close(s.epollFD) != 0:
raiseOSError(osLastError())
when hasThreadSupport:
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
proc newSelectEvent*(): SelectEvent =
let fdci = eventfd(0, 0)
if fdci == -1:
raiseOSError(osLastError())
setNonBlocking(fdci)
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.efd = fdci
proc setEvent*(ev: SelectEvent) =
var data : uint64 = 1
if posix.write(ev.efd, addr data, sizeof(uint64)) == -1:
raiseOSError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(ev.efd)
deallocShared(cast[pointer](ev))
template checkFd(s, f) =
if f >= s.maxFD:
raise newException(ValueError, "Maximum file descriptors exceeded")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, events, 0, data)
if events != {}:
var epv = epoll_event(events: EPOLLRDHUP)
epv.data.u64 = fdi.uint
if Event.Read in events: epv.events = epv.events or EPOLLIN
if Event.Write in events: epv.events = epv.events or EPOLLOUT
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
var epv = epoll_event(events: EPOLLRDHUP)
epv.data.u64 = fdi.uint
if Event.Read in events: epv.events = epv.events or EPOLLIN
if Event.Write in events: epv.events = epv.events or EPOLLOUT
if pkey.events == {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
inc(s.count)
else:
if events != {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
else:
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
dec(s.count)
pkey.events = events
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
if pkey.events != {}:
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
dec(s.count)
elif Event.Signal in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(s.fds[fdi].param))
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
elif Event.Process in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, SIGCHLD)
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
pkey.ident = 0
pkey.events = {}
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.efd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.events = {}
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
dec(s.count)
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
var
new_ts: Itimerspec
old_ts: Itimerspec
let fdi = timerfd_create(CLOCK_MONOTONIC, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var events = {Event.Timer}
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if oneshot:
new_ts.it_interval.tv_sec = 0.Time
new_ts.it_interval.tv_nsec = 0
new_ts.it_value.tv_sec = (timeout div 1_000).Time
new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000
incl(events, Event.Oneshot)
epv.events = epv.events or EPOLLONESHOT
else:
new_ts.it_interval.tv_sec = (timeout div 1000).Time
new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000
new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec
new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec
if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1:
raiseOSError(osLastError())
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, fdi, events, 0, data)
inc(s.count)
result = fdi
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
var
nmask: Sigset
omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(signal))
blockSignals(nmask, omask)
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, signal, {Event.Signal}, signal, data)
inc(s.count)
result = fdi
proc registerProcess*[T](s: Selector, pid: int,
data: T): int {.discardable.} =
var
nmask: Sigset
omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, posix.SIGCHLD)
blockSignals(nmask, omask)
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
epv.events = EPOLLIN or EPOLLRDHUP
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, pid, {Event.Process, Event.Oneshot}, pid, data)
inc(s.count)
result = fdi
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = int(ev.efd)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, {Event.User}, 0, data)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = ev.efd.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) == -1:
raiseOSError(osLastError())
inc(s.count)
proc flush*[T](s: Selector[T]) =
discard
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
var
resTable: array[MAX_EPOLL_RESULT_EVENTS, epoll_event]
maxres = MAX_EPOLL_RESULT_EVENTS
events: set[Event] = {}
i, k: int
if maxres > len(results):
maxres = len(results)
let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint,
timeout.cint)
if count < 0:
result = 0
let err = osLastError()
if cint(err) != EINTR:
raiseOSError(err)
elif count == 0:
result = 0
else:
i = 0
k = 0
while i < count:
let fdi = int(resTable[i].data.u64)
let pevents = resTable[i].events
var skey = addr(s.fds[fdi])
doAssert(skey.ident != 0)
events = {}
if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
events.incl(Event.Error)
if (pevents and EPOLLOUT) != 0:
events.incl(Event.Write)
if (pevents and EPOLLIN) != 0:
if Event.Read in skey.events:
events.incl(Event.Read)
elif Event.Timer in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
events = {Event.Timer}
elif Event.Signal in skey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
events = {Event.Signal}
elif Event.Process in skey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
if cast[int](data.ssi_pid) == skey.param:
events = {Event.Process}
else:
inc(i)
continue
elif Event.User in skey.events:
var data: uint = 0
if posix.read(fdi.cint, addr data, sizeof(uint)) != sizeof(uint):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
inc(i)
continue
else:
raiseOSError(err)
events = {Event.User}
skey.key.events = events
results[k] = skey.key
inc(k)
if Event.Oneshot in skey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
skey.ident = 0
skey.events = {}
dec(s.count)
inc(i)
result = k
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](MAX_EPOLL_RESULT_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
body
template withData*[T](s: Selector[T], fd: SocketHandle, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
body1
else:
body2

View File

@@ -0,0 +1,439 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This module implements BSD kqueue().
import posix, times, kqueue
const
# Maximum number of cached changes.
MAX_KQUEUE_CHANGE_EVENTS = 64
# Maximum number of events that can be returned.
MAX_KQUEUE_RESULT_EVENTS = 64
when defined(macosx) or defined(freebsd):
when defined(macosx):
const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS)
else:
const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD)
proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr int,
newp: pointer, newplen: int): cint
{.importc: "sysctl",header: """#include <sys/types.h>
#include <sys/sysctl.h>"""}
elif defined(netbsd) or defined(openbsd):
# OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using
# KERN_MAXFILES, because KERN_MAXFILES is always bigger,
# than KERN_MAXFILESPERPROC.
const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES
proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr int,
newp: pointer, newplen: int): cint
{.importc: "sysctl",header: """#include <sys/param.h>
#include <sys/sysctl.h>"""}
when hasThreadSupport:
type
SelectorImpl[T] = object
kqFD : cint
maxFD : int
changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent]
changesCount: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
changesLock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
kqFD : cint
maxFD : int
changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent]
changesCount: int
fds: seq[SelectorKey[T]]
count: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
rfd: cint
wfd: cint
# SelectEvent is declared as `ptr` to be placed in `shared memory`,
# so you can share one SelectEvent handle between threads.
type SelectEvent* = ptr SelectEventImpl
proc newSelector*[T](): Selector[T] =
var maxFD = 0.cint
var size = sizeof(cint)
var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint]
# Obtain maximum number of file descriptors for process
if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size,
nil, 0) != 0:
raiseOsError(osLastError())
var kqFD = kqueue()
if kqFD < 0:
raiseOsError(osLastError())
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.kqFD = kqFD
result.maxFD = maxFD.int
result.fds = allocSharedArray[SelectorKey[T]](maxFD)
initLock(result.changesLock)
else:
result = Selector[T]()
result.kqFD = kqFD
result.maxFD = maxFD.int
result.fds = newSeq[SelectorKey[T]](maxFD)
proc close*[T](s: Selector[T]) =
if posix.close(s.kqFD) != 0:
raiseOSError(osLastError())
when hasThreadSupport:
deinitLock(s.changesLock)
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) == -1:
raiseOSError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rfd = fds[0]
result.wfd = fds[1]
proc setEvent*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(cint(ev.rfd))
discard posix.close(cint(ev.wfd))
deallocShared(cast[pointer](ev))
template checkFd(s, f) =
if f >= s.maxFD:
raise newException(ValueError, "Maximum file descriptors exceeded")
when hasThreadSupport:
template withChangeLock[T](s: Selector[T], body: untyped) =
acquire(s.changesLock)
{.locks: [s.changesLock].}:
try:
body
finally:
release(s.changesLock)
else:
template withChangeLock(s, body: untyped) =
body
template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
nflags: cushort, nfflags: cuint, ndata: int,
nudata: pointer) =
mixin withChangeLock
s.withChangeLock():
s.changesTable[s.changesCount] = KEvent(ident: nident,
filter: nfilter, flags: nflags,
fflags: nfflags, data: ndata,
udata: nudata)
inc(s.changesCount)
if s.changesCount == MAX_KQUEUE_CHANGE_EVENTS:
if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount),
nil, 0, nil) == -1:
raiseOSError(osLastError())
s.changesCount = 0
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, events, 0, data)
if events != {}:
if Event.Read in events:
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
inc(s.count)
if Event.Write in events:
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
if (Event.Read in pkey.events) and (Event.Read notin events):
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
dec(s.count)
if (Event.Write in pkey.events) and (Event.Write notin events):
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
dec(s.count)
if (Event.Read notin pkey.events) and (Event.Read in events):
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
inc(s.count)
if (Event.Write notin pkey.events) and (Event.Write in events):
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
inc(s.count)
pkey.events = events
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).int
if fdi == -1:
raiseOsError(osLastError())
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer}
let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD
s.setKey(fdi, fdi, events, 0, data)
# EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds,
# but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds
# too
modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil)
inc(s.count)
result = fdi
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).int
if fdi == -1:
raiseOsError(osLastError())
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, signal, {Event.Signal}, signal, data)
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(signal))
blockSignals(nmask, omask)
# to be compatible with linux semantic we need to "eat" signals
posix.signal(cint(signal), SIG_IGN)
modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0,
cast[pointer](fdi))
inc(s.count)
result = fdi
proc registerProcess*[T](s: Selector[T], pid: int,
data: T): int {.discardable.} =
var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).int
if fdi == -1:
raiseOsError(osLastError())
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var kflags: cushort = EV_ONESHOT or EV_ADD
setKey(s, fdi, pid, {Event.Process, Event.Oneshot}, pid, data)
modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0,
cast[pointer](fdi))
inc(s.count)
result = fdi
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = ev.rfd.int
doAssert(s.fds[fdi].ident == 0)
setKey(s, fdi, fdi, {Event.User}, 0, data)
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
inc(s.count)
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
if pkey.events != {}:
if pkey.events * {Event.Read, Event.Write} != {}:
if Event.Read in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
dec(s.count)
if Event.Write in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
dec(s.count)
elif Event.Timer in pkey.events:
discard posix.close(cint(pkey.key.fd))
modifyKQueue(s, fdi.uint, EVFILT_TIMER, EV_DELETE, 0, 0, nil)
dec(s.count)
elif Event.Signal in pkey.events:
var nmask, omask: Sigset
var signal = cint(pkey.param)
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, signal)
unblockSignals(nmask, omask)
posix.signal(signal, SIG_DFL)
discard posix.close(cint(pkey.key.fd))
modifyKQueue(s, fdi.uint, EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
dec(s.count)
elif Event.Process in pkey.events:
discard posix.close(cint(pkey.key.fd))
modifyKQueue(s, fdi.uint, EVFILT_PROC, EV_DELETE, 0, 0, nil)
dec(s.count)
elif Event.User in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
dec(s.count)
pkey.ident = 0
pkey.events = {}
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.events = {}
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
dec(s.count)
proc flush*[T](s: Selector[T]) =
s.withChangeLock():
var tv = Timespec()
if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount),
nil, 0, addr tv) == -1:
raiseOSError(osLastError())
s.changesCount = 0
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
var
tv: Timespec
resTable: array[MAX_KQUEUE_RESULT_EVENTS, KEvent]
ptv = addr tv
maxres = MAX_KQUEUE_RESULT_EVENTS
if timeout != -1:
if timeout >= 1000:
tv.tv_sec = (timeout div 1_000).Time
tv.tv_nsec = (timeout %% 1_000) * 1_000_000
else:
tv.tv_sec = 0.Time
tv.tv_nsec = timeout * 1_000_000
else:
ptv = nil
if maxres > len(results):
maxres = len(results)
var count = 0
s.withChangeLock():
count = kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount),
addr(resTable[0]), cint(maxres), ptv)
s.changesCount = 0
if count < 0:
result = 0
let err = osLastError()
if cint(err) != EINTR:
raiseOSError(err)
elif count == 0:
result = 0
else:
var i = 0
var k = 0
var pkey: ptr SelectorKey[T]
while i < count:
let kevent = addr(resTable[i])
if (kevent.flags and EV_ERROR) == 0:
case kevent.filter:
of EVFILT_READ:
pkey = addr(s.fds[kevent.ident.int])
pkey.key.events = {Event.Read}
if Event.User in pkey.events:
var data: uint64 = 0
if posix.read(kevent.ident.cint, addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
# someone already consumed event data
inc(i)
continue
else:
raiseOSError(osLastError())
pkey.key.events = {Event.User}
of EVFILT_WRITE:
pkey = addr(s.fds[kevent.ident.int])
pkey.key.events = {Event.Write}
of EVFILT_TIMER:
pkey = addr(s.fds[kevent.ident.int])
if Event.Oneshot in pkey.events:
if posix.close(cint(pkey.ident)) == -1:
raiseOSError(osLastError())
pkey.ident = 0
pkey.events = {}
dec(s.count)
pkey.key.events = {Event.Timer}
of EVFILT_VNODE:
pkey = addr(s.fds[kevent.ident.int])
pkey.key.events = {Event.Vnode}
of EVFILT_SIGNAL:
pkey = addr(s.fds[cast[int](kevent.udata)])
pkey.key.events = {Event.Signal}
of EVFILT_PROC:
pkey = addr(s.fds[cast[int](kevent.udata)])
if posix.close(cint(pkey.ident)) == -1:
raiseOSError(osLastError())
pkey.ident = 0
pkey.events = {}
dec(s.count)
pkey.key.events = {Event.Process}
else:
raise newException(ValueError, "Unsupported kqueue filter in queue")
if (kevent.flags and EV_EOF) != 0:
pkey.key.events.incl(Event.Error)
results[k] = pkey.key
inc(k)
inc(i)
result = k
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](MAX_KQUEUE_RESULT_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
body
template withData*[T](s: Selector[T], fd: SocketHandle, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
body1
else:
body2

View File

@@ -0,0 +1,295 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This module implements Posix poll().
import posix, times
# Maximum number of events that can be returned
const MAX_POLL_RESULT_EVENTS = 64
when hasThreadSupport:
type
SelectorImpl[T] = object
maxFD : int
pollcnt: int
fds: ptr SharedArray[SelectorKey[T]]
pollfds: ptr SharedArray[TPollFd]
count: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
maxFD : int
pollcnt: int
fds: seq[SelectorKey[T]]
pollfds: seq[TPollFd]
count: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
rfd: cint
wfd: cint
SelectEvent* = ptr SelectEventImpl
var RLIMIT_NOFILE {.importc: "RLIMIT_NOFILE",
header: "<sys/resource.h>".}: cint
type
rlimit {.importc: "struct rlimit",
header: "<sys/resource.h>", pure, final.} = object
rlim_cur: int
rlim_max: int
proc getrlimit(resource: cint, rlp: var rlimit): cint
{.importc: "getrlimit",header: "<sys/resource.h>".}
when hasThreadSupport:
template withPollLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
{.locks: [s.lock].}:
try:
body
finally:
release(s.lock)
else:
template withPollLock(s, body: untyped) =
body
proc newSelector*[T](): Selector[T] =
var a = rlimit()
if getrlimit(RLIMIT_NOFILE, a) != 0:
raiseOsError(osLastError())
var maxFD = int(a.rlim_max)
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.maxFD = maxFD
result.fds = allocSharedArray[SelectorKey[T]](maxFD)
result.pollfds = allocSharedArray[TPollFd](maxFD)
initLock(result.lock)
else:
result = Selector[T]()
result.maxFD = maxFD
result.fds = newSeq[SelectorKey[T]](maxFD)
result.pollfds = newSeq[TPollFd](maxFD)
proc close*[T](s: Selector[T]) =
when hasThreadSupport:
deinitLock(s.lock)
deallocSharedArray(s.fds)
deallocSharedArray(s.pollfds)
deallocShared(cast[pointer](s))
template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) =
withPollLock(s):
var pollev: cshort = 0
if Event.Read in events: pollev = pollev or POLLIN
if Event.Write in events: pollev = pollev or POLLOUT
s.pollfds[s.pollcnt].fd = cint(sock)
s.pollfds[s.pollcnt].events = pollev
inc(s.count)
inc(s.pollcnt)
template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
withPollLock(s):
var i = 0
var pollev: cshort = 0
if Event.Read in events: pollev = pollev or POLLIN
if Event.Write in events: pollev = pollev or POLLOUT
while i < s.pollcnt:
if s.pollfds[i].fd == sock:
s.pollfds[i].events = pollev
break
inc(i)
if i == s.pollcnt:
raise newException(ValueError, "Descriptor is not registered in queue")
template pollRemove[T](s: Selector[T], sock: cint) =
withPollLock(s):
var i = 0
while i < s.pollcnt:
if s.pollfds[i].fd == sock:
if i == s.pollcnt - 1:
s.pollfds[i].fd = 0
s.pollfds[i].events = 0
s.pollfds[i].revents = 0
else:
while i < (s.pollcnt - 1):
s.pollfds[i].fd = s.pollfds[i + 1].fd
s.pollfds[i].events = s.pollfds[i + 1].events
inc(i)
break
inc(i)
dec(s.pollcnt)
dec(s.count)
template checkFd(s, f) =
if f >= s.maxFD:
raise newException(ValueError, "Maximum file descriptors exceeded")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
var fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, events, 0, data)
if events != {}: s.pollAdd(fdi.cint, events)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
if pkey.events == {}:
s.pollAdd(fd.cint, events)
else:
if events != {}:
s.pollUpdate(fd.cint, events)
else:
s.pollRemove(fd.cint)
pkey.events = events
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
var fdi = int(ev.rfd)
doAssert(s.fds[fdi].ident == 0)
var events = {Event.User}
setKey(s, fdi, fdi, events, 0, data)
events.incl(Event.Read)
s.pollAdd(fdi.cint, events)
proc flush*[T](s: Selector[T]) = discard
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
pkey.ident = 0
pkey.events = {}
s.pollRemove(fdi.cint)
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.events = {}
s.pollRemove(fdi.cint)
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) == -1:
raiseOSError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rfd = fds[0]
result.wfd = fds[1]
proc setEvent*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(cint(ev.rfd))
discard posix.close(cint(ev.wfd))
deallocShared(cast[pointer](ev))
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
var maxres = MAX_POLL_RESULT_EVENTS
if maxres > len(results):
maxres = len(results)
s.withPollLock():
let count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout)
if count < 0:
result = 0
let err = osLastError()
if err.cint == EINTR:
discard
else:
raiseOSError(osLastError())
elif count == 0:
result = 0
else:
var i = 0
var k = 0
var rindex = 0
while (i < s.pollcnt) and (k < count) and (rindex < maxres):
let revents = s.pollfds[i].revents
if revents != 0:
let fd = s.pollfds[i].fd
var skey = addr(s.fds[fd])
skey.key.events = {}
if (revents and POLLIN) != 0:
skey.key.events.incl(Event.Read)
if Event.User in skey.events:
var data: uint64 = 0
if posix.read(fd, addr data, sizeof(int)) != sizeof(int):
let err = osLastError()
if err != OSErrorCode(EAGAIN):
raiseOSError(osLastError())
else:
# someone already consumed event data
inc(i)
continue
skey.key.events = {Event.User}
if (revents and POLLOUT) != 0:
skey.key.events.incl(Event.Write)
if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or
(revents and POLLNVAL) != 0:
skey.key.events.incl(Event.Error)
results[rindex] = skey.key
s.pollfds[i].revents = 0
inc(rindex)
inc(k)
inc(i)
result = k
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](MAX_POLL_RESULT_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
body
template withData*[T](s: Selector[T], fd: SocketHandle, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
body1
else:
body2

View File

@@ -0,0 +1,416 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2016 Eugene Kabanov
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# This module implements Posix and Windows select().
import times, nativesockets
when defined(windows):
import winlean
when defined(gcc):
{.passL: "-lws2_32".}
elif defined(vcc):
{.passL: "ws2_32.lib".}
const platformHeaders = """#include <winsock2.h>
#include <windows.h>"""
const EAGAIN = WSAEWOULDBLOCK
else:
const platformHeaders = """#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>"""
type
Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object
var
FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint
proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset)
{.cdecl, importc: "FD_SET", header: platformHeaders, inline.}
proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset)
{.cdecl, importc: "FD_CLR", header: platformHeaders, inline.}
proc IOFD_ZERO(fdset: ptr Fdset)
{.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.}
when defined(windows):
proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
{.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.}
proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
timeout: ptr Timeval): cint
{.stdcall, importc: "select", header: platformHeaders.}
else:
proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint
{.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.}
proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset,
timeout: ptr Timeval): cint
{.cdecl, importc: "select", header: platformHeaders.}
when hasThreadSupport:
type
SelectorImpl[T] = object
rSet: FdSet
wSet: FdSet
eSet: FdSet
maxFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
rSet: FdSet
wSet: FdSet
eSet: FdSet
maxFD: int
fds: seq[SelectorKey[T]]
count: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
rsock: SocketHandle
wsock: SocketHandle
SelectEvent* = ptr SelectEventImpl
when hasThreadSupport:
template withSelectLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
{.locks: [s.lock].}:
try:
body
finally:
release(s.lock)
else:
template withSelectLock[T](s: Selector[T], body: untyped) =
body
proc newSelector*[T](): Selector[T] =
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE)
initLock result.lock
else:
result = Selector[T]()
result.fds = newSeq[SelectorKey[T]](FD_SETSIZE)
IOFD_ZERO(addr result.rSet)
IOFD_ZERO(addr result.wSet)
IOFD_ZERO(addr result.eSet)
proc close*[T](s: Selector[T]) =
when hasThreadSupport:
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
when defined(windows):
proc newSelectEvent*(): SelectEvent =
var ssock = newNativeSocket()
var wsock = newNativeSocket()
var rsock: SocketHandle = INVALID_SOCKET
var saddr = Sockaddr_in()
saddr.sin_family = winlean.AF_INET
saddr.sin_port = 0
saddr.sin_addr.s_addr = INADDR_ANY
if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) < 0'i32:
raiseOSError(osLastError())
if winlean.listen(ssock, 1) == -1:
raiseOSError(osLastError())
var namelen = sizeof(saddr).SockLen
if getsockname(ssock, cast[ptr SockAddr](addr(saddr)),
addr(namelen)) == -1'i32:
raiseOSError(osLastError())
saddr.sin_addr.s_addr = 0x0100007F
if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) == -1:
raiseOSError(osLastError())
namelen = sizeof(saddr).SockLen
rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)),
cast[ptr SockLen](addr(namelen)))
if rsock == SocketHandle(-1):
raiseOSError(osLastError())
if winlean.closesocket(ssock) == -1:
raiseOSError(osLastError())
var mode = clong(1)
if ioctlsocket(rsock, FIONBIO, addr(mode)) == -1:
raiseOSError(osLastError())
mode = clong(1)
if ioctlsocket(wsock, FIONBIO, addr(mode)) == -1:
raiseOSError(osLastError())
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rsock = rsock
result.wsock = wsock
proc setEvent*(ev: SelectEvent) =
var data: int = 1
if winlean.send(ev.wsock, cast[pointer](addr data),
cint(sizeof(int)), 0) != sizeof(int):
raiseOSError(osLastError())
proc close*(ev: SelectEvent) =
discard winlean.closesocket(ev.rsock)
discard winlean.closesocket(ev.wsock)
deallocShared(cast[pointer](ev))
else:
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) == -1:
raiseOSError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rsock = SocketHandle(fds[0])
result.wsock = SocketHandle(fds[1])
proc setEvent*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(cint(ev.rsock))
discard posix.close(cint(ev.wsock))
deallocShared(cast[pointer](ev))
proc setKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) =
var i = 0
let fdi = int(fd)
while i < FD_SETSIZE:
if s.fds[i].ident == 0:
var pkey = addr(s.fds[i])
pkey.ident = fdi
pkey.events = events
pkey.key.fd = fd.int
pkey.key.events = {}
pkey.key.data = data
break
inc(i)
if i == FD_SETSIZE:
raise newException(ValueError, "Maximum numbers of fds exceeded")
proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
var i = 0
let fdi = int(fd)
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
result = addr(s.fds[i])
break
inc(i)
doAssert(i < FD_SETSIZE, "Descriptor not registered in queue")
proc delKey[T](s: Selector[T], fd: SocketHandle) =
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fd.int:
s.fds[i].ident = 0
s.fds[i].events = {}
break
inc(i)
doAssert(i < FD_SETSIZE, "Descriptor not registered in queue")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
when not defined(windows):
let fdi = int(fd)
s.withSelectLock():
s.setKey(fd, events, data)
when not defined(windows):
if fdi > s.maxFD: s.maxFD = fdi
if Event.Read in events:
IOFD_SET(fd, addr s.rSet)
inc(s.count)
if Event.Write in events:
IOFD_SET(fd, addr s.wSet)
IOFD_SET(fd, addr s.eSet)
inc(s.count)
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
when not defined(windows):
let fdi = int(ev.rsock)
s.withSelectLock():
s.setKey(ev.rsock, {Event.User}, data)
when not defined(windows):
if fdi > s.maxFD: s.maxFD = fdi
IOFD_SET(ev.rsock, addr s.rSet)
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
s.withSelectLock():
var pkey = s.getKey(fd)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
if (Event.Read in pkey.events) and (Event.Read notin events):
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
if (Event.Write in pkey.events) and (Event.Write notin events):
IOFD_CLR(fd, addr s.wSet)
IOFD_CLR(fd, addr s.eSet)
dec(s.count)
if (Event.Read notin pkey.events) and (Event.Read in events):
IOFD_SET(fd, addr s.rSet)
inc(s.count)
if (Event.Write notin pkey.events) and (Event.Write in events):
IOFD_SET(fd, addr s.wSet)
IOFD_SET(fd, addr s.eSet)
inc(s.count)
pkey.events = events
proc unregister*[T](s: Selector[T], fd: SocketHandle) =
s.withSelectLock():
var pkey = s.getKey(fd)
if Event.Read in pkey.events:
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
if Event.Write in pkey.events:
IOFD_CLR(fd, addr s.wSet)
IOFD_CLR(fd, addr s.eSet)
dec(s.count)
s.delKey(fd)
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fd = ev.rsock
s.withSelectLock():
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
s.delKey(fd)
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
var tv = Timeval()
var ptv = addr tv
var rset, wset, eset: FdSet
if timeout != -1:
tv.tv_sec = timeout.int32 div 1_000
tv.tv_usec = (timeout.int32 %% 1_000) * 1_000
else:
ptv = nil
s.withSelectLock():
rset = s.rSet
wset = s.wSet
eset = s.eSet
var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset),
addr(eset), ptv)
if count < 0:
result = 0
when defined(windows):
raiseOSError(osLastError())
else:
let err = osLastError()
if cint(err) != EINTR:
raiseOSError(err)
elif count == 0:
result = 0
else:
var rindex = 0
var i = 0
var k = 0
while (i < FD_SETSIZE) and (k < count):
if s.fds[i].ident != 0:
var flag = false
var pkey = addr(s.fds[i])
pkey.key.events = {}
let fd = SocketHandle(pkey.ident)
if IOFD_ISSET(fd, addr rset) != 0:
if Event.User in pkey.events:
var data: uint64 = 0
if recv(fd, cast[pointer](addr(data)),
sizeof(uint64).cint, 0) != sizeof(uint64):
let err = osLastError()
if cint(err) != EAGAIN:
raiseOSError(err)
else:
inc(i)
inc(k)
continue
else:
flag = true
pkey.key.events = {Event.User}
else:
flag = true
pkey.key.events = {Event.Read}
if IOFD_ISSET(fd, addr wset) != 0:
pkey.key.events.incl(Event.Write)
if IOFD_ISSET(fd, addr eset) != 0:
pkey.key.events.incl(Event.Error)
flag = true
if flag:
results[rindex] = pkey.key
inc(rindex)
inc(k)
inc(i)
result = rindex
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](FD_SETSIZE)
var count = selectInto(s, timeout, result)
result.setLen(count)
proc flush*[T](s: Selector[T]) = discard
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
when hasThreadSupport:
template withSelectLock[T](s: Selector[T], body: untyped) =
acquire(s.lock)
{.locks: [s.lock].}:
try:
body
finally:
release(s.lock)
else:
template withSelectLock[T](s: Selector[T], body: untyped) =
body
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body: untyped) =
mixin withSelectLock
s.withSelectLock():
var value: ptr T
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
value = addr(s.fds[i].key.data)
break
inc(i)
if i != FD_SETSIZE:
body
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body1, body2: untyped) =
mixin withSelectLock
s.withSelectLock():
var value: ptr T
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
value = addr(s.fds[i].key.data)
break
inc(i)
if i != FD_SETSIZE:
body1
else:
body2

File diff suppressed because it is too large Load Diff

View File

@@ -124,7 +124,7 @@ when not defined(windows):
proc event_notification_test(): bool =
var selector = newSelector[int]()
var event = newEvent()
var event = newSelectEvent()
selector.registerEvent(event, 1)
selector.flush()
event.setEvent()
@@ -235,7 +235,7 @@ when not defined(windows):
thr: array [0..7, Thread[SelectEvent]]
var selector = newSelector[int]()
var sock = newNativeSocket()
var event = newEvent()
var event = newSelectEvent()
for i in 0..high(thr):
createThread(thr[i], event_wait_thread, event)
selector.registerHandle(sock, {Event.Read}, 1)
@@ -358,7 +358,7 @@ else:
proc event_notification_test(): bool =
var selector = newSelector[int]()
var event = newEvent()
var event = newSelectEvent()
selector.registerEvent(event, 1)
selector.flush()
event.setEvent()
@@ -391,7 +391,7 @@ else:
proc mt_event_test(): bool =
var thr: array [0..7, Thread[SelectEvent]]
var event = newEvent()
var event = newSelectEvent()
for i in 0..high(thr):
createThread(thr[i], event_wait_thread, event)
event.setEvent()