* Fix #5128, #5184.
Removed flush() procedure from ioselectors.nim
Changed methods of work with application-driven data

* Make cache switch for kqueue, update test for it.

* Fix registerProcess bug returns wrong id.
Fix tupcoming_async test to compile with upcoming again.
Change socket() as unique identifier to dup(socket) as unique identifier.
This commit is contained in:
Eugene Kabanov
2017-01-16 15:01:40 +02:00
committed by Andreas Rumpf
parent f8736dcfb7
commit 108f5e688e
8 changed files with 674 additions and 479 deletions

View File

@@ -40,7 +40,6 @@ const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or
const bsdPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd)
when defined(nimdoc):
type
Selector*[T] = ref object
@@ -64,11 +63,10 @@ when defined(nimdoc):
VnodeRename, ## NOTE_RENAME (BSD specific, file renamed)
VnodeRevoke ## NOTE_REVOKE (BSD specific, file revoke occurred)
ReadyKey*[T] = object
ReadyKey* = object
## An object which holds result for descriptor
fd* : int ## file/socket descriptor
events*: set[Event] ## set of events
data*: T ## application-defined data
SelectEvent* = object
## An object which holds user defined event
@@ -142,15 +140,8 @@ when defined(nimdoc):
proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) =
## Unregisters file/socket descriptor ``fd`` from selector ``s``.
proc flush*[T](s: Selector[T]) =
## Flushes all changes was made to kernel pool/queue.
## This function is useful only for BSD and MacOS, because
## kqueue supports bulk changes to be made.
## On Linux/Windows and other Posix compatible operation systems,
## ``flush`` is alias for `discard`.
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
results: var openarray[ReadyKey]): int =
## Process call waiting for events registered in selector ``s``.
## The ``timeout`` argument specifies the minimum number of milliseconds
## the function will be blocked, if no events are not ready. Specifying a
@@ -159,7 +150,7 @@ when defined(nimdoc):
##
## Function returns number of triggered events.
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
## Process call waiting for events registered in selector ``s``.
## The ``timeout`` argument specifies the minimum number of milliseconds
## the function will be blocked, if no events are not ready. Specifying a
@@ -167,13 +158,23 @@ when defined(nimdoc):
##
## Function returns sequence of triggered events.
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
## Retrieves application-defined ``data`` associated with descriptor ``fd``.
## If specified descriptor ``fd`` is not registered, empty/default value
## will be returned.
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: var T): bool =
## Associate application-defined ``data`` with descriptor ``fd``.
##
## Returns ``true``, if data was succesfully updated, ``false`` otherwise.
template isEmpty*[T](s: Selector[T]): bool =
## Returns ``true``, if there no registered events or descriptors
## in selector.
template withData*[T](s: Selector[T], fd: SocketHandle, value,
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
## retrieves the application-data assigned with descriptor ``fd``
## Retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
@@ -184,9 +185,9 @@ when defined(nimdoc):
## value.uid = 1000
##
template withData*[T](s: Selector[T], fd: SocketHandle, value,
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body1, body2: untyped) =
## retrieves the application-data assigned with descriptor ``fd``
## Retrieves the application-data assigned with descriptor ``fd``
## to ``value``. This ``value`` can be modified in the scope of
## the ``withData`` call.
##
@@ -215,55 +216,68 @@ else:
type
Event* {.pure.} = enum
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot,
VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink,
Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink,
VnodeRename, VnodeRevoke
ReadyKey*[T] = object
type
IOSelectorsException* = object of Exception
ReadyKey* = object
fd* : int
events*: set[Event]
data*: T
SelectorKey[T] = object
ident: int
events: set[Event]
param: int
key: ReadyKey[T]
data: T
proc raiseIOSelectorsError[T](message: T) =
var msg = ""
when T is string:
msg.add(message)
elif T is OSErrorCode:
msg.add(osErrorMsg(message) & " (code: " & $int(message) & ")")
else:
msg.add("Internal Error\n")
var err = newException(IOSelectorsException, msg)
raise err
when not defined(windows):
import posix
proc setNonBlocking(fd: cint) {.inline.} =
var x = fcntl(fd, F_GETFL, 0)
if x == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
else:
var mode = x or O_NONBLOCK
if fcntl(fd, F_SETFL, mode) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
template setKey(s, pident, pkeyfd, pevents, pparam, pdata) =
template setKey(s, pident, pevents, pparam, pdata: untyped) =
var skey = addr(s.fds[pident])
skey.ident = pident
skey.events = pevents
skey.param = pparam
skey.key.fd = pkeyfd
skey.key.data = pdata
skey.data = data
when ioselSupportedPlatform:
template blockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
else:
if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
template unblockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
else:
if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
when defined(linux):
include ioselects/ioselectors_epoll

View File

@@ -12,7 +12,7 @@
import posix, times
# Maximum number of events that can be returned
const MAX_EPOLL_RESULT_EVENTS = 64
const MAX_EPOLL_EVENTS = 64
when not defined(android):
type
@@ -115,7 +115,7 @@ proc newSelector*[T](): Selector[T] =
var maxFD = int(a.rlim_max)
doAssert(maxFD > 0)
var epollFD = epoll_create(MAX_EPOLL_RESULT_EVENTS)
var epollFD = epoll_create(MAX_EPOLL_EVENTS)
if epollFD < 0:
raiseOsError(osLastError())
@@ -132,15 +132,21 @@ proc newSelector*[T](): Selector[T] =
proc close*[T](s: Selector[T]) =
if posix.close(s.epollFD) != 0:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
when hasThreadSupport:
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = 0
key.events = {}
key.data = empty
proc newSelectEvent*(): SelectEvent =
let fdci = eventfd(0, 0)
if fdci == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fdci)
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.efd = fdci
@@ -148,29 +154,30 @@ proc newSelectEvent*(): SelectEvent =
proc setEvent*(ev: SelectEvent) =
var data : uint64 = 1
if posix.write(ev.efd, addr data, sizeof(uint64)) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(ev.efd)
if posix.close(ev.efd) == -1:
raiseIOSelectorsError(osLastError())
deallocShared(cast[pointer](ev))
template checkFd(s, f) =
if f >= s.maxFD:
raise newException(ValueError, "Maximum file descriptors exceeded")
raiseIOSelectorsError("Maximum file descriptors exceeded")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, events, 0, data)
s.setKey(fdi, events, 0, data)
if events != {}:
var epv = epoll_event(events: EPOLLRDHUP)
epv.data.u64 = fdi.uint
if Event.Read in events: epv.events = epv.events or EPOLLIN
if Event.Write in events: epv.events = epv.events or EPOLLOUT
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
@@ -190,15 +197,15 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
if pkey.events == {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
inc(s.count)
else:
if events != {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
else:
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
dec(s.count)
pkey.events = events
@@ -213,51 +220,56 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
dec(s.count)
if Event.Finished notin pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseIOSelectorsError(osLastError())
dec(s.count)
if posix.close(cint(fdi)) == -1:
raiseIOSelectorsError(osLastError())
elif Event.Signal in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(s.fds[fdi].param))
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
if posix.close(cint(fdi)) == -1:
raiseIOSelectorsError(osLastError())
elif Event.Process in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, SIGCHLD)
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
if Event.Finished notin pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseIOSelectorsError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, SIGCHLD)
unblockSignals(nmask, omask)
dec(s.count)
if posix.close(cint(fdi)) == -1:
raiseIOSelectorsError(osLastError())
else:
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
dec(s.count)
pkey.ident = 0
pkey.events = {}
if Event.Finished notin pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseIOSelectorsError(osLastError())
dec(s.count)
if posix.close(cint(fdi)) == -1:
raiseIOSelectorsError(osLastError())
clearKey(pkey)
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.efd)
@@ -265,12 +277,11 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.events = {}
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
dec(s.count)
clearKey(pkey)
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
@@ -279,7 +290,7 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
old_ts: Itimerspec
let fdi = timerfd_create(CLOCK_MONOTONIC, 0).int
if fdi == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
@@ -302,10 +313,10 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec
if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, fdi, events, 0, data)
raiseIOSelectorsError(osLastError())
s.setKey(fdi, events, 0, data)
inc(s.count)
result = fdi
@@ -323,7 +334,7 @@ when not defined(android):
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
@@ -332,8 +343,8 @@ when not defined(android):
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, signal, {Event.Signal}, signal, data)
raiseIOSelectorsError(osLastError())
s.setKey(fdi, {Event.Signal}, signal, data)
inc(s.count)
result = fdi
@@ -350,7 +361,7 @@ when not defined(android):
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
@@ -360,30 +371,26 @@ when not defined(android):
epv.data.u64 = fdi.uint
epv.events = EPOLLIN or EPOLLRDHUP
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, pid, {Event.Process, Event.Oneshot}, pid, data)
raiseIOSelectorsError(osLastError())
s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)
inc(s.count)
result = fdi
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = int(ev.efd)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, {Event.User}, 0, data)
s.setKey(fdi, {Event.User}, 0, data)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = ev.efd.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
inc(s.count)
proc flush*[T](s: Selector[T]) =
discard
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
results: var openarray[ReadyKey]): int =
var
resTable: array[MAX_EPOLL_RESULT_EVENTS, epoll_event]
maxres = MAX_EPOLL_RESULT_EVENTS
events: set[Event] = {}
resTable: array[MAX_EPOLL_EVENTS, epoll_event]
maxres = MAX_EPOLL_EVENTS
i, k: int
if maxres > len(results):
@@ -395,7 +402,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
result = 0
let err = osLastError()
if cint(err) != EINTR:
raiseOSError(err)
raiseIOSelectorsError(err)
elif count == 0:
result = 0
else:
@@ -404,108 +411,126 @@ proc selectInto*[T](s: Selector[T], timeout: int,
while i < count:
let fdi = int(resTable[i].data.u64)
let pevents = resTable[i].events
var skey = addr(s.fds[fdi])
doAssert(skey.ident != 0)
events = {}
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
var rkey = ReadyKey(fd: int(fdi), events: {})
if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
events.incl(Event.Error)
rkey.events.incl(Event.Error)
if (pevents and EPOLLOUT) != 0:
events.incl(Event.Write)
rkey.events.incl(Event.Write)
when not defined(android):
if (pevents and EPOLLIN) != 0:
if Event.Read in skey.events:
events.incl(Event.Read)
elif Event.Timer in skey.events:
if Event.Read in pkey.events:
rkey.events.incl(Event.Read)
elif Event.Timer in pkey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
events = {Event.Timer}
elif Event.Signal in skey.events:
if posix.read(cint(fdi), addr data,
sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
rkey.events.incl(Event.Timer)
elif Event.Signal in pkey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
if posix.read(cint(fdi), addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
events = {Event.Signal}
elif Event.Process in skey.events:
raiseIOSelectorsError(osLastError())
rkey.events.incl(Event.Signal)
elif Event.Process in pkey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
if posix.read(cint(fdi), addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
if cast[int](data.ssi_pid) == skey.param:
events = {Event.Process}
raiseIOSelectorsError(osLastError())
if cast[int](data.ssi_pid) == pkey.param:
rkey.events.incl(Event.Process)
else:
inc(i)
continue
elif Event.User in skey.events:
elif Event.User in pkey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
if posix.read(cint(fdi), addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
inc(i)
continue
else:
raiseOSError(err)
events = {Event.User}
raiseIOSelectorsError(err)
rkey.events.incl(Event.User)
else:
if (pevents and EPOLLIN) != 0:
if Event.Read in skey.events:
events.incl(Event.Read)
elif Event.Timer in skey.events:
if Event.Read in pkey.events:
rkey.events.incl(Event.Read)
elif Event.Timer in pkey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
events = {Event.Timer}
elif Event.User in skey.events:
if posix.read(cint(fdi), addr data,
sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())
rkey.events.incl(Event.Timer)
elif Event.User in pkey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
if posix.read(cint(fdi), addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
inc(i)
continue
else:
raiseOSError(err)
events = {Event.User}
raiseIOSelectorsError(err)
rkey.events.incl(Event.User)
skey.key.events = events
results[k] = skey.key
inc(k)
if Event.Oneshot in skey.events:
if Event.Oneshot in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
skey.ident = 0
skey.events = {}
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) == -1:
raiseIOSelectorsError(osLastError())
# we will not clear key until it will be unregistered, so
# application can obtain data, but we will decrease counter,
# because epoll is empty.
dec(s.count)
# we are marking key with `Finished` event, to avoid double decrease.
pkey.events.incl(Event.Finished)
results[k] = rkey
inc(k)
inc(i)
result = k
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](MAX_EPOLL_RESULT_EVENTS)
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
result = newSeq[ReadyKey](MAX_EPOLL_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
template withData*[T](s: Selector[T], fd: SocketHandle, value,
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
s.fds[fdi].data = data
result = true
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
var value = addr(s.fds[fdi].data)
body
template withData*[T](s: Selector[T], fd: SocketHandle, value, body1,
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
var value = addr(s.fds[fdi].data)
body1
else:
body2

View File

@@ -12,15 +12,16 @@
import posix, times, kqueue
const
# Maximum number of cached changes.
MAX_KQUEUE_CHANGE_EVENTS = 64
# Maximum number of events that can be returned.
MAX_KQUEUE_RESULT_EVENTS = 64
MAX_KQUEUE_EVENTS = 64
# SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them
# to be constants and GC-safe.
SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0)
SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1)
when defined(kqcache):
const CACHE_EVENTS = true
when defined(macosx) or defined(freebsd):
when defined(macosx):
const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS)
@@ -45,68 +46,88 @@ when hasThreadSupport:
SelectorImpl[T] = object
kqFD : cint
maxFD : int
changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent]
changesCount: int
changes: seq[KEvent]
fds: ptr SharedArray[SelectorKey[T]]
count: int
changesLock: Lock
sock: cint
Selector*[T] = ptr SelectorImpl[T]
else:
type
SelectorImpl[T] = object
kqFD : cint
maxFD : int
changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent]
changesCount: int
changes: seq[KEvent]
fds: seq[SelectorKey[T]]
count: int
sock: cint
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
rfd: cint
wfd: cint
# SelectEvent is declared as `ptr` to be placed in `shared memory`,
# so you can share one SelectEvent handle between threads.
type SelectEvent* = ptr SelectEventImpl
SelectEvent* = ptr SelectEventImpl
# SelectEvent is declared as `ptr` to be placed in `shared memory`,
# so you can share one SelectEvent handle between threads.
proc getUnique[T](s: Selector[T]): int {.inline.} =
# we create duplicated handles to get unique indexes for our `fds` array.
result = posix.fcntl(s.sock, F_DUPFD, s.sock)
if result == -1:
raiseIOSelectorsError(osLastError())
proc newSelector*[T](): Selector[T] =
var maxFD = 0.cint
var size = csize(sizeof(cint))
var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint]
# Obtain maximum number of file descriptors for process
# Obtain maximum number of opened file descriptors for process
if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size,
nil, 0) != 0:
raiseOsError(osLastError())
raiseIOSelectorsError(osLastError())
var kqFD = kqueue()
if kqFD < 0:
raiseOsError(osLastError())
raiseIOSelectorsError(osLastError())
when hasThreadSupport:
result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
result.kqFD = kqFD
result.maxFD = maxFD.int
result.fds = allocSharedArray[SelectorKey[T]](maxFD)
initLock(result.changesLock)
else:
result = Selector[T]()
result.kqFD = kqFD
result.maxFD = maxFD.int
result.fds = newSeq[SelectorKey[T]](maxFD)
result.kqFD = kqFD
result.maxFD = maxFD.int
result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)
# we allocating empty socket to duplicate it handle in future, to get unique
# indexes for `fds` array. This is needed to properly identify
# {Event.Timer, Event.Signal, Event.Process} events.
result.sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).cint
if result.sock == -1:
raiseIOSelectorsError(osLastError())
proc close*[T](s: Selector[T]) =
if posix.close(s.kqFD) != 0:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
when hasThreadSupport:
deinitLock(s.changesLock)
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = 0
key.events = {}
key.data = empty
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
@@ -116,16 +137,18 @@ proc newSelectEvent*(): SelectEvent =
proc setEvent*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(cint(ev.rfd))
discard posix.close(cint(ev.wfd))
if posix.close(cint(ev.rfd)) == -1:
raiseIOSelectorsError(osLastError())
if posix.close(cint(ev.wfd)) == -1:
raiseIOSelectorsError(osLastError())
deallocShared(cast[pointer](ev))
template checkFd(s, f) =
if f >= s.maxFD:
raise newException(ValueError, "Maximum file descriptors exceeded")
raiseIOSelectorsError("Maximum file descriptors exceeded!")
when hasThreadSupport:
template withChangeLock[T](s: Selector[T], body: untyped) =
@@ -144,31 +167,40 @@ template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
nudata: pointer) =
mixin withChangeLock
s.withChangeLock():
s.changesTable[s.changesCount] = KEvent(ident: nident,
filter: nfilter, flags: nflags,
fflags: nfflags, data: ndata,
udata: nudata)
inc(s.changesCount)
if s.changesCount == MAX_KQUEUE_CHANGE_EVENTS:
if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount),
nil, 0, nil) == -1:
raiseOSError(osLastError())
s.changesCount = 0
s.changes.add(KEvent(ident: nident,
filter: nfilter, flags: nflags,
fflags: nfflags, data: ndata,
udata: nudata))
when not declared(CACHE_EVENTS):
template flushKQueue[T](s: Selector[T]) =
mixin withChangeLock
s.withChangeLock():
let length = cint(len(s.changes))
if length > 0:
if kevent(s.kqFD, addr(s.changes[0]), length,
nil, 0, nil) == -1:
raiseIOSelectorsError(osLastError())
s.changes.setLen(0)
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, events, 0, data)
s.setKey(fdi, events, 0, data)
if events != {}:
if Event.Read in events:
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil)
inc(s.count)
if Event.Write in events:
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil)
inc(s.count)
when not declared(CACHE_EVENTS):
flushKQueue(s)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
@@ -192,40 +224,41 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
if (Event.Write notin pkey.events) and (Event.Write in events):
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil)
inc(s.count)
when not declared(CACHE_EVENTS):
flushKQueue(s)
pkey.events = events
proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).int
if fdi == -1:
raiseOsError(osLastError())
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer}
let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD
s.setKey(fdi, fdi, events, 0, data)
s.setKey(fdi, events, 0, data)
# EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds,
# but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds
# too
modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
inc(s.count)
result = fdi
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).int
if fdi == -1:
raiseOsError(osLastError())
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, signal, {Event.Signal}, signal, data)
s.setKey(fdi, {Event.Signal}, signal, data)
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
@@ -233,33 +266,44 @@ proc registerSignal*[T](s: Selector[T], signal: int,
blockSignals(nmask, omask)
# to be compatible with linux semantic we need to "eat" signals
posix.signal(cint(signal), SIG_IGN)
modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0,
cast[pointer](fdi))
when not declared(CACHE_EVENTS):
flushKQueue(s)
inc(s.count)
result = fdi
proc registerProcess*[T](s: Selector[T], pid: int,
data: T): int {.discardable.} =
var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
posix.IPPROTO_TCP).int
if fdi == -1:
raiseOsError(osLastError())
data: T): int {.discardable.} =
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var kflags: cushort = EV_ONESHOT or EV_ADD
setKey(s, fdi, pid, {Event.Process, Event.Oneshot}, pid, data)
setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data)
modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0,
cast[pointer](fdi))
when not declared(CACHE_EVENTS):
flushKQueue(s)
inc(s.count)
result = fdi
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = ev.rfd.int
doAssert(s.fds[fdi].ident == 0)
setKey(s, fdi, fdi, {Event.User}, 0, data)
setKey(s, fdi, {Event.User}, 0, data)
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
inc(s.count)
template processVnodeEvents(events: set[Event]): cuint =
@@ -281,9 +325,14 @@ template processVnodeEvents(events: set[Event]): cuint =
proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) =
let fdi = fd.int
setKey(s, fdi, fdi, {Event.Vnode} + events, 0, data)
setKey(s, fdi, {Event.Vnode} + events, 0, data)
var fflags = processVnodeEvents(events)
modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
inc(s.count)
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
@@ -295,38 +344,55 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
if pkey.events != {}:
if pkey.events * {Event.Read, Event.Write} != {}:
if Event.Read in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
dec(s.count)
if Event.Write in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil)
dec(s.count)
when not declared(CACHE_EVENTS):
flushKQueue(s)
elif Event.Timer in pkey.events:
discard posix.close(cint(pkey.key.fd))
modifyKQueue(s, fdi.uint, EVFILT_TIMER, EV_DELETE, 0, 0, nil)
dec(s.count)
if Event.Finished notin pkey.events:
modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
dec(s.count)
if posix.close(cint(pkey.ident)) == -1:
raiseIOSelectorsError(osLastError())
elif Event.Signal in pkey.events:
var nmask, omask: Sigset
var signal = cint(pkey.param)
let signal = cint(pkey.param)
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, signal)
unblockSignals(nmask, omask)
posix.signal(signal, SIG_DFL)
discard posix.close(cint(pkey.key.fd))
modifyKQueue(s, fdi.uint, EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
dec(s.count)
if posix.close(cint(pkey.ident)) == -1:
raiseIOSelectorsError(osLastError())
elif Event.Process in pkey.events:
discard posix.close(cint(pkey.key.fd))
modifyKQueue(s, fdi.uint, EVFILT_PROC, EV_DELETE, 0, 0, nil)
dec(s.count)
if Event.Finished notin pkey.events:
modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
dec(s.count)
if posix.close(cint(pkey.ident)) == -1:
raiseIOSelectorsError(osLastError())
elif Event.Vnode in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_DELETE, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
dec(s.count)
elif Event.User in pkey.events:
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
dec(s.count)
pkey.ident = 0
pkey.events = {}
clearKey(pkey)
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
@@ -334,26 +400,20 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.events = {}
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil)
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
clearKey(pkey)
dec(s.count)
proc flush*[T](s: Selector[T]) =
s.withChangeLock():
var tv = Timespec()
if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount),
nil, 0, addr tv) == -1:
raiseOSError(osLastError())
s.changesCount = 0
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
results: var openarray[ReadyKey]): int =
var
tv: Timespec
resTable: array[MAX_KQUEUE_RESULT_EVENTS, KEvent]
resTable: array[MAX_KQUEUE_EVENTS, KEvent]
ptv = addr tv
maxres = MAX_KQUEUE_RESULT_EVENTS
maxres = MAX_KQUEUE_EVENTS
if timeout != -1:
if timeout >= 1000:
@@ -369,116 +429,147 @@ proc selectInto*[T](s: Selector[T], timeout: int,
maxres = len(results)
var count = 0
s.withChangeLock():
count = kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount),
addr(resTable[0]), cint(maxres), ptv)
s.changesCount = 0
when not declared(CACHE_EVENTS):
count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv)
else:
s.withChangeLock():
let length = cint(len(s.changes))
if length > 0:
count = kevent(s.kqFD, addr(s.changes[0]), length,
addr(resTable[0]), cint(maxres), ptv)
s.changes.setLen(0)
else:
count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
ptv)
if count < 0:
result = 0
let err = osLastError()
if cint(err) != EINTR:
raiseOSError(err)
raiseIOSelectorsError(err)
elif count == 0:
result = 0
else:
var i = 0
var k = 0
var k = 0 # do not delete this, because `continue` used in cycle.
var pkey: ptr SelectorKey[T]
while i < count:
let kevent = addr(resTable[i])
if (kevent.flags and EV_ERROR) == 0:
case kevent.filter:
of EVFILT_READ:
pkey = addr(s.fds[kevent.ident.int])
pkey.key.events = {Event.Read}
if Event.User in pkey.events:
var data: uint64 = 0
if posix.read(kevent.ident.cint, addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
# someone already consumed event data
inc(i)
continue
else:
raiseOSError(osLastError())
pkey.key.events = {Event.User}
of EVFILT_WRITE:
pkey = addr(s.fds[kevent.ident.int])
pkey.key.events = {Event.Write}
of EVFILT_TIMER:
pkey = addr(s.fds[kevent.ident.int])
if Event.Oneshot in pkey.events:
if posix.close(cint(pkey.ident)) == -1:
raiseOSError(osLastError())
pkey.ident = 0
pkey.events = {}
dec(s.count)
pkey.key.events = {Event.Timer}
of EVFILT_VNODE:
pkey = addr(s.fds[kevent.ident.int])
pkey.key.events = {Event.Vnode}
if (kevent.fflags and NOTE_DELETE) != 0:
pkey.key.events.incl(Event.VnodeDelete)
if (kevent.fflags and NOTE_WRITE) != 0:
pkey.key.events.incl(Event.VnodeWrite)
if (kevent.fflags and NOTE_EXTEND) != 0:
pkey.key.events.incl(Event.VnodeExtend)
if (kevent.fflags and NOTE_ATTRIB) != 0:
pkey.key.events.incl(Event.VnodeAttrib)
if (kevent.fflags and NOTE_LINK) != 0:
pkey.key.events.incl(Event.VnodeLink)
if (kevent.fflags and NOTE_RENAME) != 0:
pkey.key.events.incl(Event.VnodeRename)
if (kevent.fflags and NOTE_REVOKE) != 0:
pkey.key.events.incl(Event.VnodeRevoke)
of EVFILT_SIGNAL:
pkey = addr(s.fds[cast[int](kevent.udata)])
pkey.key.events = {Event.Signal}
of EVFILT_PROC:
pkey = addr(s.fds[cast[int](kevent.udata)])
if posix.close(cint(pkey.ident)) == -1:
raiseOSError(osLastError())
pkey.ident = 0
pkey.events = {}
var rkey = ReadyKey(fd: int(kevent.ident), events: {})
if (kevent.flags and EV_ERROR) != 0:
rkey.events = {Event.Error}
case kevent.filter:
of EVFILT_READ:
pkey = addr(s.fds[int(kevent.ident)])
rkey.events.incl(Event.Read)
if Event.User in pkey.events:
var data: uint64 = 0
if posix.read(cint(kevent.ident), addr data,
sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
# someone already consumed event data
inc(i)
continue
else:
raiseIOSelectorsError(err)
rkey.events = {Event.User}
of EVFILT_WRITE:
pkey = addr(s.fds[int(kevent.ident)])
rkey.events.incl(Event.Write)
rkey.events = {Event.Write}
of EVFILT_TIMER:
pkey = addr(s.fds[int(kevent.ident)])
if Event.Oneshot in pkey.events:
# we will not clear key until it will be unregistered, so
# application can obtain data, but we will decrease counter,
# because kqueue is empty.
dec(s.count)
pkey.key.events = {Event.Process}
else:
raise newException(ValueError, "Unsupported kqueue filter in queue")
# we are marking key with `Finished` event, to avoid double decrease.
pkey.events.incl(Event.Finished)
rkey.events.incl(Event.Timer)
of EVFILT_VNODE:
pkey = addr(s.fds[int(kevent.ident)])
rkey.events.incl(Event.Vnode)
if (kevent.fflags and NOTE_DELETE) != 0:
rkey.events.incl(Event.VnodeDelete)
if (kevent.fflags and NOTE_WRITE) != 0:
rkey.events.incl(Event.VnodeWrite)
if (kevent.fflags and NOTE_EXTEND) != 0:
rkey.events.incl(Event.VnodeExtend)
if (kevent.fflags and NOTE_ATTRIB) != 0:
rkey.events.incl(Event.VnodeAttrib)
if (kevent.fflags and NOTE_LINK) != 0:
rkey.events.incl(Event.VnodeLink)
if (kevent.fflags and NOTE_RENAME) != 0:
rkey.events.incl(Event.VnodeRename)
if (kevent.fflags and NOTE_REVOKE) != 0:
rkey.events.incl(Event.VnodeRevoke)
of EVFILT_SIGNAL:
pkey = addr(s.fds[cast[int](kevent.udata)])
rkey.fd = cast[int](kevent.udata)
rkey.events.incl(Event.Signal)
of EVFILT_PROC:
rkey.fd = cast[int](kevent.udata)
pkey = addr(s.fds[cast[int](kevent.udata)])
# we will not clear key, until it will be unregistered, so
# application can obtain data, but we will decrease counter,
# because kqueue is empty.
dec(s.count)
# we are marking key with `Finished` event, to avoid double decrease.
pkey.events.incl(Event.Finished)
rkey.events.incl(Event.Process)
else:
pkey = addr(s.fds[cast[int](kevent.udata)])
raiseIOSelectorsError("Unsupported kqueue filter in queue!")
if (kevent.flags and EV_EOF) != 0:
pkey.key.events.incl(Event.Error)
if (kevent.flags and EV_EOF) != 0:
rkey.events.incl(Event.Error)
results[k] = pkey.key
inc(k)
results[k] = rkey
inc(k)
inc(i)
result = k
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](MAX_KQUEUE_RESULT_EVENTS)
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
template withData*[T](s: Selector[T], fd: SocketHandle, value,
body: untyped) =
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
s.fds[fdi].data = data
result = true
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
var value = addr(s.fds[fdi].data)
body
template withData*[T](s: Selector[T], fd: SocketHandle, value, body1,
body2: untyped) =
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
var value = addr(s.fds[fdi].data)
body1
else:
body2

View File

@@ -12,7 +12,7 @@
import posix, times
# Maximum number of events that can be returned
const MAX_POLL_RESULT_EVENTS = 64
const MAX_POLL_EVENTS = 64
when hasThreadSupport:
type
@@ -65,7 +65,7 @@ else:
proc newSelector*[T](): Selector[T] =
var a = rlimit()
if getrlimit(RLIMIT_NOFILE, a) != 0:
raiseOsError(osLastError())
raiseIOSelectorsError(osLastError())
var maxFD = int(a.rlim_max)
when hasThreadSupport:
@@ -87,6 +87,12 @@ proc close*[T](s: Selector[T]) =
deallocSharedArray(s.pollfds)
deallocShared(cast[pointer](s))
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = 0
key.events = {}
key.data = empty
template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) =
withPollLock(s):
var pollev: cshort = 0
@@ -111,7 +117,7 @@ template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
inc(i)
if i == s.pollcnt:
raise newException(ValueError, "Descriptor is not registered in queue")
raiseIOSelectorsError("Descriptor is not registered in queue")
template pollRemove[T](s: Selector[T], sock: cint) =
withPollLock(s):
@@ -134,14 +140,14 @@ template pollRemove[T](s: Selector[T], sock: cint) =
template checkFd(s, f) =
if f >= s.maxFD:
raise newException(ValueError, "Maximum file descriptors exceeded")
raiseIOSelectorsError("Descriptor is not registered in queue")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
var fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.setKey(fdi, fdi, events, 0, data)
setKey(s, fdi, events, 0, data)
if events != {}: s.pollAdd(fdi.cint, events)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
@@ -168,12 +174,10 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
var fdi = int(ev.rfd)
doAssert(s.fds[fdi].ident == 0)
var events = {Event.User}
setKey(s, fdi, fdi, events, 0, data)
setKey(s, fdi, events, 0, data)
events.incl(Event.Read)
s.pollAdd(fdi.cint, events)
proc flush*[T](s: Selector[T]) = discard
proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
@@ -196,7 +200,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
@@ -206,16 +210,18 @@ proc newSelectEvent*(): SelectEvent =
proc setEvent*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(cint(ev.rfd))
discard posix.close(cint(ev.wfd))
if posix.close(cint(ev.rfd)) == -1:
raiseIOSelectorsError(osLastError())
if posix.close(cint(ev.wfd)) == -1:
raiseIOSelectorsError(osLastError())
deallocShared(cast[pointer](ev))
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
var maxres = MAX_POLL_RESULT_EVENTS
results: var openarray[ReadyKey]): int =
var maxres = MAX_POLL_EVENTS
if maxres > len(results):
maxres = len(results)
@@ -224,10 +230,8 @@ proc selectInto*[T](s: Selector[T], timeout: int,
if count < 0:
result = 0
let err = osLastError()
if err.cint == EINTR:
discard
else:
raiseOSError(osLastError())
if cint(err) != EINTR:
raiseIOSelectorsError(err)
elif count == 0:
result = 0
else:
@@ -238,58 +242,71 @@ proc selectInto*[T](s: Selector[T], timeout: int,
let revents = s.pollfds[i].revents
if revents != 0:
let fd = s.pollfds[i].fd
var skey = addr(s.fds[fd])
skey.key.events = {}
var pkey = addr(s.fds[fd])
var rkey = ReadyKey(fd: int(fd), events: {})
if (revents and POLLIN) != 0:
skey.key.events.incl(Event.Read)
if Event.User in skey.events:
rkey.events.incl(Event.Read)
if Event.User in pkey.events:
var data: uint64 = 0
if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err != OSErrorCode(EAGAIN):
raiseOSError(osLastError())
raiseIOSelectorsError(err)
else:
# someone already consumed event data
inc(i)
continue
skey.key.events = {Event.User}
rkey.events = {Event.User}
if (revents and POLLOUT) != 0:
skey.key.events.incl(Event.Write)
rkey.events.incl(Event.Write)
if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or
(revents and POLLNVAL) != 0:
skey.key.events.incl(Event.Error)
results[rindex] = skey.key
rkey.events.incl(Event.Error)
results[rindex] = rkey
s.pollfds[i].revents = 0
inc(rindex)
inc(k)
inc(i)
result = k
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](MAX_POLL_RESULT_EVENTS)
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
result = newSeq[ReadyKey](MAX_POLL_EVENTS)
let count = selectInto(s, timeout, result)
result.setLen(count)
template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
template withData*[T](s: Selector[T], fd: SocketHandle, value,
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
result = s.fds[fdi].data
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
s.fds[fdi].data = data
result = true
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
var value = addr(s.fds[fdi].data)
body
template withData*[T](s: Selector[T], fd: SocketHandle, value, body1,
template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
body2: untyped) =
mixin checkFd
let fdi = int(fd)
s.checkFd(fdi)
if s.fds[fdi].ident != 0:
var value = addr(s.fds[fdi].key.data)
var value = addr(s.fds[fdi].data)
body1
else:
body2

View File

@@ -120,35 +120,35 @@ when defined(windows):
saddr.sin_addr.s_addr = INADDR_ANY
if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) < 0'i32:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
if winlean.listen(ssock, 1) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
var namelen = sizeof(saddr).SockLen
if getsockname(ssock, cast[ptr SockAddr](addr(saddr)),
addr(namelen)) == -1'i32:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
saddr.sin_addr.s_addr = 0x0100007F
if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)),
sizeof(saddr).SockLen) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
namelen = sizeof(saddr).SockLen
rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)),
cast[ptr SockLen](addr(namelen)))
if rsock == SocketHandle(-1):
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
if winlean.closesocket(ssock) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
var mode = clong(1)
if ioctlsocket(rsock, FIONBIO, addr(mode)) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
mode = clong(1)
if ioctlsocket(wsock, FIONBIO, addr(mode)) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rsock = rsock
@@ -158,7 +158,7 @@ when defined(windows):
var data: uint64 = 1
if winlean.send(ev.wsock, cast[pointer](addr data),
cint(sizeof(uint64)), 0) != sizeof(uint64):
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
discard winlean.closesocket(ev.rsock)
@@ -169,7 +169,7 @@ else:
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) == -1:
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
@@ -179,14 +179,17 @@ else:
proc setEvent*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
proc close*(ev: SelectEvent) =
discard posix.close(cint(ev.rsock))
discard posix.close(cint(ev.wsock))
if posix.close(cint(ev.rsock)) == -1:
raiseIOSelectorsError(osLastError())
if posix.close(cint(ev.wsock)) == -1:
raiseIOSelectorsError(osLastError())
deallocShared(cast[pointer](ev))
proc setKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) =
proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
data: T) =
var i = 0
let fdi = int(fd)
while i < FD_SETSIZE:
@@ -194,13 +197,11 @@ proc setKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) =
var pkey = addr(s.fds[i])
pkey.ident = fdi
pkey.events = events
pkey.key.fd = fd.int
pkey.key.events = {}
pkey.key.data = data
pkey.data = data
break
inc(i)
if i == FD_SETSIZE:
raise newException(ValueError, "Maximum numbers of fds exceeded")
raiseIOSelectorsError("Maximum numbers of fds exceeded")
proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
var i = 0
@@ -210,24 +211,28 @@ proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
result = addr(s.fds[i])
break
inc(i)
doAssert(i < FD_SETSIZE, "Descriptor not registered in queue")
if i == FD_SETSIZE:
raiseIOSelectorsError("Descriptor not registered in queue")
proc delKey[T](s: Selector[T], fd: SocketHandle) =
var empty: T
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fd.int:
s.fds[i].ident = 0
s.fds[i].events = {}
s.fds[i].data = empty
break
inc(i)
doAssert(i < FD_SETSIZE, "Descriptor not registered in queue")
if i == FD_SETSIZE:
raiseIOSelectorsError("Descriptor not registered in queue")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
when not defined(windows):
let fdi = int(fd)
s.withSelectLock():
s.setKey(fd, events, data)
s.setSelectKey(fd, events, data)
when not defined(windows):
if fdi > s.maxFD: s.maxFD = fdi
if Event.Read in events:
@@ -242,7 +247,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
when not defined(windows):
let fdi = int(ev.rsock)
s.withSelectLock():
s.setKey(ev.rsock, {Event.User}, data)
s.setSelectKey(ev.rsock, {Event.User}, data)
when not defined(windows):
if fdi > s.maxFD: s.maxFD = fdi
IOFD_SET(ev.rsock, addr s.rSet)
@@ -292,7 +297,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
s.delKey(fd)
proc selectInto*[T](s: Selector[T], timeout: int,
results: var openarray[ReadyKey[T]]): int =
results: var openarray[ReadyKey]): int =
var tv = Timeval()
var ptv = addr tv
var rset, wset, eset: FdSet
@@ -313,11 +318,11 @@ proc selectInto*[T](s: Selector[T], timeout: int,
if count < 0:
result = 0
when defined(windows):
raiseOSError(osLastError())
raiseIOSelectorsError(osLastError())
else:
let err = osLastError()
if cint(err) != EINTR:
raiseOSError(err)
raiseIOSelectorsError(err)
elif count == 0:
result = 0
else:
@@ -329,7 +334,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
if s.fds[i].ident != 0:
var flag = false
var pkey = addr(s.fds[i])
pkey.key.events = {}
var rkey = ReadyKey(fd: int(pkey.ident), events: {})
let fd = SocketHandle(pkey.ident)
if IOFD_ISSET(fd, addr rset) != 0:
if Event.User in pkey.events:
@@ -338,31 +343,31 @@ proc selectInto*[T](s: Selector[T], timeout: int,
sizeof(uint64).cint, 0) != sizeof(uint64):
let err = osLastError()
if cint(err) != EAGAIN:
raiseOSError(err)
raiseIOSelectorsError(err)
else:
inc(i)
inc(k)
continue
else:
flag = true
pkey.key.events = {Event.User}
rkey.events = {Event.User}
else:
flag = true
pkey.key.events = {Event.Read}
rkey.events = {Event.Read}
if IOFD_ISSET(fd, addr wset) != 0:
pkey.key.events.incl(Event.Write)
rkey.events.incl(Event.Write)
if IOFD_ISSET(fd, addr eset) != 0:
pkey.key.events.incl(Event.Error)
rkey.events.incl(Event.Error)
flag = true
if flag:
results[rindex] = pkey.key
results[rindex] = rkey
inc(rindex)
inc(k)
inc(i)
result = rindex
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] =
result = newSeq[ReadyKey[T]](FD_SETSIZE)
proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
result = newSeq[ReadyKey](FD_SETSIZE)
var count = selectInto(s, timeout, result)
result.setLen(count)
@@ -383,7 +388,28 @@ else:
template withSelectLock[T](s: Selector[T], body: untyped) =
body
template withData*[T](s: Selector[T], fd: SocketHandle, value,
proc getData*[T](s: Selector[T], fd: SocketHandle|int): T =
s.withSelectLock():
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
result = s.fds[i].data
break
inc(i)
proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
s.withSelectLock():
let fdi = int(fd)
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
var pkey = addr(s.fds[i])
pkey.data = data
result = true
break
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body: untyped) =
mixin withSelectLock
s.withSelectLock():
@@ -392,13 +418,13 @@ template withData*[T](s: Selector[T], fd: SocketHandle, value,
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
value = addr(s.fds[i].key.data)
value = addr(s.fds[i].data)
break
inc(i)
if i != FD_SETSIZE:
body
template withData*[T](s: Selector[T], fd: SocketHandle, value,
template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
body1, body2: untyped) =
mixin withSelectLock
s.withSelectLock():
@@ -407,10 +433,11 @@ template withData*[T](s: Selector[T], fd: SocketHandle, value,
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fdi:
value = addr(s.fds[i].key.data)
value = addr(s.fds[i].data)
break
inc(i)
if i != FD_SETSIZE:
body1
else:
body2

View File

@@ -526,10 +526,10 @@ when defined(windows) or defined(nimdoc):
proc send*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
## data has been sent.
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
## to avoid early freeing of the buffer
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
## will complete once all data has been sent.
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
verifyPresence(socket)
var retFuture = newFuture[void]("send")
@@ -946,7 +946,8 @@ when defined(windows) or defined(nimdoc):
## receiving notifies.
registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) =
template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
handleCallback) =
let handleFD = AsyncFD(hEvent)
pcd.ioPort = p.ioPort
pcd.handleFd = handleFD
@@ -961,7 +962,7 @@ when defined(windows) or defined(nimdoc):
pcd.ovl = ol
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](pcd), timeout.Dword, flags):
cast[pointer](pcd), timeout.Dword, flags):
GC_unref(ol)
deallocShared(cast[pointer](pcd))
discard closeHandle(hEvent)
@@ -1098,15 +1099,18 @@ else:
import ioselectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
const
InitCallbackListSize = 4 # initial size of callbacks sequence,
# associated with file/socket descriptor.
InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
# queue.
type
AsyncFD* = distinct cint
Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
DoublyLinkedListRef = ref DoublyLinkedList[Callback]
AsyncData = object
readCBs: DoublyLinkedListRef
writeCBs: DoublyLinkedListRef
readList: seq[Callback]
writeList: seq[Callback]
AsyncEvent* = distinct SelectEvent
@@ -1117,11 +1121,17 @@ else:
proc `==`*(x, y: AsyncFD): bool {.borrow.}
proc `==`*(x, y: AsyncEvent): bool {.borrow.}
template newAsyncData(): AsyncData =
AsyncData(
readList: newSeqOfCap[Callback](InitCallbackListSize),
writeList: newSeqOfCap[Callback](InitCallbackListSize)
)
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector[AsyncData]()
result.timers.newHeapQueue()
result.callbacks = initDeque[proc ()](64)
result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
@@ -1130,10 +1140,7 @@ else:
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()
var data = AsyncData(
readCBs: DoublyLinkedListRef(),
writeCBs: DoublyLinkedListRef()
)
var data = newAsyncData()
p.selector.registerHandle(fd.SocketHandle, {}, data)
proc newAsyncNativeSocket*(domain: cint, sockType: cint,
@@ -1168,10 +1175,9 @@ else:
let p = getGlobalDispatcher()
var newEvents = {Event.Read}
withData(p.selector, fd.SocketHandle, adata) do:
adata.readCBs[].append(cb)
adata.readList.add(cb)
newEvents.incl(Event.Read)
if not isNil(adata.writeCBs.head):
newEvents.incl(Event.Write)
if len(adata.writeList) != 0: newEvents.incl(Event.Write)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(fd.SocketHandle, newEvents)
@@ -1180,10 +1186,9 @@ else:
let p = getGlobalDispatcher()
var newEvents = {Event.Write}
withData(p.selector, fd.SocketHandle, adata) do:
adata.writeCBs[].append(cb)
adata.writeList.add(cb)
newEvents.incl(Event.Write)
if not isNil(adata.readCBs.head):
newEvents.incl(Event.Read)
if len(adata.readList) != 0: newEvents.incl(Event.Read)
do:
raise newException(ValueError, "File descriptor not registered.")
p.selector.updateHandle(fd.SocketHandle, newEvents)
@@ -1192,13 +1197,65 @@ else:
let p = getGlobalDispatcher()
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
template processBasicCallbacks(ident, rwlist: untyped) =
# Process pending descriptor's callbacks.
# Invoke every callback stored in `rwlist`, until first one
# returned `false`, which means callback wants to stay
# alive. In such case all remaining callbacks will be added
# to `rwlist` again, in the order they have been inserted.
#
# `rwlist` associated with file descriptor MUST BE emptied before
# dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
# or it can be possible to fall into endless cycle.
var curList: seq[Callback]
withData(p.selector, ident, adata) do:
shallowCopy(curList, adata.rwlist)
adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize)
let newLength = max(len(curList), InitCallbackListSize)
var newList = newSeqOfCap[Callback](newLength)
for cb in curList:
if len(newList) > 0:
newList.add(cb)
else:
if not cb(fd.AsyncFD):
newList.add(cb)
withData(p.selector, ident, adata) do:
adata.rwlist = newList & adata.rwlist
template processCustomCallbacks(ident: untyped) =
# Process pending custom event callbacks. Custom events are
# {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
# There can be only one callback registered with one descriptor,
# so there no need to iterate over list.
var curList: seq[Callback]
withData(p.selector, ident, adata) do:
shallowCopy(curList, adata.readList)
adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
let newLength = len(curList)
var newList = newSeqOfCap[Callback](newLength)
var cb = curList[0]
if not cb(fd.AsyncFD):
newList.add(cb)
else:
p.selector.unregister(fd)
withData(p.selector, ident, adata) do:
adata.readList = newList & adata.readList
proc poll*(timeout = 500) =
var keys: array[64, ReadyKey[AsyncData]]
var keys: array[64, ReadyKey]
let p = getGlobalDispatcher()
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode, Event.User}
Event.Vnode}
if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
raise newException(ValueError,
@@ -1209,45 +1266,23 @@ else:
var i = 0
while i < count:
var custom = false
var fd = keys[i].fd.SocketHandle
let fd = keys[i].fd
let events = keys[i].events
if Event.Read in events or events == {Event.Error}:
for node in keys[i].data.readCBs[].nodes():
let cb = node.value
if cb != nil:
if cb(fd.AsyncFD):
keys[i].data.readCBs[].remove(node)
else:
break
processBasicCallbacks(fd, readList)
if Event.Write in events or events == {Event.Error}:
for node in keys[i].data.writeCBs[].nodes():
let cb = node.value
if cb != nil:
if cb(fd.AsyncFD):
keys[i].data.writeCBs[].remove(node)
else:
break
processBasicCallbacks(fd, writeList)
if Event.User in events or events == {Event.Error}:
custom = true
processBasicCallbacks(fd, readList)
when ioselSupportedPlatform:
if (customSet * events) != {}:
for node in keys[i].data.readCBs[].nodes():
let cb = node.value
doAssert(cb != nil)
custom = true
if cb(fd.AsyncFD):
keys[i].data.readCBs[].remove(node)
p.selector.unregister(fd)
else:
if Event.User in events or events == {Event.Error}:
for node in keys[i].data.readCBs[].nodes():
let cb = node.value
custom = true
if cb != nil:
if cb(fd.AsyncFD):
keys[i].data.readCBs[].remove(node)
p.selector.unregister(fd)
custom = true
processCustomCallbacks(fd)
# because state `data` can be modified in callback we need to update
# descriptor events with currently registered callbacks.
@@ -1255,11 +1290,11 @@ else:
var update = false
var newEvents: set[Event] = {}
p.selector.withData(fd, adata) do:
if not isNil(adata.readCBs.head): incl(newEvents, Event.Read)
if not isNil(adata.writeCBs.head): incl(newEvents, Event.Write)
if len(adata.readList) > 0: incl(newEvents, Event.Read)
if len(adata.writeList) > 0: incl(newEvents, Event.Write)
update = true
if update:
p.selector.updateHandle(fd, newEvents)
p.selector.updateHandle(SocketHandle(fd), newEvents)
inc(i)
# Timer processing.
@@ -1519,33 +1554,24 @@ else:
## ``oneshot`` - if ``true`` only one event will be dispatched,
## if ``false`` continuous events every ``timeout`` milliseconds.
let p = getGlobalDispatcher()
var data = AsyncData(
readCBs: DoublyLinkedListRef(),
writeCBs: DoublyLinkedListRef()
)
data.readCBs[].append(cb)
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerTimer(timeout, oneshot, data)
proc addSignal*(signal: int, cb: Callback) =
## Start watching signal ``signal``, and when signal appears, call the
## callback ``cb``.
let p = getGlobalDispatcher()
var data = AsyncData(
readCBs: DoublyLinkedListRef(),
writeCBs: DoublyLinkedListRef()
)
data.readCBs[].append(cb)
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerSignal(signal, data)
proc addProcess*(pid: int, cb: Callback) =
## Start watching for process exit with pid ``pid``, and then call
## the callback ``cb``.
let p = getGlobalDispatcher()
var data = AsyncData(
readCBs: DoublyLinkedListRef(),
writeCBs: DoublyLinkedListRef()
)
data.readCBs[].append(cb)
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerProcess(pid, data)
proc newAsyncEvent*(): AsyncEvent =
@@ -1564,11 +1590,8 @@ else:
## Start watching for event ``ev``, and call callback ``cb``, when
## ev will be set to signaled state.
let p = getGlobalDispatcher()
var data = AsyncData(
readCBs: DoublyLinkedListRef(),
writeCBs: DoublyLinkedListRef()
)
data.readCBs[].append(cb)
var data = newAsyncData()
data.readList.add(cb)
p.selector.registerEvent(SelectEvent(ev), data)
proc sleepAsync*(ms: int): Future[void] =

View File

@@ -37,9 +37,6 @@ when not defined(windows):
var client_socket = create_test_socket()
var server_socket = create_test_socket()
registerHandle(selector, server_socket, {Event.Read}, 0)
registerHandle(selector, client_socket, {Event.Write}, 0)
var option : int32 = 1
if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
addr(option), sizeof(option).SockLen) < 0:
@@ -50,16 +47,19 @@ when not defined(windows):
aiList.ai_addrlen.Socklen) < 0'i32:
dealloc(aiList)
raiseOSError(osLastError())
discard server_socket.listen()
if server_socket.listen() == -1:
raiseOSError(osLastError())
dealloc(aiList)
aiList = getAddrInfo("127.0.0.1", Port(13337))
discard posix.connect(client_socket, aiList.ai_addr,
aiList.ai_addrlen.Socklen)
registerHandle(selector, server_socket, {Event.Read}, 0)
registerHandle(selector, client_socket, {Event.Write}, 0)
dealloc(aiList)
discard selector.select(100)
var rc1 = selector.select(100)
assert(len(rc1) == 2)
var sockAddress: SockAddr
var addrLen = sizeof(sockAddress).Socklen
@@ -126,15 +126,15 @@ when not defined(windows):
var selector = newSelector[int]()
var event = newSelectEvent()
selector.registerEvent(event, 1)
selector.flush()
var rc0 = selector.select(0)
event.setEvent()
var rc1 = selector.select(0)
event.setEvent()
var rc2 = selector.select(0)
var rc3 = selector.select(0)
assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
var ev1 = rc1[0].data
var ev2 = rc2[0].data
assert(len(rc0) == 0 and len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
var ev1 = selector.getData(rc1[0].fd)
var ev2 = selector.getData(rc2[0].fd)
assert(ev1 == 1 and ev2 == 1)
selector.unregister(event)
event.close()
@@ -150,11 +150,11 @@ when not defined(windows):
var rc2 = selector.select(140)
assert(len(rc1) == 1 and len(rc2) == 1)
selector.unregister(timer)
selector.flush()
discard selector.select(0)
selector.registerTimer(100, true, 0)
var rc3 = selector.select(120)
var rc4 = selector.select(120)
assert(len(rc3) == 1 and len(rc4) == 0)
var rc5 = selector.select(120)
assert(len(rc4) == 1 and len(rc5) == 0)
assert(selector.isEmpty())
selector.close()
result = true
@@ -193,12 +193,14 @@ when not defined(windows):
var s1 = selector.registerSignal(SIGUSR1, 1)
var s2 = selector.registerSignal(SIGUSR2, 2)
var s3 = selector.registerSignal(SIGTERM, 3)
selector.flush()
discard selector.select(0)
discard posix.kill(pid, SIGUSR1)
discard posix.kill(pid, SIGUSR2)
discard posix.kill(pid, SIGTERM)
var rc = selector.select(0)
var cd0 = selector.getData(rc[0].fd)
var cd1 = selector.getData(rc[1].fd)
var cd2 = selector.getData(rc[2].fd)
selector.unregister(s1)
selector.unregister(s2)
selector.unregister(s3)
@@ -211,7 +213,7 @@ when not defined(windows):
raiseOSError(osLastError())
assert(len(rc) == 3)
assert(rc[0].data + rc[1].data + rc[2].data == 6) # 1 + 2 + 3
assert(cd0 + cd1 + cd2 == 6, $(cd0 + cd1 + cd2)) # 1 + 2 + 3
assert(equalMem(addr sigset1o, addr sigset2o, sizeof(Sigset)))
assert(selector.isEmpty())
result = true
@@ -286,8 +288,8 @@ when not defined(windows):
events: set[Event]
proc vnode_test(): bool =
proc validate[T](test: openarray[ReadyKey[T]],
check: openarray[valType]): bool =
proc validate(test: openarray[ReadyKey],
check: openarray[valType]): bool =
result = false
if len(test) == len(check):
for checkItem in check:
@@ -300,7 +302,7 @@ when not defined(windows):
if not result:
break
var res: seq[ReadyKey[int]]
var res: seq[ReadyKey]
var selector = newSelector[int]()
var events = {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend,
Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename,
@@ -315,7 +317,7 @@ when not defined(windows):
raiseOsError(osLastError())
selector.registerVnode(dirfd, events, 1)
selector.flush()
discard selector.select(0)
# chmod testDirectory to 0777
chmodPath(testDirectory, 0x1FF)
@@ -337,7 +339,6 @@ when not defined(windows):
# open test directory for watching
var testfd = openWatch(testDirectory & "/test")
selector.registerVnode(testfd, events, 2)
selector.flush()
doAssert(len(selector.select(0)) == 0)
# rename test directory
@@ -381,7 +382,7 @@ when not defined(windows):
testfd = openWatch(testDirectory & "/testfile")
selector.registerVnode(testfd, events, 1)
selector.flush()
discard selector.select(0)
# write data to test file
writeFile(testDirectory & "/testfile", "TESTDATA")
@@ -433,7 +434,6 @@ when not defined(windows):
proc event_wait_thread(event: SelectEvent) {.thread.} =
var selector = newSelector[int]()
selector.registerEvent(event, 1)
selector.flush()
var rc = selector.select(1000)
if len(rc) == 1:
inc(counter)
@@ -573,15 +573,15 @@ else:
var selector = newSelector[int]()
var event = newSelectEvent()
selector.registerEvent(event, 1)
selector.flush()
discard selector.select(0)
event.setEvent()
var rc1 = selector.select(0)
event.setEvent()
var rc2 = selector.select(0)
var rc3 = selector.select(0)
assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
var ev1 = rc1[0].data
var ev2 = rc2[0].data
var ev1 = selector.getData(rc1[0].fd)
var ev2 = selector.getData(rc2[0].fd)
assert(ev1 == 1 and ev2 == 1)
selector.unregister(event)
event.close()
@@ -595,7 +595,6 @@ else:
proc event_wait_thread(event: SelectEvent) {.thread.} =
var selector = newSelector[int]()
selector.registerEvent(event, 1)
selector.flush()
var rc = selector.select(500)
if len(rc) == 1:
inc(counter)

View File

@@ -1,5 +1,4 @@
discard """
cmd: "nim c -r -f $file"
output: '''
OK
OK