Fixes for upcoming asyncdispatch and ioselectors. (#5309)

This commit is contained in:
Eugene Kabanov
2017-02-01 13:12:26 +02:00
committed by Andreas Rumpf
parent 3c773c189f
commit d90f3f59ac
6 changed files with 113 additions and 70 deletions

View File

@@ -165,7 +165,7 @@ proc close*(ev: SelectEvent) =
template checkFd(s, f) =
if f >= s.maxFD:
raiseIOSelectorsError("Maximum file descriptors exceeded")
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
@@ -188,7 +188,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
var epv = epoll_event(events: EPOLLRDHUP)
@@ -215,8 +216,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
if pkey.events != {}:
when not defined(android):
if pkey.events * {Event.Read, Event.Write} != {}:
@@ -277,7 +278,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.efd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
@@ -380,7 +381,7 @@ when not defined(android):
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = int(ev.efd)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
s.setKey(fdi, {Event.User}, 0, data)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = ev.efd.uint

View File

@@ -119,12 +119,13 @@ proc newSelector*[T](): Selector[T] =
result.maxFD = maxFD.int
proc close*[T](s: Selector[T]) =
let res = posix.close(s.kqFD)
let res1 = posix.close(s.kqFD)
let res2 = posix.close(s.sock)
when hasThreadSupport:
deinitLock(s.changesLock)
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
if res != 0:
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
template clearKey[T](key: ptr SelectorKey[T]) =
@@ -157,7 +158,7 @@ proc close*(ev: SelectEvent) =
template checkFd(s, f) =
if f >= s.maxFD:
raiseIOSelectorsError("Maximum file descriptors exceeded!")
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
when hasThreadSupport:
template withChangeLock[T](s: Selector[T], body: untyped) =
@@ -241,7 +242,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
@@ -329,7 +331,7 @@ proc registerProcess*[T](s: Selector[T], pid: int,
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = ev.rfd.int
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
setKey(s, fdi, {Event.User}, 0, data)
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
@@ -372,7 +374,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
if pkey.events != {}:
if pkey.events * {Event.Read, Event.Write} != {}:
@@ -431,9 +434,8 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
@@ -564,8 +566,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
pkey.events.incl(Event.Finished)
rkey.events.incl(Event.Process)
else:
pkey = addr(s.fds[cast[int](kevent.udata)])
raiseIOSelectorsError("Unsupported kqueue filter in queue!")
doAssert(true, "Unsupported kqueue filter in the queue!")
if (kevent.flags and EV_EOF) != 0:
rkey.events.incl(Event.Error)

View File

@@ -115,9 +115,8 @@ template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
s.pollfds[i].events = pollev
break
inc(i)
if i == s.pollcnt:
raiseIOSelectorsError("Descriptor is not registered in queue")
doAssert(i < s.pollcnt,
"Descriptor [" & $sock & "] is not registered in the queue!")
template pollRemove[T](s: Selector[T], sock: cint) =
withPollLock(s):
@@ -140,7 +139,7 @@ template pollRemove[T](s: Selector[T], sock: cint) =
template checkFd(s, f) =
if f >= s.maxFD:
raiseIOSelectorsError("Descriptor is not registered in queue")
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
@@ -157,7 +156,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
@@ -172,7 +172,7 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
var fdi = int(ev.rfd)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
var events = {Event.User}
setKey(s, fdi, events, 0, data)
events.incl(Event.Read)
@@ -182,7 +182,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0,
"Descriptor [" & $fdi & "] is not registered in the queue!")
pkey.ident = 0
pkey.events = {}
s.pollRemove(fdi.cint)
@@ -191,7 +192,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.events = {}

View File

@@ -202,8 +202,8 @@ proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
pkey.data = data
break
inc(i)
if i == FD_SETSIZE:
raiseIOSelectorsError("Maximum numbers of fds exceeded")
if i >= FD_SETSIZE:
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
var i = 0
@@ -213,8 +213,8 @@ proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
result = addr(s.fds[i])
break
inc(i)
if i == FD_SETSIZE:
raiseIOSelectorsError("Descriptor not registered in queue")
doAssert(i < FD_SETSIZE,
"Descriptor [" & $int(fd) & "] is not registered in the queue!")
proc delKey[T](s: Selector[T], fd: SocketHandle) =
var empty: T
@@ -226,8 +226,8 @@ proc delKey[T](s: Selector[T], fd: SocketHandle) =
s.fds[i].data = empty
break
inc(i)
if i == FD_SETSIZE:
raiseIOSelectorsError("Descriptor not registered in queue")
doAssert(i < FD_SETSIZE,
"Descriptor [" & $int(fd) & "] is not registered in the queue!")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
events: set[Event], data: T) =
@@ -294,6 +294,7 @@ proc unregister*[T](s: Selector[T], fd: SocketHandle) =
proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fd = ev.rsock
s.withSelectLock():
var pkey = s.getKey(fd)
IOFD_CLR(fd, addr s.rSet)
dec(s.count)
s.delKey(fd)

View File

@@ -1056,16 +1056,14 @@ when defined(windows) or defined(nimdoc):
proc unregister*(ev: AsyncEvent) =
## Unregisters event ``ev``.
if ev.hWaiter != 0:
let p = getGlobalDispatcher()
p.handles.excl(AsyncFD(ev.hEvent))
if unregisterWait(ev.hWaiter) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(err)
ev.hWaiter = 0
else:
raise newException(ValueError, "Event is not registered!")
doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
let p = getGlobalDispatcher()
p.handles.excl(AsyncFD(ev.hEvent))
if unregisterWait(ev.hWaiter) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(err)
ev.hWaiter = 0
proc close*(ev: AsyncEvent) =
## Closes event ``ev``.
@@ -1076,8 +1074,7 @@ when defined(windows) or defined(nimdoc):
proc addEvent*(ev: AsyncEvent, cb: Callback) =
## Registers callback ``cb`` to be called when ``ev`` will be signaled
if ev.hWaiter != 0:
raise newException(ValueError, "Event is already registered!")
doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
let p = getGlobalDispatcher()
let hEvent = ev.hEvent
@@ -1086,17 +1083,22 @@ when defined(windows) or defined(nimdoc):
var flags = WT_EXECUTEINWAITTHREAD.Dword
proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if cb(fd):
# we need this check to avoid exception, if `unregister(event)` was
# called in callback.
deallocShared(cast[pointer](pcd))
if ev.hWaiter != 0: unregister(ev)
if ev.hWaiter != 0:
if cb(fd):
# we need this check to avoid exception, if `unregister(event)` was
# called in callback.
deallocShared(cast[pointer](pcd))
if ev.hWaiter != 0:
unregister(ev)
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
# if ev.hWaiter == 0, then event was unregistered before `poll()` call.
deallocShared(cast[pointer](pcd))
registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
ev.hWaiter = pcd.waitFd
@@ -1205,7 +1207,7 @@ else:
not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
template processBasicCallbacks(ident, rwlist: untyped) =
# Process pending descriptor's callbacks.
# Process pending descriptor's and AsyncEvent callbacks.
# Invoke every callback stored in `rwlist`, until first one
# returned `false`, which means callback wants to stay
# alive. In such case all remaining callbacks will be added
@@ -1232,6 +1234,8 @@ else:
withData(p.selector, ident, adata) do:
adata.rwlist = newList & adata.rwlist
rLength = len(adata.readList)
wLength = len(adata.writeList)
template processCustomCallbacks(ident: untyped) =
# Process pending custom event callbacks. Custom events are
@@ -1275,6 +1279,8 @@ else:
var custom = false
let fd = keys[i].fd
let events = keys[i].events
var rLength = 0 # len(data.readList) after callback
var wLength = 0 # len(data.writeList) after callback
if Event.Read in events or events == {Event.Error}:
processBasicCallbacks(fd, readList)
@@ -1283,8 +1289,10 @@ else:
processBasicCallbacks(fd, writeList)
if Event.User in events or events == {Event.Error}:
custom = true
processBasicCallbacks(fd, readList)
custom = true
if rLength == 0:
p.selector.unregister(fd)
when ioselSupportedPlatform:
if (customSet * events) != {}:
@@ -1296,10 +1304,12 @@ else:
if not custom:
var update = false
var newEvents: set[Event] = {}
p.selector.withData(fd, adata) do:
if len(adata.readList) > 0: incl(newEvents, Event.Read)
if len(adata.writeList) > 0: incl(newEvents, Event.Write)
if rLength > 0:
update = true
incl(newEvents, Event.Read)
if wLength > 0:
update = true
incl(newEvents, Event.Write)
if update:
p.selector.updateHandle(SocketHandle(fd), newEvents)
inc(i)

View File

@@ -1,9 +1,6 @@
discard """
output: '''
OK
OK
OK
OK
'''
"""
@@ -31,11 +28,39 @@ when defined(upcoming):
var fut = waitEvent(event)
asyncCheck(delayedSet(event, 500))
waitFor(fut or sleepAsync(1000))
if fut.finished:
echo "OK"
else:
if not fut.finished:
echo "eventTest: Timeout expired before event received!"
proc eventTest5304() =
# Event should not be signaled if it was uregistered,
# even in case, when poll() was not called yet.
# Issue #5304.
var unregistered = false
let e = newAsyncEvent()
addEvent(e) do (fd: AsyncFD) -> bool:
assert(not unregistered)
e.setEvent()
e.unregister()
unregistered = true
poll()
proc eventTest5298() =
# Event must raise `AssertionError` if event was unregistered twice.
# Issue #5298.
let e = newAsyncEvent()
var eventReceived = false
addEvent(e) do (fd: AsyncFD) -> bool:
eventReceived = true
return true
e.setEvent()
while not eventReceived:
poll()
try:
e.unregister()
except AssertionError:
discard
e.close()
when ioselSupportedPlatform or defined(windows):
import osproc
@@ -56,7 +81,6 @@ when defined(upcoming):
proc timerTest() =
waitFor(waitTimer(200))
echo "OK"
proc processTest() =
when defined(windows):
@@ -70,7 +94,7 @@ when defined(upcoming):
var fut = waitProcess(process)
waitFor(fut or waitTimer(2000))
if fut.finished and process.peekExitCode() == 0:
echo "OK"
discard
else:
echo "processTest: Timeout expired before process exited!"
@@ -92,23 +116,28 @@ when defined(upcoming):
var fut = waitSignal(posix.SIGINT)
asyncCheck(delayedSignal(posix.SIGINT, 500))
waitFor(fut or waitTimer(1000))
if fut.finished:
echo "OK"
else:
if not fut.finished:
echo "signalTest: Timeout expired before signal received!"
when ioselSupportedPlatform:
timerTest()
eventTest()
eventTest5304()
eventTest5298()
processTest()
signalTest()
echo "OK"
elif defined(windows):
timerTest()
eventTest()
eventTest5304()
eventTest5298()
processTest()
echo "OK"
else:
eventTest()
echo "OK\nOK\nOK"
eventTest5304()
eventTest5298()
echo "OK"
else:
echo "OK\nOK\nOK\nOK"
echo "OK"