Fixes to asyncio2 on Linux.

This commit is contained in:
Dominik Picheta
2014-03-09 13:49:38 +00:00
parent 7704cdc90e
commit a097526956
5 changed files with 64 additions and 24 deletions

View File

@@ -36,7 +36,7 @@ type
epoll_data* {.importc: "union epoll_data",
header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union.
#thePtr* {.importc: "ptr".}: pointer
fd*: cint # \
fd* {.importc: "fd".}: cint # \
#u32*: uint32
#u64*: uint64

View File

@@ -473,7 +473,6 @@ else:
proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
assert sock in p.selector
echo("Update: ", events)
if events == {}:
discard p.selector.unregister(sock)
else:
@@ -499,23 +498,25 @@ else:
for info in p.selector.select(timeout):
let data = PData(info.key.data)
assert data.sock == info.key.fd
echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events)
if EvRead in info.events:
var newReadCBs: seq[TCallback] = @[]
for cb in data.readCBs:
# Callback may add items to ``data.readCBs`` which causes issues if
# we are iterating over ``data.readCBs`` at the same time. We therefore
# make a copy to iterate over.
let currentCBs = data.readCBs
data.readCBs = @[]
for cb in currentCBs:
if not cb(data.sock):
# Callback wants to be called again.
newReadCBs.add(cb)
data.readCBs = newReadCBs
data.readCBs.add(cb)
if EvWrite in info.events:
var newWriteCBs: seq[TCallback] = @[]
for cb in data.writeCBs:
let currentCBs = data.writeCBs
data.writeCBs = @[]
for cb in currentCBs:
if not cb(data.sock):
# Callback wants to be called again.
newWriteCBs.add(cb)
data.writeCBs = newWriteCBs
data.writeCBs.add(cb)
var newEvents: set[TEvent]
if data.readCBs.len != 0: newEvents = {EvRead}
@@ -615,7 +616,6 @@ else:
retFuture.complete(0)
addWrite(p, socket, cb)
return retFuture
proc acceptAddr*(p: PDispatcher, socket: TSocketHandle):
PFuture[tuple[address: string, client: TSocketHandle]] =
@@ -854,7 +854,7 @@ when isMainModule:
sock.setBlocking false
when false:
when true:
# Await tests
proc main(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "irc.freenode.net", TPort(6667))
@@ -880,7 +880,7 @@ when isMainModule:
else:
when false:
when true:
var f = p.connect(sock, "irc.freenode.org", TPort(6667))
f.callback =
@@ -919,4 +919,4 @@ when isMainModule:

View File

@@ -10,11 +10,13 @@
# TODO: Docs.
import tables, os, unsigned, hashes
import sockets2
when defined(linux): import posix, epoll
elif defined(windows): import winlean
proc hash*(x: TSocketHandle): THash {.borrow.}
proc `$`*(x: TSocketHandle): string {.borrow.}
type
TEvent* = enum
@@ -31,7 +33,7 @@ when defined(linux) or defined(nimdoc):
type
PSelector* = ref object
epollFD: cint
events: array[64, ptr epoll_event]
events: array[64, epoll_event]
fds: TTable[TSocketHandle, PSelectorKey]
proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event =
@@ -66,17 +68,25 @@ when defined(linux) or defined(nimdoc):
var event = createEventStruct(events, fd)
s.fds[fd].events = events
echo("About to update")
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
if OSLastError().cint == ENOENT:
# Socket has been closed. Epoll automatically removes disconnected
# sockets.
s.fds.del(fd)
osError("Socket has been disconnected")
OSError(OSLastError())
echo("finished updating")
result = s.fds[fd]
proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
if not s.fds.hasKey(fd):
raise newException(EInvalidValue, "File descriptor not found.")
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
OSError(OSLastError())
if osLastError().cint == ENOENT:
# Socket has been closed. Epoll automatically removes disconnected
# sockets so its already been removed.
else:
OSError(OSLastError())
result = s.fds[fd]
s.fds.del(fd)
@@ -92,21 +102,21 @@ when defined(linux) or defined(nimdoc):
## on the ``fd``.
result = @[]
let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint)
let evNum = epoll_wait(s.epollFD, addr s.events[0], 64.cint, timeout.cint)
if evNum < 0: OSError(OSLastError())
if evNum == 0: return @[]
for i in 0 .. <evNum:
var evSet: set[TEvent] = {}
if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
if (s.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
let selectorKey = s.fds[s.events[i].data.fd.TSocketHandle]
assert selectorKey != nil
result.add((selectorKey, evSet))
proc newSelector*(): PSelector =
new result
result.epollFD = epoll_create(64)
result.events = cast[array[64, ptr epoll_event]](alloc0(sizeof(epoll_event)*64))
result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
result.fds = initTable[TSocketHandle, PSelectorKey]()
if result.epollFD < 0:
OSError(OSLastError())
@@ -247,4 +257,4 @@ when isMainModule:

View File

@@ -24,6 +24,10 @@ else:
export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen,
inet_ntoa, recv, `==`, connect, send, accept
export
SO_ERROR,
SOL_SOCKET
type
TPort* = distinct uint16 ## port type
@@ -208,6 +212,24 @@ proc htons*(x: int16): int16 =
## order, this is a no-op; otherwise, it performs a 2-byte swap operation.
result = sockets2.ntohs(x)
proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {.
tags: [FReadIO].} =
## getsockopt for integer options.
var res: cint
var size = sizeof(res).TSocklen
if getsockopt(socket, cint(level), cint(optname),
addr(res), addr(size)) < 0'i32:
osError(osLastError())
result = int(res)
proc setSockOptInt*(socket: TSocketHandle, level, optname, optval: int) {.
tags: [FWriteIO].} =
## setsockopt for integer options.
var value = cint(optval)
if setsockopt(socket, cint(level), cint(optname), addr(value),
sizeof(value).TSocklen) < 0'i32:
osError(osLastError())
when defined(Windows):
var wsa: TWSADATA
if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError())

View File

@@ -15,16 +15,24 @@ const
var clientCount = 0
proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} =
echo("entering sendMessages")
for i in 0 .. <messagesToSend:
discard await disp.send(client, "Message " & $i & "\c\L")
discard await disp.send(client, "Message " & $i & "\c\L")
echo("returning sendMessages")
proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
for i in 0 .. <swarmSize:
var sock = socket()
# TODO: We may need to explicitly register and unregister the fd.
# This is because when the socket is closed, selectors is not aware
# that it has been closed. While epoll is. Perhaps we should just unregister
# in close()?
echo(sock.cint)
#disp.register(sock)
discard await disp.connect(sock, "localhost", port)
when true:
discard await sendMessages(disp, sock)
echo("Calling close")
sock.close()
else:
# Issue #932: https://github.com/Araq/Nimrod/issues/932