tasyncawait now works on Linux.

Reworked detection of a file descriptor being closed with epoll (in the
case of sockets it is when the remote host disconnects). Ensured that
events are only updated when they change.
This commit is contained in:
Dominik Picheta
2014-03-11 21:53:35 +00:00
parent 9b5357da5a
commit 2ce07042fd
3 changed files with 69 additions and 45 deletions

View File

@@ -479,9 +479,11 @@ else:
discard p.selector.update(sock, events)
proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
#echo("addRead")
if sock notin p.selector:
var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[])
p.selector.register(sock, {EvRead}, data.PObject)
#echo("registered")
else:
p.selector[sock].data.PData.readCBs.add(cb)
p.update(sock, p.selector[sock].events + {EvRead})
@@ -498,7 +500,7 @@ else:
for info in p.selector.select(timeout):
let data = PData(info.key.data)
assert data.sock == info.key.fd
#echo("In poll ", data.sock.cint)
if EvRead in info.events:
# Callback may add items to ``data.readCBs`` which causes issues if
# we are iterating over ``data.readCBs`` at the same time. We therefore
@@ -517,11 +519,17 @@ else:
if not cb(data.sock):
# Callback wants to be called again.
data.writeCBs.add(cb)
var newEvents: set[TEvent]
if data.readCBs.len != 0: newEvents = {EvRead}
if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
p.update(data.sock, newEvents)
if info.key in p.selector:
var newEvents: set[TEvent]
if data.readCBs.len != 0: newEvents = {EvRead}
if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
if newEvents != info.key.events:
echo(info.key.events, " -> ", newEvents)
p.update(data.sock, newEvents)
else:
# FD no longer a part of the selector. Likely been closed
# (e.g. socket disconnected).
proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
af = AF_INET): PFuture[int] =
@@ -569,6 +577,7 @@ else:
result = true
let netSize = size - sizeRead
let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint)
#echo("recv cb res: ", res)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
@@ -576,6 +585,7 @@ else:
else:
result = false # We still want this callback to be called.
elif res == 0:
#echo("Disconnected recv: ", sizeRead)
# Disconnected
if sizeRead == 0:
retFuture.complete("")
@@ -588,6 +598,7 @@ else:
result = false # We want to read all the data requested.
else:
retFuture.complete(readBuffer)
#echo("Recv cb result: ", result)
addRead(p, socket, cb)
return retFuture
@@ -833,9 +844,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
result = ""
var c = ""
while true:
#echo("1")
c = await p.recv(socket, 1)
#echo("Received ", c.len)
if c.len == 0:
#echo("returning")
return
#echo("2")
if c == "\r":
c = await p.recv(socket, 1, MSG_PEEK)
if c.len > 0 and c == "\L":
@@ -845,7 +860,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
elif c == "\L":
addNLIfEmpty()
return
#echo("3")
add(result.string, c)
#echo("4")
when isMainModule:
@@ -859,6 +876,7 @@ when isMainModule:
proc main(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "irc.freenode.net", TPort(6667))
while true:
echo("recvLine")
var line = await p.recvLine(sock)
echo("Line is: ", line.repr)
if line == "":
@@ -882,7 +900,7 @@ when isMainModule:
else:
when true:
var f = p.connect(sock, "irc.freenode.org", TPort(6667))
var f = p.connect(sock, "irc.poop.nl", TPort(6667))
f.callback =
proc (future: PFuture[int]) =
echo("Connected in future!")
@@ -898,11 +916,13 @@ when isMainModule:
sock.bindAddr(TPort(6667))
sock.listen()
proc onAccept(future: PFuture[TSocketHandle]) =
echo "Accepted"
var t = p.send(future.read, "test\c\L")
let client = future.read
echo "Accepted ", client.cint
var t = p.send(client, "test\c\L")
t.callback =
proc (future: PFuture[int]) =
echo(future.read)
echo("Send: ", future.read)
client.close()
var f = p.accept(sock)
f.callback = onAccept

View File

@@ -10,7 +10,6 @@
# TODO: Docs.
import tables, os, unsigned, hashes
import sockets2
when defined(linux): import posix, epoll
elif defined(windows): import winlean
@@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc):
result.events = EPOLLIN
if EvWrite in events:
result.events = result.events or EPOLLOUT
result.events = result.events or EPOLLRDHUP
result.data.fd = fd.cint
proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent],
data: PObject): PSelectorKey {.discardable.} =
## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent
## ``events``.
if s.fds.hasKey(fd):
raise newException(EInvalidValue, "File descriptor already exists.")
var event = createEventStruct(events, fd)
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
OSError(OSLastError())
@@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc):
proc update*(s: PSelector, fd: TSocketHandle,
events: set[TEvent]): PSelectorKey {.discardable.} =
## Updates the events which ``fd`` wants notifications for.
if not s.fds.hasKey(fd):
raise newException(EInvalidValue, "File descriptor not found.")
var event = createEventStruct(events, fd)
s.fds[fd].events = events
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
if OSLastError().cint == ENOENT:
# Socket has been closed. Epoll automatically removes disconnected
# sockets.
s.fds.del(fd)
osError("Socket has been disconnected")
OSError(OSLastError())
result = s.fds[fd]
if s.fds[fd].events != events:
echo("Update ", fd.cint, " to ", events)
var event = createEventStruct(events, fd)
s.fds[fd].events = events
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
OSError(OSLastError())
result = s.fds[fd]
proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
if not s.fds.hasKey(fd):
raise newException(EInvalidValue, "File descriptor not found.")
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
if osLastError().cint == ENOENT:
# Socket has been closed. Epoll automatically removes disconnected
# sockets so its already been removed.
else:
OSError(OSLastError())
OSError(OSLastError())
result = s.fds[fd]
s.fds.del(fd)
@@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc):
assert selectorKey != nil
result.add((selectorKey, evSet))
if (s.events[i].events and EPOLLHUP) != 0 or
(s.events[i].events and EPOLLRDHUP) != 0:
# fd closed
#echo("fd closed ", s.events[i].data.fd)
s.unregister(s.events[i].data.fd.TSocketHandle)
#echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events)
proc newSelector*(): PSelector =
new result
result.epollFD = epoll_create(64)
@@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc):
proc contains*(s: PSelector, fd: TSocketHandle): bool =
## Determines whether selector contains a file descriptor.
return s.fds.hasKey(fd)
if s.fds.hasKey(fd):
result = true
# Ensure the underlying epoll instance still contains this fd.
var event = createEventStruct(s.fds[fd].events, fd)
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
let err = osLastError()
if err.cint in {ENOENT, EBADF}:
return false
OSError(OSLastError())
else:
return false
proc contains*(s: PSelector, key: PSelectorKey): bool =
## Determines whether selector contains this selector key. More accurate
## than checking if the file descriptor is in the selector because it
## ensures that the keys are equal. File descriptors may not always be
## unique especially when an fd is closed and then a new one is opened,
## the new one may have the same value.
return key.fd in s and s.fds[key.fd] == key
proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey =
## Retrieves the selector key for ``fd``.

View File

@@ -15,24 +15,17 @@ const
var clientCount = 0
proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} =
echo("entering sendMessages")
for i in 0 .. <messagesToSend:
discard await disp.send(client, "Message " & $i & "\c\L")
echo("returning sendMessages")
proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
for i in 0 .. <swarmSize:
var sock = socket()
# TODO: We may need to explicitly register and unregister the fd.
# This is because when the socket is closed, selectors is not aware
# that it has been closed. While epoll is. Perhaps we should just unregister
# in close()?
echo(sock.cint)
#disp.register(sock)
discard await disp.connect(sock, "localhost", port)
when true:
discard await sendMessages(disp, sock)
echo("Calling close")
sock.close()
else:
# Issue #932: https://github.com/Araq/Nimrod/issues/932