mirror of
https://github.com/nim-lang/Nim.git
synced 2026-02-20 09:58:35 +00:00
Multiple improvements to selectors.
* Added ``getFd`` procedure for retrieving the underlying selector's FD. * Selectors module's procedures now accept an ``int`` as well as a ``SocketHandle``. * ReadyKey now contains the error code for Event.Error events.
This commit is contained in:
@@ -16,6 +16,8 @@
|
||||
|
||||
### Library changes
|
||||
|
||||
- The `ReadyKey` type in the selectors module now contains an ``errorCode``
|
||||
field to help distinguish between ``Event.Error`` events.
|
||||
- The `AsyncFD` type now reflects the fact that the underlying FD is registered
|
||||
in the async dispatcher.
|
||||
- The overloading rules changed slightly so that constrained generics are
|
||||
|
||||
@@ -141,7 +141,7 @@ template checkFd(s, f) =
|
||||
if f >= s.maxFD:
|
||||
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
|
||||
|
||||
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event], data: T) =
|
||||
let fdi = int(fd)
|
||||
s.checkFd(fdi)
|
||||
@@ -156,7 +156,7 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
raiseIOSelectorsError(osLastError())
|
||||
inc(s.count)
|
||||
|
||||
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
|
||||
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) =
|
||||
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
|
||||
Event.User, Event.Oneshot, Event.Error}
|
||||
let fdi = int(fd)
|
||||
@@ -391,9 +391,19 @@ proc selectInto*[T](s: Selector[T], timeout: int,
|
||||
let pevents = resTable[i].events
|
||||
var pkey = addr(s.fds[fdi])
|
||||
doAssert(pkey.ident != 0)
|
||||
var rkey = ReadyKey(fd: int(fdi), events: {})
|
||||
var rkey = ReadyKey(fd: fdi, events: {})
|
||||
|
||||
if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
|
||||
if (pevents and EPOLLHUP) != 0:
|
||||
rkey.errorCode = ECONNRESET.OSErrorCode
|
||||
else:
|
||||
# Try reading SO_ERROR from fd.
|
||||
var error: cint
|
||||
var size = sizeof(error).SockLen
|
||||
if getsockopt(fdi.SocketHandle, SOL_SOCKET, SO_ERROR, addr(error),
|
||||
addr(size)) == 0'i32:
|
||||
rkey.errorCode = error.OSErrorCode
|
||||
|
||||
rkey.events.incl(Event.Error)
|
||||
if (pevents and EPOLLOUT) != 0:
|
||||
rkey.events.incl(Event.Write)
|
||||
@@ -481,7 +491,7 @@ template isEmpty*[T](s: Selector[T]): bool =
|
||||
(s.count == 0)
|
||||
|
||||
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
|
||||
return s.fds[fd].ident != 0
|
||||
return s.fds[fd.int].ident != 0
|
||||
|
||||
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
|
||||
let fdi = int(fd)
|
||||
@@ -515,3 +525,6 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
|
||||
body1
|
||||
else:
|
||||
body2
|
||||
|
||||
proc getFd*[T](s: Selector[T]): int =
|
||||
return s.epollFd.int
|
||||
@@ -217,7 +217,7 @@ else:
|
||||
raiseIOSelectorsError(osLastError())
|
||||
s.changes.setLen(0)
|
||||
|
||||
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event], data: T) =
|
||||
let fdi = int(fd)
|
||||
s.checkFd(fdi)
|
||||
@@ -235,7 +235,7 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
when not declared(CACHE_EVENTS):
|
||||
flushKQueue(s)
|
||||
|
||||
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event]) =
|
||||
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
|
||||
Event.User, Event.Oneshot, Event.Error}
|
||||
@@ -503,6 +503,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
|
||||
|
||||
if (kevent.flags and EV_ERROR) != 0:
|
||||
rkey.events = {Event.Error}
|
||||
rkey.errorCode = kevent.data.OSErrorCode
|
||||
|
||||
case kevent.filter:
|
||||
of EVFILT_READ:
|
||||
@@ -569,6 +570,13 @@ proc selectInto*[T](s: Selector[T], timeout: int,
|
||||
doAssert(true, "Unsupported kqueue filter in the queue!")
|
||||
|
||||
if (kevent.flags and EV_EOF) != 0:
|
||||
if kevent.fflags != 0:
|
||||
rkey.errorCode = kevent.fflags.OSErrorCode
|
||||
else:
|
||||
# This assumes we are dealing with sockets.
|
||||
# TODO: For future-proofing it might be a good idea to give the
|
||||
# user access to the raw `kevent`.
|
||||
rkey.errorCode = ECONNRESET.OSErrorCode
|
||||
rkey.events.incl(Event.Error)
|
||||
|
||||
results[k] = rkey
|
||||
@@ -585,7 +593,7 @@ template isEmpty*[T](s: Selector[T]): bool =
|
||||
(s.count == 0)
|
||||
|
||||
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
|
||||
return s.fds[fd].ident != 0
|
||||
return s.fds[fd.int].ident != 0
|
||||
|
||||
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
|
||||
let fdi = int(fd)
|
||||
@@ -619,3 +627,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
|
||||
body1
|
||||
else:
|
||||
body2
|
||||
|
||||
|
||||
proc getFd*[T](s: Selector[T]): int =
|
||||
return s.kqFD.int
|
||||
@@ -141,7 +141,7 @@ template checkFd(s, f) =
|
||||
if f >= s.maxFD:
|
||||
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
|
||||
|
||||
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event], data: T) =
|
||||
var fdi = int(fd)
|
||||
s.checkFd(fdi)
|
||||
@@ -149,7 +149,7 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
setKey(s, fdi, events, 0, data)
|
||||
if events != {}: s.pollAdd(fdi.cint, events)
|
||||
|
||||
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event]) =
|
||||
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
|
||||
Event.User, Event.Oneshot, Event.Error}
|
||||
@@ -280,7 +280,7 @@ template isEmpty*[T](s: Selector[T]): bool =
|
||||
(s.count == 0)
|
||||
|
||||
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
|
||||
return s.fds[fd].ident != 0
|
||||
return s.fds[fd.int].ident != 0
|
||||
|
||||
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
|
||||
let fdi = int(fd)
|
||||
@@ -314,3 +314,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
|
||||
body1
|
||||
else:
|
||||
body2
|
||||
|
||||
|
||||
proc getFd*[T](s: Selector[T]): int =
|
||||
return -1
|
||||
@@ -229,7 +229,7 @@ proc delKey[T](s: Selector[T], fd: SocketHandle) =
|
||||
doAssert(i < FD_SETSIZE,
|
||||
"Descriptor [" & $int(fd) & "] is not registered in the queue!")
|
||||
|
||||
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event], data: T) =
|
||||
when not defined(windows):
|
||||
let fdi = int(fd)
|
||||
@@ -255,7 +255,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
|
||||
IOFD_SET(ev.rsock, addr s.rSet)
|
||||
inc(s.count)
|
||||
|
||||
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event]) =
|
||||
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
|
||||
Event.User, Event.Oneshot, Event.Error}
|
||||
@@ -279,7 +279,7 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
|
||||
inc(s.count)
|
||||
pkey.events = events
|
||||
|
||||
proc unregister*[T](s: Selector[T], fd: SocketHandle) =
|
||||
proc unregister*[T](s: Selector[T], fd: int | SocketHandle) =
|
||||
s.withSelectLock():
|
||||
var pkey = s.getKey(fd)
|
||||
if Event.Read in pkey.events:
|
||||
@@ -451,3 +451,6 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
|
||||
else:
|
||||
body2
|
||||
|
||||
|
||||
proc getFd*[T](s: Selector[T]): int =
|
||||
return -1
|
||||
@@ -54,9 +54,9 @@ when defined(nimdoc):
|
||||
Timer, ## Timer descriptor is completed
|
||||
Signal, ## Signal is raised
|
||||
Process, ## Process is finished
|
||||
Vnode, ## BSD specific file change happens
|
||||
Vnode, ## BSD specific file change
|
||||
User, ## User event is raised
|
||||
Error, ## Error happens while waiting, for descriptor
|
||||
Error, ## Error occurred while waiting for descriptor
|
||||
VnodeWrite, ## NOTE_WRITE (BSD specific, write to file occurred)
|
||||
VnodeDelete, ## NOTE_DELETE (BSD specific, unlink of file occurred)
|
||||
VnodeExtend, ## NOTE_EXTEND (BSD specific, file extended)
|
||||
@@ -69,6 +69,8 @@ when defined(nimdoc):
|
||||
## An object which holds result for descriptor
|
||||
fd* : int ## file/socket descriptor
|
||||
events*: set[Event] ## set of events
|
||||
errorCode*: OSErrorCode ## additional error code information for
|
||||
## Error events
|
||||
|
||||
SelectEvent* = object
|
||||
## An object which holds user defined event
|
||||
@@ -79,13 +81,14 @@ when defined(nimdoc):
|
||||
proc close*[T](s: Selector[T]) =
|
||||
## Closes the selector.
|
||||
|
||||
proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event],
|
||||
data: T) =
|
||||
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event], data: T) =
|
||||
## Registers file/socket descriptor ``fd`` to selector ``s``
|
||||
## with events set in ``events``. The ``data`` is application-defined
|
||||
## data, which will be passed when an event is triggered.
|
||||
|
||||
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
|
||||
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
|
||||
events: set[Event]) =
|
||||
## Update file/socket descriptor ``fd``, registered in selector
|
||||
## ``s`` with new events set ``event``.
|
||||
|
||||
@@ -221,11 +224,15 @@ when defined(nimdoc):
|
||||
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
|
||||
## Determines whether selector contains a file descriptor.
|
||||
|
||||
proc getFd*[T](s: Selector[T]): int =
|
||||
## Retrieves the underlying selector's file descriptor.
|
||||
##
|
||||
## For *poll* and *select* selectors ``-1`` is returned.
|
||||
|
||||
else:
|
||||
when hasThreadSupport:
|
||||
import locks
|
||||
|
||||
|
||||
type
|
||||
SharedArray[T] = UncheckedArray[T]
|
||||
|
||||
@@ -234,7 +241,6 @@ else:
|
||||
|
||||
proc deallocSharedArray[T](sa: ptr SharedArray[T]) =
|
||||
deallocShared(cast[pointer](sa))
|
||||
|
||||
type
|
||||
Event* {.pure.} = enum
|
||||
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot,
|
||||
@@ -247,6 +253,7 @@ else:
|
||||
ReadyKey* = object
|
||||
fd* : int
|
||||
events*: set[Event]
|
||||
errorCode*: OSErrorCode
|
||||
|
||||
SelectorKey[T] = object
|
||||
ident: int
|
||||
@@ -264,7 +271,7 @@ else:
|
||||
msg.add("Internal Error\n")
|
||||
var err = newException(IOSelectorsException, msg)
|
||||
raise err
|
||||
|
||||
|
||||
proc setNonBlocking(fd: cint) {.inline.} =
|
||||
setBlocking(fd.SocketHandle, false)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user