Merge pull request #8428 from yglukhov/async-stdin

Allow async stdin
This commit is contained in:
Dominik Picheta
2018-07-26 16:38:31 +01:00
committed by GitHub
5 changed files with 51 additions and 48 deletions

View File

@@ -92,6 +92,9 @@ proc newSelector*[T](): Selector[T] =
result.maxFD = maxFD
result.fds = newSeq[SelectorKey[T]](maxFD)
for i in 0 ..< maxFD:
result.fds[i].ident = InvalidIdent
proc close*[T](s: Selector[T]) =
let res = posix.close(s.epollFD)
when hasThreadSupport:
@@ -100,12 +103,6 @@ proc close*[T](s: Selector[T]) =
if res != 0:
raiseIOSelectorsError(osLastError())
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = 0
key.events = {}
key.data = empty
proc newSelectEvent*(): SelectEvent =
let fdci = eventfd(0, 0)
if fdci == -1:
@@ -135,7 +132,7 @@ proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0, "Descriptor $# already registered" % $fdi)
doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor $# already registered" % $fdi)
s.setKey(fdi, events, 0, data)
if events != {}:
var epv = EpollEvent(events: EPOLLRDHUP)
@@ -152,7 +149,7 @@ proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
doAssert(pkey.ident != InvalidIdent,
"Descriptor $# is not registered in the selector!" % $fdi)
doAssert(pkey.events * maskEvents == {})
if pkey.events != events:
@@ -180,7 +177,7 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
doAssert(pkey.ident != InvalidIdent,
"Descriptor $# is not registered in the selector!" % $fdi)
if pkey.events != {}:
when not defined(android):
@@ -243,7 +240,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.efd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
var epv = EpollEvent()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
@@ -262,7 +259,7 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
var events = {Event.Timer}
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
@@ -307,7 +304,7 @@ when not defined(android):
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
@@ -334,7 +331,7 @@ when not defined(android):
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
@@ -347,7 +344,7 @@ when not defined(android):
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = int(ev.efd)
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
s.setKey(fdi, {Event.User}, 0, data)
var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = ev.efd.uint
@@ -381,7 +378,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
let fdi = int(resTable[i].data.u64)
let pevents = resTable[i].events
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
doAssert(pkey.ident != InvalidIdent)
var rkey = ReadyKey(fd: fdi, events: {})
if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
@@ -482,7 +479,7 @@ template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd.int].ident != 0
return s.fds[fd.int].ident != InvalidIdent
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)

View File

@@ -114,6 +114,9 @@ proc newSelector*[T](): Selector[T] =
result.fds = newSeq[SelectorKey[T]](maxFD)
result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)
for i in 0 ..< MAX_KQUEUE_EVENTS:
result.fds[i].ident = InvalidIdent
result.sock = usock
result.kqFD = kqFD
result.maxFD = maxFD.int
@@ -128,12 +131,6 @@ proc close*[T](s: Selector[T]) =
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = 0
key.events = {}
key.data = empty
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) != 0:
@@ -221,7 +218,7 @@ proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
s.setKey(fdi, events, 0, data)
if events != {}:
@@ -242,7 +239,7 @@ proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
doAssert(pkey.ident != InvalidIdent,
"Descriptor $# is not registered in the queue!" % $fdi)
doAssert(pkey.events * maskEvents == {})
@@ -269,7 +266,7 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
data: T): int {.discardable.} =
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer}
let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD
@@ -291,7 +288,7 @@ proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
s.setKey(fdi, {Event.Signal}, signal, data)
var nmask, omask: Sigset
@@ -315,7 +312,7 @@ proc registerProcess*[T](s: Selector[T], pid: int,
data: T): int {.discardable.} =
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
var kflags: cushort = EV_ONESHOT or EV_ADD
setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data)
@@ -331,7 +328,7 @@ proc registerProcess*[T](s: Selector[T], pid: int,
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = ev.rfd.int
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
setKey(s, fdi, {Event.User}, 0, data)
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
@@ -374,7 +371,7 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
doAssert(pkey.ident != InvalidIdent,
"Descriptor [" & $fdi & "] is not registered in the queue!")
if pkey.events != {}:
@@ -434,7 +431,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
@@ -593,7 +590,7 @@ template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd.int].ident != 0
return s.fds[fd.int].ident != InvalidIdent
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)

View File

@@ -70,6 +70,9 @@ proc newSelector*[T](): Selector[T] =
result.fds = newSeq[SelectorKey[T]](maxFD)
result.pollfds = newSeq[TPollFd](maxFD)
for i in 0 ..< maxFD:
result.fds[i].ident = InvalidIdent
proc close*[T](s: Selector[T]) =
when hasThreadSupport:
deinitLock(s.lock)
@@ -77,12 +80,6 @@ proc close*[T](s: Selector[T]) =
deallocSharedArray(s.pollfds)
deallocShared(cast[pointer](s))
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = 0
key.events = {}
key.data = empty
template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) =
withPollLock(s):
var pollev: cshort = 0
@@ -135,7 +132,7 @@ proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
var fdi = int(fd)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
doAssert(s.fds[fdi].ident == InvalidIdent)
setKey(s, fdi, events, 0, data)
if events != {}: s.pollAdd(fdi.cint, events)
@@ -146,7 +143,7 @@ proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
doAssert(pkey.ident != InvalidIdent,
"Descriptor [" & $fdi & "] is not registered in the queue!")
doAssert(pkey.events * maskEvents == {})
@@ -162,7 +159,7 @@ proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
var fdi = int(ev.rfd)
doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
var events = {Event.User}
setKey(s, fdi, events, 0, data)
events.incl(Event.Read)
@@ -172,9 +169,9 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
let fdi = int(fd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0,
doAssert(pkey.ident != InvalidIdent,
"Descriptor [" & $fdi & "] is not registered in the queue!")
pkey.ident = 0
pkey.ident = InvalidIdent
pkey.events = {}
s.pollRemove(fdi.cint)
@@ -182,9 +179,9 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0, "Event is not registered in the queue!")
doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
doAssert(Event.User in pkey.events)
pkey.ident = 0
pkey.ident = InvalidIdent
pkey.events = {}
s.pollRemove(fdi.cint)
@@ -270,7 +267,7 @@ template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd.int].ident != 0
return s.fds[fd.int].ident != InvalidIdent
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)

View File

@@ -99,6 +99,9 @@ proc newSelector*[T](): Selector[T] =
result = Selector[T]()
result.fds = newSeq[SelectorKey[T]](FD_SETSIZE)
for i in 0 ..< FD_SETSIZE:
result.fds[i].ident = InvalidIdent
IOFD_ZERO(addr result.rSet)
IOFD_ZERO(addr result.wSet)
IOFD_ZERO(addr result.eSet)
@@ -195,7 +198,7 @@ proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
var i = 0
let fdi = int(fd)
while i < FD_SETSIZE:
if s.fds[i].ident == 0:
if s.fds[i].ident == InvalidIdent:
var pkey = addr(s.fds[i])
pkey.ident = fdi
pkey.events = events
@@ -221,7 +224,7 @@ proc delKey[T](s: Selector[T], fd: SocketHandle) =
var i = 0
while i < FD_SETSIZE:
if s.fds[i].ident == fd.int:
s.fds[i].ident = 0
s.fds[i].ident = InvalidIdent
s.fds[i].events = {}
s.fds[i].data = empty
break
@@ -335,7 +338,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
var k = 0
while (i < FD_SETSIZE) and (k < count):
if s.fds[i].ident != 0:
if s.fds[i].ident != InvalidIdent:
var flag = false
var pkey = addr(s.fds[i])
var rkey = ReadyKey(fd: int(pkey.ident), events: {})

View File

@@ -261,6 +261,9 @@ else:
param: int
data: T
const
InvalidIdent = -1
proc raiseIOSelectorsError[T](message: T) =
var msg = ""
when T is string:
@@ -302,6 +305,12 @@ else:
if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1:
raiseIOSelectorsError(osLastError())
template clearKey[T](key: ptr SelectorKey[T]) =
var empty: T
key.ident = InvalidIdent
key.events = {}
key.data = empty
when defined(linux):
include ioselects/ioselectors_epoll
elif bsdPlatform: