Asyncio module now works on file descriptors.

This commit is contained in:
Dominik Picheta
2012-09-02 19:58:13 +01:00
parent b56df72a32
commit d127ad5076
2 changed files with 150 additions and 43 deletions

View File

@@ -70,22 +70,24 @@ import sockets, os
## the socket has established a connection to a server socket; from that point
## it can be safely written to.
when defined(windows):
from winlean import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select
else:
from posix import TTimeVal, TFdSet, FD_ZERO, FD_SET, FD_ISSET, select
type
TDelegate = object
fd: cint
deleVal*: PObject
handleRead*: proc (h: PObject) {.nimcall.}
handleWrite*: proc (h: PObject) {.nimcall.}
handleConnect*: proc (h: PObject) {.nimcall.}
handleAccept*: proc (h: PObject) {.nimcall.}
getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket] {.nimcall.}
handleError*: proc (h: PObject) {.nimcall.}
hasDataBuffered*: proc (h: PObject): bool {.nimcall.}
open*: bool
task*: proc (h: PObject) {.nimcall.}
mode*: TMode
mode*: TFileMode
PDelegate* = ref TDelegate
@@ -106,24 +108,20 @@ type
lineBuffer: TaintedString ## Temporary storage for ``recvLine``
sslNeedAccept: bool
proto: TProtocol
deleg: PDelegate
TInfo* = enum
TInfo = enum
SockIdle, SockConnecting, SockConnected, SockListening, SockClosed, SockUDPBound
TMode* = enum
MReadable, MWriteable, MReadWrite
proc newDelegate*(): PDelegate =
## Creates a new delegate.
new(result)
result.handleRead = (proc (h: PObject) = nil)
result.handleWrite = (proc (h: PObject) = nil)
result.handleConnect = (proc (h: PObject) = nil)
result.handleAccept = (proc (h: PObject) = nil)
result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
doAssert(false))
result.handleError = (proc (h: PObject) = nil)
result.hasDataBuffered = (proc (h: PObject): bool = return false)
result.task = (proc (h: PObject) = nil)
result.mode = MReadable
result.mode = fmRead
proc newAsyncSocket(): PAsyncSocket =
new(result)
@@ -144,21 +142,28 @@ proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
if result.socket == InvalidSocket: OSError()
result.socket.setBlocking(false)
proc asyncSockHandleConnect(h: PObject) =
when defined(ssl):
if PAsyncSocket(h).socket.isSSL and not
PAsyncSocket(h).socket.gotHandshake:
return
PAsyncSocket(h).info = SockConnected
PAsyncSocket(h).handleConnect(PAsyncSocket(h))
proc asyncSockHandleRead(h: PObject) =
when defined(ssl):
if PAsyncSocket(h).socket.isSSL and not
PAsyncSocket(h).socket.gotHandshake:
return
PAsyncSocket(h).handleRead(PAsyncSocket(h))
if PAsyncSocket(h).info != SockListening:
assert PAsyncSocket(h).info != SockConnecting
PAsyncSocket(h).handleRead(PAsyncSocket(h))
else:
PAsyncSocket(h).handleAccept(PAsyncSocket(h))
proc asyncSockHandleWrite(h: PObject) =
when defined(ssl):
if PAsyncSocket(h).socket.isSSL and not
PAsyncSocket(h).socket.gotHandshake:
return
if PAsyncSocket(h).info == SockConnecting:
PAsyncSocket(h).handleConnect(PAsyncSocket(h))
# Stop receiving write events
PAsyncSocket(h).deleg.mode = fmRead
when defined(ssl):
proc asyncSockDoHandshake(h: PObject) =
@@ -173,19 +178,27 @@ when defined(ssl):
else:
# handshake will set socket's ``sslNoHandshake`` field.
discard PAsyncSocket(h).socket.handshake()
proc toDelegate(sock: PAsyncSocket): PDelegate =
result = newDelegate()
result.deleVal = sock
result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
return (PAsyncSocket(h).info, PAsyncSocket(h).socket))
result.handleConnect = asyncSockHandleConnect
result.fd = getFD(sock.socket)
# We need this to get write events, just to know when the socket connects.
result.mode = fmReadWrite
result.handleRead = asyncSockHandleRead
result.handleAccept = (proc (h: PObject) =
PAsyncSocket(h).handleAccept(PAsyncSocket(h)))
result.handleWrite = asyncSockHandleWrite
# TODO: Errors?
#result.handleError = (proc (h: PObject) = assert(false))
result.hasDataBuffered =
proc (h: PObject): bool {.nimcall.} =
return PAsyncSocket(h).socket.hasDataBuffered()
sock.deleg = result
if sock.info notin {SockIdle, SockClosed}:
sock.deleg.open = true
else:
sock.deleg.open = false
when defined(ssl):
result.task = asyncSockDoHandshake
@@ -195,22 +208,26 @@ proc connect*(sock: PAsyncSocket, name: string, port = TPort(0),
## Begins connecting ``sock`` to ``name``:``port``.
sock.socket.connectAsync(name, port, af)
sock.info = SockConnecting
sock.deleg.open = true
proc close*(sock: PAsyncSocket) =
## Closes ``sock``. Terminates any current connections.
sock.info = SockClosed
sock.socket.close()
sock.info = SockClosed
sock.deleg.open = false
proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
## Equivalent to ``sockets.bindAddr``.
sock.socket.bindAddr(port, address)
if sock.proto == IPPROTO_UDP:
sock.info = SockUDPBound
sock.deleg.open = true
proc listen*(sock: PAsyncSocket) =
## Equivalent to ``sockets.listen``.
sock.socket.listen()
sock.info = SockListening
sock.deleg.open = true
proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
address: var string) =
@@ -245,8 +262,11 @@ proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
if c == InvalidSocket: OSError()
c.setBlocking(false) # TODO: Needs to be tested.
# deleg.open is set in ``toDelegate``.
client.socket = c
client.lineBuffer = ""
client.info = SockConnected
proc accept*(server: PAsyncSocket, client: var PAsyncSocket) =
## Equivalent to ``sockets.accept``.
@@ -297,9 +317,6 @@ proc isWriteable*(s: PAsyncSocket): bool =
var writeSock = @[s.socket]
return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
proc `userArg=`*(s: PAsyncSocket, val: PObject) =
s.userArg = val
converter getSocket*(s: PAsyncSocket): TSocket =
return s.socket
@@ -338,6 +355,48 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool =
of RecvFail:
result = false
proc timeValFromMilliseconds(timeout = 500): TTimeVal =
if timeout != -1:
var seconds = timeout div 1000
result.tv_sec = seconds.int32
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
proc createFdSet(fd: var TFdSet, s: seq[PDelegate], m: var int) =
FD_ZERO(fd)
for i in items(s):
m = max(m, int(i.fd))
FD_SET(i.fd, fd)
proc pruneSocketSet(s: var seq[PDelegate], fd: var TFdSet) =
var i = 0
var L = s.len
while i < L:
if FD_ISSET(s[i].fd, fd) != 0'i32:
s[i] = s[L-1]
dec(L)
else:
inc(i)
setLen(s, L)
proc select(readfds, writefds, exceptfds: var seq[PDelegate],
timeout = 500): int =
var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout)
var rd, wr, ex: TFdSet
var m = 0
createFdSet(rd, readfds, m)
createFdSet(wr, writefds, m)
createFdSet(ex, exceptfds, m)
if timeout != -1:
result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), addr(tv)))
else:
result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), nil))
pruneSocketSet(readfds, (rd))
pruneSocketSet(writefds, (wr))
pruneSocketSet(exceptfds, (ex))
proc poll*(d: PDispatcher, timeout: int = 500): bool =
## This function checks for events on all the sockets in the `PDispatcher`.
## It then proceeds to call the correct event handler.
@@ -354,8 +413,47 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool =
## **Note:** Each delegate has a task associated with it. This gets called
## after each select() call, if you make timeout ``-1`` the tasks will
## only be executed after one or more sockets becomes readable or writeable.
result = true
var readDg, writeDg, errorDg: seq[PDelegate] = @[]
var len = d.delegates.len
var dc = 0
while dc < len:
let deleg = d.delegates[dc]
if (deleg.mode != fmWrite or deleg.mode != fmAppend) and deleg.open:
readDg.add(deleg)
if (deleg.mode != fmRead) and deleg.open:
writeDg.add(deleg)
if deleg.open:
errorDg.add(deleg)
inc dc
else:
# File/socket has been closed. Remove it from dispatcher.
d.delegates[dc] = d.delegates[len-1]
dec len
d.delegates.setLen(len)
if readDg.len() == 0 and writeDg.len() == 0:
## TODO: Perhaps this shouldn't return if errorDg has something?
return False
# TODO: Buffering hasDataBuffered!!
if select(readDg, writeDg, errorDg, timeout) != 0:
for i in 0..len(d.delegates)-1:
if i > len(d.delegates)-1: break # One delegate might've been removed.
let deleg = d.delegates[i]
if (deleg.mode != fmWrite or deleg.mode != fmAppend) and
deleg notin readDg:
deleg.handleRead(deleg.deleVal)
if (deleg.mode != fmRead) and deleg notin writeDg:
deleg.handleWrite(deleg.deleVal)
if deleg notin errorDg:
deleg.handleError(deleg.deleVal)
# Execute tasks
for i in items(d.delegates):
i.task(i.deleVal)
discard """result = true
var readSocks, writeSocks: seq[TSocket] = @[]
var L = d.delegates.len
@@ -410,7 +508,7 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool =
# Execute tasks
for i in items(d.delegates):
i.task(i.deleVal)
i.task(i.deleVal)"""
proc len*(disp: PDispatcher): int =
## Retrieves the amount of delegates in ``disp``.

View File

@@ -711,7 +711,7 @@ proc connectAsync*(socket: TSocket, name: string, port = TPort(0),
## A variant of ``connect`` for non-blocking sockets.
##
## This procedure will immediatelly return, it will not block until a connection
## is made. It is up to the caller to make sure the connections has been established
## is made. It is up to the caller to make sure the connection has been established
## by checking (using ``select``) whether the socket is writeable.
##
## **Note**: For SSL sockets, the ``handshake`` procedure must be called
@@ -820,6 +820,12 @@ proc pruneSocketSet(s: var seq[TSocket], fd: var TFdSet) =
inc(i)
setLen(s, L)
proc hasDataBuffered*(s: TSocket): bool =
## Determines whether a socket has data buffered.
result = false
if s.isBuffered:
result = s.bufLen > 0 and s.currPos != s.bufLen
proc checkBuffer(readfds: var seq[TSocket]): int =
## Checks the buffer of each socket in ``readfds`` to see whether there is data.
## Removes the sockets from ``readfds`` and returns the count of removed sockets.
@@ -1385,6 +1391,9 @@ proc connect*(socket: TSocket, timeout: int, name: string, port = TPort(0),
proc isSSL*(socket: TSocket): bool = return socket.isSSL
## Determines whether ``socket`` is a SSL socket.
proc getFD*(socket: TSocket): cint = return socket.fd
## Returns the socket's file descriptor
when defined(Windows):
var wsa: TWSADATA
if WSAStartup(0x0101'i16, wsa) != 0: OSError()