Merge pull request #5096 from cheatfate/sup5094

Support android compilation of ioselectors.
This commit is contained in:
Andreas Rumpf
2016-12-17 10:12:47 +01:00
committed by GitHub
5 changed files with 256 additions and 199 deletions

View File

@@ -18,10 +18,12 @@
## Supported features: files, sockets, pipes, timers, processes, signals
## and user events.
##
## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux.
## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux (except
## for Android).
##
## Partially supported OS: Windows (only sockets and user events),
## Solaris (files, sockets, handles and user events).
## Android (files, sockets, handles and user events).
##
## TODO: ``/dev/poll``, ``event ports`` and filesystem events.
@@ -29,9 +31,11 @@ import os
const hasThreadSupport = compileOption("threads") and defined(threadsafe)
const supportedPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(linux)
const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
(defined(linux) and not defined(android))
## This constant is used to determine whether the destination platform is
## fully supported by ``ioselectors`` module.
const bsdPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd)
@@ -244,7 +248,7 @@ else:
skey.key.fd = pkeyfd
skey.key.data = pdata
when supportedPlatform:
when ioselSupportedPlatform:
template blockSignals(newmask: var Sigset, oldmask: var Sigset) =
when hasThreadSupport:
if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1:

View File

@@ -14,27 +14,29 @@ import posix, times
# Maximum number of events that can be returned
const MAX_EPOLL_RESULT_EVENTS = 64
type
SignalFdInfo* {.importc: "struct signalfd_siginfo",
header: "<sys/signalfd.h>", pure, final.} = object
ssi_signo*: uint32
ssi_errno*: int32
ssi_code*: int32
ssi_pid*: uint32
ssi_uid*: uint32
ssi_fd*: int32
ssi_tid*: uint32
ssi_band*: uint32
ssi_overrun*: uint32
ssi_trapno*: uint32
ssi_status*: int32
ssi_int*: int32
ssi_ptr*: uint64
ssi_utime*: uint64
ssi_stime*: uint64
ssi_addr*: uint64
pad* {.importc: "__pad".}: array[0..47, uint8]
when not defined(android):
type
SignalFdInfo* {.importc: "struct signalfd_siginfo",
header: "<sys/signalfd.h>", pure, final.} = object
ssi_signo*: uint32
ssi_errno*: int32
ssi_code*: int32
ssi_pid*: uint32
ssi_uid*: uint32
ssi_fd*: int32
ssi_tid*: uint32
ssi_band*: uint32
ssi_overrun*: uint32
ssi_trapno*: uint32
ssi_status*: int32
ssi_int*: int32
ssi_ptr*: uint64
ssi_utime*: uint64
ssi_stime*: uint64
ssi_addr*: uint64
pad* {.importc: "__pad".}: array[0..47, uint8]
type
eventFdData {.importc: "eventfd_t",
header: "<sys/eventfd.h>", pure, final.} = uint64
epoll_data {.importc: "union epoll_data", header: "<sys/epoll.h>",
@@ -68,12 +70,22 @@ proc timerfd_create(clock_id: ClockId, flags: cint): cint
proc timerfd_settime(ufd: cint, flags: cint,
utmr: var Itimerspec, otmr: var Itimerspec): cint
{.cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint
{.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
proc eventfd(count: cuint, flags: cint): cint
{.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
proc ulimit(cmd: cint): clong
{.importc: "ulimit", header: "<ulimit.h>", varargs.}
when not defined(android):
proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint
{.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
var RLIMIT_NOFILE {.importc: "RLIMIT_NOFILE",
header: "<sys/resource.h>".}: cint
type
rlimit {.importc: "struct rlimit",
header: "<sys/resource.h>", pure, final.} = object
rlim_cur: int
rlim_max: int
proc getrlimit(resource: cint, rlp: var rlimit): cint
{.importc: "getrlimit",header: "<sys/resource.h>".}
when hasThreadSupport:
type
@@ -97,7 +109,10 @@ type
SelectEvent* = ptr SelectEventImpl
proc newSelector*[T](): Selector[T] =
var maxFD = int(ulimit(4, 0))
var a = rlimit()
if getrlimit(RLIMIT_NOFILE, a) != 0:
raiseOsError(osLastError())
var maxFD = int(a.rlim_max)
doAssert(maxFD > 0)
var epollFD = epoll_create(MAX_EPOLL_RESULT_EVENTS)
@@ -194,39 +209,53 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
doAssert(pkey.ident != 0)
if pkey.events != {}:
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
dec(s.count)
elif Event.Signal in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(s.fds[fdi].param))
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
elif Event.Process in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, SIGCHLD)
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
when not defined(android):
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
dec(s.count)
elif Event.Signal in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(s.fds[fdi].param))
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
elif Event.Process in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
var nmask, omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, SIGCHLD)
unblockSignals(nmask, omask)
discard posix.close(fdi.cint)
dec(s.count)
else:
if pkey.events * {Event.Read, Event.Write} != {}:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
dec(s.count)
elif Event.Timer in pkey.events:
var epv = epoll_event()
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
discard posix.close(fdi.cint)
dec(s.count)
pkey.ident = 0
pkey.events = {}
@@ -280,60 +309,61 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
inc(s.count)
result = fdi
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
var
nmask: Sigset
omask: Sigset
when not defined(android):
proc registerSignal*[T](s: Selector[T], signal: int,
data: T): int {.discardable.} =
var
nmask: Sigset
omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(signal))
blockSignals(nmask, omask)
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, cint(signal))
blockSignals(nmask, omask)
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, signal, {Event.Signal}, signal, data)
inc(s.count)
result = fdi
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, signal, {Event.Signal}, signal, data)
inc(s.count)
result = fdi
proc registerProcess*[T](s: Selector, pid: int,
data: T): int {.discardable.} =
var
nmask: Sigset
omask: Sigset
proc registerProcess*[T](s: Selector, pid: int,
data: T): int {.discardable.} =
var
nmask: Sigset
omask: Sigset
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, posix.SIGCHLD)
blockSignals(nmask, omask)
discard sigemptyset(nmask)
discard sigemptyset(omask)
discard sigaddset(nmask, posix.SIGCHLD)
blockSignals(nmask, omask)
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
let fdi = signalfd(-1, nmask, 0).int
if fdi == -1:
raiseOSError(osLastError())
setNonBlocking(fdi.cint)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == 0)
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
epv.events = EPOLLIN or EPOLLRDHUP
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, pid, {Event.Process, Event.Oneshot}, pid, data)
inc(s.count)
result = fdi
var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
epv.data.u64 = fdi.uint
epv.events = EPOLLIN or EPOLLRDHUP
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1:
raiseOSError(osLastError())
s.setKey(fdi, pid, {Event.Process, Event.Oneshot}, pid, data)
inc(s.count)
result = fdi
proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = int(ev.efd)
@@ -382,40 +412,60 @@ proc selectInto*[T](s: Selector[T], timeout: int,
events.incl(Event.Error)
if (pevents and EPOLLOUT) != 0:
events.incl(Event.Write)
if (pevents and EPOLLIN) != 0:
if Event.Read in skey.events:
events.incl(Event.Read)
elif Event.Timer in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
events = {Event.Timer}
elif Event.Signal in skey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
events = {Event.Signal}
elif Event.Process in skey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
if cast[int](data.ssi_pid) == skey.param:
events = {Event.Process}
else:
inc(i)
continue
elif Event.User in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
when not defined(android):
if (pevents and EPOLLIN) != 0:
if Event.Read in skey.events:
events.incl(Event.Read)
elif Event.Timer in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
events = {Event.Timer}
elif Event.Signal in skey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
events = {Event.Signal}
elif Event.Process in skey.events:
var data = SignalFdInfo()
if posix.read(fdi.cint, addr data,
sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
raiseOsError(osLastError())
if cast[int](data.ssi_pid) == skey.param:
events = {Event.Process}
else:
inc(i)
continue
else:
raiseOSError(err)
events = {Event.User}
elif Event.User in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
inc(i)
continue
else:
raiseOSError(err)
events = {Event.User}
else:
if (pevents and EPOLLIN) != 0:
if Event.Read in skey.events:
events.incl(Event.Read)
elif Event.Timer in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
raiseOSError(osLastError())
events = {Event.Timer}
elif Event.User in skey.events:
var data: uint64 = 0
if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64):
let err = osLastError()
if err == OSErrorCode(EAGAIN):
inc(i)
continue
else:
raiseOSError(err)
events = {Event.User}
skey.key.events = events
results[k] = skey.key

View File

@@ -1092,11 +1092,6 @@ else:
import ioselectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
MSG_NOSIGNAL
const supportedPlatform = defined(linux) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(macosx)
type
AsyncFD* = distinct cint
Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
@@ -1191,7 +1186,7 @@ else:
var keys: array[64, ReadyKey[AsyncData]]
let p = getGlobalDispatcher()
when supportedPlatform:
when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode, Event.User}
@@ -1225,7 +1220,7 @@ else:
else:
break
when supportedPlatform:
when ioselSupportedPlatform:
if (customSet * events) != {}:
for node in keys[i].data.readCBs[].nodes():
let cb = node.value
@@ -1234,6 +1229,15 @@ else:
if cb(fd.AsyncFD):
keys[i].data.readCBs[].remove(node)
p.selector.unregister(fd)
else:
if Event.User in events or events == {Event.Error}:
for node in keys[i].data.readCBs[].nodes():
let cb = node.value
custom = true
if cb != nil:
if cb(fd.AsyncFD):
keys[i].data.readCBs[].remove(node)
p.selector.unregister(fd)
# because state `data` can be modified in callback we need to update
# descriptor events with currently registered callbacks.
@@ -1496,7 +1500,7 @@ else:
addRead(socket, cb)
return retFuture
when supportedPlatform:
when ioselSupportedPlatform:
proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
## Start watching for timeout expiration, and then call the

View File

@@ -12,11 +12,10 @@ template processTest(t, x: untyped) =
if not x: echo(t & " FAILED\r\n")
when not defined(windows):
import os, posix, osproc, nativesockets, times
import os, posix, nativesockets, times
const supportedPlatform = defined(macosx) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(linux)
when ioselSupportedPlatform:
import osproc
proc socket_notification_test(): bool =
proc create_test_socket(): SocketHandle =
@@ -143,7 +142,7 @@ when not defined(windows):
selector.close()
result = true
when supportedPlatform:
when ioselSupportedPlatform:
proc timer_notification_test(): bool =
var selector = newSelector[int]()
var timer = selector.registerTimer(100, false, 0)
@@ -462,7 +461,7 @@ when not defined(windows):
when hasThreadSupport:
processTest("Multithreaded user event notification test...",
mt_event_test())
when supportedPlatform:
when ioselSupportedPlatform:
processTest("Timer notification test...", timer_notification_test())
processTest("Process notification test...", process_notification_test())
processTest("Signal notification test...", signal_notification_test())

View File

@@ -8,11 +8,12 @@ OK
"""
when defined(upcoming):
import asyncdispatch, times, osproc, streams
import asyncdispatch, times, streams, posix
from ioselectors import ioselSupportedPlatform
const supportedPlatform = defined(linux) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(macosx)
proc delayedSet(ev: AsyncEvent, timeout: int): Future[void] {.async.} =
await sleepAsync(timeout)
ev.setEvent()
proc waitEvent(ev: AsyncEvent, closeEvent = false): Future[void] =
var retFuture = newFuture[void]("waitEvent")
@@ -25,56 +26,55 @@ when defined(upcoming):
addEvent(ev, cb)
return retFuture
proc waitTimer(timeout: int): Future[void] =
var retFuture = newFuture[void]("waitTimer")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addTimer(timeout, true, cb)
return retFuture
proc waitProcess(p: Process): Future[void] =
var retFuture = newFuture[void]("waitProcess")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addProcess(p.processID(), cb)
return retFuture
proc delayedSet(ev: AsyncEvent, timeout: int): Future[void] {.async.} =
await waitTimer(timeout)
ev.setEvent()
proc timerTest() =
waitFor(waitTimer(200))
echo "OK"
proc eventTest() =
var event = newAsyncEvent()
var fut = waitEvent(event)
asyncCheck(delayedSet(event, 500))
waitFor(fut or waitTimer(1000))
waitFor(fut or sleepAsync(1000))
if fut.finished:
echo "OK"
else:
echo "eventTest: Timeout expired before event received!"
proc processTest() =
when defined(windows):
var process = startProcess("ping.exe", "",
["127.0.0.1", "-n", "2", "-w", "100"], nil,
{poStdErrToStdOut, poUsePath, poInteractive,
poDemon})
else:
var process = startProcess("/bin/sleep", "", ["1"], nil,
{poStdErrToStdOut, poUsePath})
var fut = waitProcess(process)
waitFor(fut or waitTimer(2000))
if fut.finished and process.peekExitCode() == 0:
echo "OK"
else:
echo "processTest: Timeout expired before process exited!"
when ioselSupportedPlatform or defined(windows):
when supportedPlatform:
import posix
import osproc
proc waitTimer(timeout: int): Future[void] =
var retFuture = newFuture[void]("waitTimer")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addTimer(timeout, true, cb)
return retFuture
proc waitProcess(p: Process): Future[void] =
var retFuture = newFuture[void]("waitProcess")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addProcess(p.processID(), cb)
return retFuture
proc timerTest() =
waitFor(waitTimer(200))
echo "OK"
proc processTest() =
when defined(windows):
var process = startProcess("ping.exe", "",
["127.0.0.1", "-n", "2", "-w", "100"], nil,
{poStdErrToStdOut, poUsePath, poInteractive,
poDemon})
else:
var process = startProcess("/bin/sleep", "", ["1"], nil,
{poStdErrToStdOut, poUsePath})
var fut = waitProcess(process)
waitFor(fut or waitTimer(2000))
if fut.finished and process.peekExitCode() == 0:
echo "OK"
else:
echo "processTest: Timeout expired before process exited!"
when ioselSupportedPlatform:
proc waitSignal(signal: int): Future[void] =
var retFuture = newFuture[void]("waitSignal")
@@ -97,7 +97,7 @@ when defined(upcoming):
else:
echo "signalTest: Timeout expired before signal received!"
when supportedPlatform:
when ioselSupportedPlatform:
timerTest()
eventTest()
processTest()