Files
Nim/lib/pure/asyncio.nim

355 lines
12 KiB
Nim

#
#
# Nimrod's Runtime Library
# (c) Copyright 2012 Andreas Rumpf, Dominik Picheta
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
import sockets, os
## This module implements an asynchronous event loop for sockets.
## It is akin to Python's asyncore module. Many modules that use sockets
## have an implementation for this module, those modules should all have a
## ``register`` function which you should use to add it to a dispatcher so
## that you can receive the events associated with that module.
##
## Once everything is registered in a dispatcher, you need to call the ``poll``
## function in a while loop.
##
## **Note:** Most modules have tasks which need to be ran regularly, this is
## why you should not call ``poll`` with a infinite timeout, or even a
## very long one. In most cases the default timeout is fine.
##
## **Note:** This module currently only supports select(), this is limited by
## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024
## sockets at a time.
##
## Most (if not all) modules that use asyncio provide a userArg which is passed
## on with the events. The type that you set userArg to must be inheriting from
## TObject!
type
TDelegate = object
deleVal*: PObject
handleRead*: proc (h: PObject)
handleWrite*: proc (h: PObject)
handleConnect*: proc (h: PObject)
handleAccept*: proc (h: PObject)
getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket]
task*: proc (h: PObject)
mode*: TMode
PDelegate* = ref TDelegate
PDispatcher* = ref TDispatcher
TDispatcher = object
delegates: seq[PDelegate]
PAsyncSocket* = ref TAsyncSocket
TAsyncSocket = object of TObject
socket: TSocket
info: TInfo
userArg: PObject
handleRead*: proc (s: PAsyncSocket, arg: PObject)
handleConnect*: proc (s: PAsyncSocket, arg: PObject)
handleAccept*: proc (s: PAsyncSocket, arg: PObject)
lineBuffer: TaintedString ## Temporary storage for ``recvLine``
TInfo* = enum
SockIdle, SockConnecting, SockConnected, SockListening, SockClosed
TMode* = enum
MReadable, MWriteable, MReadWrite
proc newDelegate*(): PDelegate =
## Creates a new delegate.
new(result)
result.handleRead = (proc (h: PObject) = nil)
result.handleWrite = (proc (h: PObject) = nil)
result.handleConnect = (proc (h: PObject) = nil)
result.handleAccept = (proc (h: PObject) = nil)
result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
doAssert(false))
result.task = (proc (h: PObject) = nil)
result.mode = MReadable
proc newAsyncSocket(userArg: PObject = nil): PAsyncSocket =
new(result)
result.info = SockIdle
result.userArg = userArg
result.handleRead = (proc (s: PAsyncSocket, arg: PObject) = nil)
result.handleConnect = (proc (s: PAsyncSocket, arg: PObject) = nil)
result.handleAccept = (proc (s: PAsyncSocket, arg: PObject) = nil)
result.lineBuffer = "".TaintedString
proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP,
userArg: PObject = nil): PAsyncSocket =
result = newAsyncSocket(userArg)
result.socket = socket(domain, typ, protocol)
if result.socket == InvalidSocket: OSError()
result.socket.setBlocking(false)
proc toDelegate(sock: PAsyncSocket): PDelegate =
result = newDelegate()
result.deleVal = sock
result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
return (PAsyncSocket(h).info, PAsyncSocket(h).socket))
result.handleConnect = (proc (h: PObject) =
PAsyncSocket(h).info = SockConnected
PAsyncSocket(h).handleConnect(PAsyncSocket(h),
PAsyncSocket(h).userArg))
result.handleRead = (proc (h: PObject) =
PAsyncSocket(h).handleRead(PAsyncSocket(h),
PAsyncSocket(h).userArg))
result.handleAccept = (proc (h: PObject) =
PAsyncSocket(h).handleAccept(PAsyncSocket(h),
PAsyncSocket(h).userArg))
proc connect*(sock: PAsyncSocket, name: string, port = TPort(0),
af: TDomain = AF_INET) =
## Begins connecting ``sock`` to ``name``:``port``.
sock.socket.connectAsync(name, port, af)
sock.info = SockConnecting
proc close*(sock: PAsyncSocket) =
## Closes ``sock``. Terminates any current connections.
sock.info = SockClosed
sock.socket.close()
proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
## Equivalent to ``sockets.bindAddr``.
sock.socket.bindAddr(port, address)
proc listen*(sock: PAsyncSocket) =
## Equivalent to ``sockets.listen``.
sock.socket.listen()
sock.info = SockListening
proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket,
address: string] =
## Equivalent to ``sockets.acceptAddr``.
var (client, a) = server.socket.acceptAddr()
if client == InvalidSocket: OSError()
client.setBlocking(false) # TODO: Needs to be tested.
var aSock: PAsyncSocket = newAsyncSocket()
aSock.socket = client
aSock.info = SockConnected
return (aSock, a)
proc accept*(server: PAsyncSocket): PAsyncSocket =
## Equivalent to ``sockets.accept``.
var (client, a) = server.acceptAddr()
return client
proc newDispatcher*(): PDispatcher =
new(result)
result.delegates = @[]
proc register*(d: PDispatcher, deleg: PDelegate) =
## Registers delegate ``deleg`` with dispatcher ``d``.
d.delegates.add(deleg)
proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} =
## Registers async socket ``sock`` with dispatcher ``d``.
result = sock.toDelegate()
d.register(result)
proc unregister*(d: PDispatcher, deleg: PDelegate) =
## Unregisters deleg ``deleg`` from dispatcher ``d``.
for i in 0..len(d.delegates)-1:
if d.delegates[i] == deleg:
d.delegates.del(i)
return
raise newException(EInvalidIndex, "Could not find delegate.")
proc isWriteable*(s: PAsyncSocket): bool =
## Determines whether socket ``s`` is ready to be written to.
var writeSock = @[s.socket]
return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
proc `userArg=`*(s: PAsyncSocket, val: PObject) =
s.userArg = val
converter getSocket*(s: PAsyncSocket): TSocket =
return s.socket
proc isConnected*(s: PAsyncSocket): bool =
## Determines whether ``s`` is connected.
return s.info == SockConnected
proc isListening*(s: PAsyncSocket): bool =
## Determines whether ``s`` is listening for incoming connections.
return s.info == SockListening
proc isConnecting*(s: PAsyncSocket): bool =
## Determines whether ``s`` is connecting.
return s.info == SockConnecting
proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool =
## Behaves similar to ``sockets.recvLine``, however it handles non-blocking
## sockets properly. This function guarantees that ``line`` is a full line,
## if this function can only retrieve some data; it will save this data and
## add it to the result when a full line is retrieved.
setLen(line.string, 0)
var dataReceived = "".TaintedString
var ret = s.socket.recvLineAsync(dataReceived)
case ret
of RecvFullLine:
if s.lineBuffer.len > 0:
string(line).add(s.lineBuffer.string)
setLen(s.lineBuffer.string, 0)
string(line).add(dataReceived.string)
result = true
of RecvPartialLine:
string(s.lineBuffer).add(dataReceived.string)
result = false
of RecvDisconnected:
result = true
of RecvFail:
result = false
proc poll*(d: PDispatcher, timeout: int = 500): bool =
## This function checks for events on all the sockets in the `PDispatcher`.
## It then proceeds to call the correct event handler.
##
## **Note:** There is no event which signifes when you have been disconnected,
## it is your job to check whether what you get from ``recv`` is ``""``.
## If you have been disconnected, `d`'s ``getSocket`` function should report
## this appropriately.
##
## This function returns ``True`` if there are sockets that are still
## connected (or connecting), otherwise ``False``. Sockets that have been
## closed are immediately removed from the dispatcher automatically.
##
## **Note:** Each delegate has a task associated with it. This gets called
## after each select() call, if you make timeout ``-1`` the tasks will
## only be executed after one or more sockets becomes readable or writeable.
result = true
var readSocks, writeSocks: seq[TSocket] = @[]
var L = d.delegates.len
var dc = 0
while dc < L:
template deleg: expr = d.delegates[dc]
let aSock = deleg.getSocket(deleg.deleVal)
if (deleg.mode != MWriteable and aSock.info == SockConnected) or
aSock.info == SockListening:
readSocks.add(aSock.sock)
if aSock.info == SockConnecting or
(aSock.info == SockConnected and deleg.mode != MReadable):
writeSocks.add(aSock.sock)
if aSock.info == SockClosed:
# Socket has been closed remove it from the dispatcher.
d.delegates[dc] = d.delegates[L-1]
dec L
else: inc dc
d.delegates.setLen(L)
if readSocks.len() == 0 and writeSocks.len() == 0:
return False
if select(readSocks, writeSocks, timeout) != 0:
for i in 0..len(d.delegates)-1:
if i > len(d.delegates)-1: break # One delegate might've been removed.
let deleg = d.delegates[i]
let sock = deleg.getSocket(deleg.deleVal)
if sock.info == SockConnected:
if deleg.mode != MWriteable and sock.sock notin readSocks:
if not (sock.info == SockConnecting):
assert(not (sock.info == SockListening))
deleg.handleRead(deleg.deleVal)
else:
assert(false)
if deleg.mode != MReadable and sock.sock notin writeSocks:
deleg.handleWrite(deleg.deleVal)
if sock.info == SockListening:
if sock.sock notin readSocks:
# This is a server socket, that had listen() called on it.
# This socket should have a client waiting now.
deleg.handleAccept(deleg.deleVal)
if sock.info == SockConnecting:
# Checking whether the socket has connected this way should work on
# Windows and Posix. I've checked.
if sock.sock notin writeSocks:
deleg.handleConnect(deleg.deleVal)
# Execute tasks
for i in items(d.delegates):
i.task(i.deleVal)
when isMainModule:
type
PIntType = ref TIntType
TIntType = object of TObject
val: int
PMyArg = ref TMyArg
TMyArg = object of TObject
dispatcher: PDispatcher
val: int
proc testConnect(s: PAsyncSocket, arg: PObject) =
echo("Connected! " & $PIntType(arg).val)
proc testRead(s: PAsyncSocket, arg: PObject) =
echo("Reading! " & $PIntType(arg).val)
var data = s.getSocket.recv()
if data == "":
echo("Closing connection. " & $PIntType(arg).val)
s.close()
echo(data)
echo("Finished reading! " & $PIntType(arg).val)
proc testAccept(s: PAsyncSocket, arg: PObject) =
echo("Accepting client! " & $PMyArg(arg).val)
var (client, address) = s.acceptAddr()
echo("Accepted ", address)
client.handleRead = testRead
var userArg: PIntType
new(userArg)
userArg.val = 78
client.userArg = userArg
PMyArg(arg).dispatcher.register(client)
var d = newDispatcher()
var userArg: PIntType
new(userArg)
userArg.val = 0
var s = AsyncSocket(userArg = userArg)
s.connect("amber.tenthbit.net", TPort(6667))
s.handleConnect = testConnect
s.handleRead = testRead
d.register(s)
var userArg1: PMyArg
new(userArg1)
userArg1.val = 1
userArg1.dispatcher = d
var server = AsyncSocket(userArg = userArg1)
server.handleAccept = testAccept
server.bindAddr(TPort(5555))
server.listen()
d.register(server)
while d.poll(-1): nil