Merge pull request #4725 from cheatfate/fixtest_upcoming

upcoming_async: Fix multiple issues and add test.
This commit is contained in:
Andreas Rumpf
2016-09-06 13:48:50 +02:00
committed by GitHub
2 changed files with 159 additions and 25 deletions

View File

@@ -1163,7 +1163,7 @@ when defined(windows) or defined(nimdoc):
## receiving notifies.
registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE)
template registerWaitableHandle(p, hEvent, flags, pcd, handleCallback) =
template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) =
let handleFD = AsyncFD(hEvent)
pcd.ioPort = p.ioPort
pcd.handleFd = handleFD
@@ -1177,10 +1177,10 @@ when defined(windows) or defined(nimdoc):
pcd.ovl = ol
if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
cast[WAITORTIMERCALLBACK](waitableCallback),
cast[pointer](pcd), INFINITE, flags):
cast[pointer](pcd), timeout.Dword, flags):
GC_unref(ol)
deallocShared(cast[pointer](pcd))
discard wsaCloseEvent(hEvent)
discard closeHandle(hEvent)
raiseOSError(osLastError())
p.handles.incl(handleFD)
@@ -1212,7 +1212,7 @@ when defined(windows) or defined(nimdoc):
deallocShared(cast[pointer](pcd))
p.handles.excl(fd)
registerWaitableHandle(p, hEvent, flags, pcd, timercb)
registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
proc addProcess*(pid: int, cb: Callback) =
## Registers callback ``cb`` to be called when process with pid ``pid``
@@ -1236,10 +1236,12 @@ when defined(windows) or defined(nimdoc):
p.handles.excl(fd)
discard cb(fd)
registerWaitableHandle(p, hProcess, flags, pcd, proccb)
registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
proc newAsyncEvent*(): AsyncEvent =
## Creates new ``AsyncEvent`` object.
## New ``AsyncEvent`` object is not automatically registered with
## dispatcher like ``AsyncSocket``.
var sa = SECURITY_ATTRIBUTES(
nLength: sizeof(SECURITY_ATTRIBUTES).cint,
bInheritHandle: 1
@@ -1248,14 +1250,15 @@ when defined(windows) or defined(nimdoc):
if event == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
result.hEvent = event
proc setEvent*(ev: AsyncEvent) =
## Set event ``ev`` to signaled state.
if setEvent(ev.hEvent) == 0:
raiseOSError(osLastError())
proc close*(ev: AsyncEvent) =
## Closes event ``ev``.
proc unregister*(ev: AsyncEvent) =
## Unregisters event ``ev``.
if ev.hWaiter != 0:
let p = getGlobalDispatcher()
if unregisterWait(ev.hWaiter) == 0:
@@ -1263,7 +1266,12 @@ when defined(windows) or defined(nimdoc):
if err.int32 != ERROR_IO_PENDING:
raiseOSError(osLastError())
p.handles.excl(AsyncFD(ev.hEvent))
ev.hWaiter = 0
else:
raise newException(ValueError, "Event is not registered!")
proc close*(ev: AsyncEvent) =
## Closes event ``ev``.
if closeHandle(ev.hEvent) == 0:
raiseOSError(osLastError())
deallocShared(cast[pointer](ev))
@@ -1281,15 +1289,12 @@ when defined(windows) or defined(nimdoc):
proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if cb(fd):
if unregisterWait(pcd.waitFd) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(osLastError())
ev.hWaiter = 0
# we need this check to avoid exception, if `unregister(event)` was
# called in callback.
if ev.hWaiter != 0: unregister(ev)
deallocShared(cast[pointer](pcd))
p.handles.excl(fd)
registerWaitableHandle(p, hEvent, flags, pcd, eventcb)
registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
ev.hWaiter = pcd.waitFd
initAll()
@@ -1319,7 +1324,7 @@ else:
readCB: Callback
writeCB: Callback
AsyncEvent* = SelectEvent
AsyncEvent* = distinct SelectEvent
PDispatcher* = ref object of PDispatcherBase
selector: Selector[AsyncData]
@@ -1368,8 +1373,8 @@ else:
proc unregister*(fd: AsyncFD) =
getGlobalDispatcher().selector.unregister(fd.SocketHandle)
# proc unregister*(ev: AsyncEvent) =
# getGlobalDispatcher().selector.unregister(SelectEvent(ev))
proc unregister*(ev: AsyncEvent) =
getGlobalDispatcher().selector.unregister(SelectEvent(ev))
proc addRead*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()
@@ -1409,7 +1414,7 @@ else:
var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
var i = 0
while i < count:
var update = false
var custom = false
var fd = keys[i].fd.SocketHandle
let events = keys[i].events
@@ -1420,7 +1425,6 @@ else:
p.selector.withData(fd, adata) do:
if adata.readCB == cb:
adata.readCB = nil
update = true
if Event.Write in events:
let cb = keys[i].data.writeCB
@@ -1429,24 +1433,29 @@ else:
p.selector.withData(fd, adata) do:
if adata.writeCB == cb:
adata.writeCB = nil
update = true
when supportedPlatform:
if (customSet * events) != {}:
let cb = keys[i].data.readCB
doAssert(cb != nil)
custom = true
if cb(fd.AsyncFD):
p.selector.withData(fd, adata) do:
if adata.readCB == cb:
adata.readCB = nil
p.selector.unregister(fd)
if update:
# because state `data` can be modified in callback we need to update
# descriptor events with currently registered callbacks.
if not custom:
var update = false
var newEvents: set[Event] = {}
p.selector.withData(fd, adata) do:
if adata.readCB != nil: incl(newEvents, Event.Read)
if adata.writeCB != nil: incl(newEvents, Event.Write)
p.selector.updateHandle(fd, newEvents)
update = true
if update:
p.selector.updateHandle(fd, newEvents)
inc(i)
# Timer processing.
@@ -1693,15 +1702,15 @@ else:
proc newAsyncEvent*(): AsyncEvent =
## Creates new ``AsyncEvent``.
result = AsyncEvent(ioselectors.newSelectEvent())
result = AsyncEvent(newSelectEvent())
proc setEvent*(ev: AsyncEvent) =
## Sets new ``AsyncEvent`` to signaled state.
ioselectors.setEvent(SelectEvent(ev))
setEvent(SelectEvent(ev))
proc close*(ev: AsyncEvent) =
## Closes ``AsyncEvent``
ioselectors.close(SelectEvent(ev))
close(SelectEvent(ev))
proc addEvent*(ev: AsyncEvent, cb: Callback) =
## Start watching for event ``ev``, and call callback ``cb``, when

View File

@@ -0,0 +1,125 @@
discard """
output: '''
OK
OK
OK
OK
'''
"""
when defined(upcoming):
import asyncdispatch, times, osproc, streams
const supportedPlatform = defined(linux) or defined(freebsd) or
defined(netbsd) or defined(openbsd) or
defined(macosx)
proc waitEvent(ev: AsyncEvent, closeEvent = false): Future[void] =
var retFuture = newFuture[void]("waitEvent")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
if closeEvent:
return true
else:
return false
addEvent(ev, cb)
return retFuture
proc waitTimer(timeout: int): Future[void] =
var retFuture = newFuture[void]("waitTimer")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addTimer(timeout, true, cb)
return retFuture
proc waitProcess(p: Process): Future[void] =
var retFuture = newFuture[void]("waitProcess")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addProcess(p.processID(), cb)
return retFuture
proc delayedSet(ev: AsyncEvent, timeout: int): Future[void] {.async.} =
await waitTimer(timeout)
ev.setEvent()
proc timerTest() =
var timeout = 200
var errorRate = 10.0
var start = epochTime()
waitFor(waitTimer(200))
var finish = epochTime()
var lowlimit = float(timeout) - float(timeout) * errorRate / 100.0
var highlimit = float(timeout) + float(timeout) * errorRate / 100.0
var elapsed = (finish - start) * 1_000 # convert to milliseconds
if elapsed >= lowlimit and elapsed < highlimit:
echo "OK"
else:
echo "timerTest: Timeout = " & $(elapsed) & ", but must be inside of [" &
$lowlimit & ", " & $highlimit & ")"
proc eventTest() =
var event = newAsyncEvent()
var fut = waitEvent(event)
asyncCheck(delayedSet(event, 500))
waitFor(fut or waitTimer(1000))
if fut.finished:
echo "OK"
else:
echo "eventTest: Timeout expired before event received!"
proc processTest() =
when defined(windows):
var process = startProcess("ping.exe", "",
["127.0.0.1", "-n", "2", "-w", "100"], nil,
{poStdErrToStdOut, poUsePath, poInteractive,
poDemon})
else:
var process = startProcess("/bin/sleep", "", ["1"], nil,
{poStdErrToStdOut, poUsePath})
var fut = waitProcess(process)
waitFor(fut or waitTimer(2000))
if fut.finished and process.peekExitCode() == 0:
echo "OK"
else:
echo "processTest: Timeout expired before process exited!"
when supportedPlatform:
import posix
proc waitSignal(signal: int): Future[void] =
var retFuture = newFuture[void]("waitSignal")
proc cb(fd: AsyncFD): bool =
retFuture.complete()
addSignal(signal, cb)
return retFuture
proc delayedSignal(signal: int, timeout: int): Future[void] {.async.} =
await waitTimer(timeout)
var pid = posix.getpid()
discard posix.kill(pid, signal.cint)
proc signalTest() =
var fut = waitSignal(posix.SIGINT)
asyncCheck(delayedSignal(posix.SIGINT, 500))
waitFor(fut or waitTimer(1000))
if fut.finished:
echo "OK"
else:
echo "signalTest: Timeout expired before signal received!"
when supportedPlatform:
timerTest()
eventTest()
processTest()
signalTest()
elif defined(windows):
timerTest()
eventTest()
processTest()
echo "OK"
else:
eventTest()
echo "OK\nOK\nOK"
else:
echo "OK\nOK\nOK\nOK"