prettified some async modules

This commit is contained in:
Araq
2014-08-31 12:38:26 +02:00
parent ae548a696f
commit 697789a313
3 changed files with 84 additions and 84 deletions

View File

@@ -102,13 +102,13 @@ type
fd*: SocketHandle
deleVal*: RootRef
handleRead*: proc (h: PObject) {.nimcall, gcsafe.}
handleWrite*: proc (h: PObject) {.nimcall, gcsafe.}
handleError*: proc (h: PObject) {.nimcall, gcsafe.}
hasDataBuffered*: proc (h: PObject): bool {.nimcall, gcsafe.}
handleRead*: proc (h: RootRef) {.nimcall, gcsafe.}
handleWrite*: proc (h: RootRef) {.nimcall, gcsafe.}
handleError*: proc (h: RootRef) {.nimcall, gcsafe.}
hasDataBuffered*: proc (h: RootRef): bool {.nimcall, gcsafe.}
open*: bool
task*: proc (h: PObject) {.nimcall, gcsafe.}
task*: proc (h: RootRef) {.nimcall, gcsafe.}
mode*: FileMode
Delegate* = ref DelegateObj
@@ -146,32 +146,32 @@ type
].}
proc newDelegate*(): PDelegate =
proc newDelegate*(): Delegate =
## Creates a new delegate.
new(result)
result.handleRead = (proc (h: PObject) = discard)
result.handleWrite = (proc (h: PObject) = discard)
result.handleError = (proc (h: PObject) = discard)
result.hasDataBuffered = (proc (h: PObject): bool = return false)
result.task = (proc (h: PObject) = discard)
result.handleRead = (proc (h: RootRef) = discard)
result.handleWrite = (proc (h: RootRef) = discard)
result.handleError = (proc (h: RootRef) = discard)
result.hasDataBuffered = (proc (h: RootRef): bool = return false)
result.task = (proc (h: RootRef) = discard)
result.mode = fmRead
proc newAsyncSocket(): PAsyncSocket =
proc newAsyncSocket(): AsyncSocket =
new(result)
result.info = SockIdle
result.handleRead = (proc (s: PAsyncSocket) = discard)
result.handleRead = (proc (s: AsyncSocket) = discard)
result.handleWrite = nil
result.handleConnect = (proc (s: PAsyncSocket) = discard)
result.handleAccept = (proc (s: PAsyncSocket) = discard)
result.handleTask = (proc (s: PAsyncSocket) = discard)
result.handleConnect = (proc (s: AsyncSocket) = discard)
result.handleAccept = (proc (s: AsyncSocket) = discard)
result.handleTask = (proc (s: AsyncSocket) = discard)
result.lineBuffer = "".TaintedString
result.sendBuffer = ""
proc asyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP,
buffered = true): PAsyncSocket =
proc asyncSocket*(domain: Domain = AF_INET, typ: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP,
buffered = true): AsyncSocket =
## Initialises an AsyncSocket object. If a socket cannot be initialised
## EOS is raised.
result = newAsyncSocket()
@@ -180,7 +180,7 @@ proc asyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
if result.socket == invalidSocket: raiseOSError(osLastError())
result.socket.setBlocking(false)
proc toAsyncSocket*(sock: TSocket, state: TInfo = SockConnected): PAsyncSocket =
proc toAsyncSocket*(sock: Socket, state: SocketStatus = SockConnected): PAsyncSocket =
## Wraps an already initialized ``TSocket`` into a PAsyncSocket.
## This is useful if you want to use an already connected TSocket as an
## asynchronous PAsyncSocket in asyncio's event loop.
@@ -211,36 +211,36 @@ proc toAsyncSocket*(sock: TSocket, state: TInfo = SockConnected): PAsyncSocket =
result.socket.setBlocking(false)
result.info = state
proc asyncSockHandleRead(h: PObject) =
proc asyncSockHandleRead(h: RootRef) =
when defined(ssl):
if PAsyncSocket(h).socket.isSSL and not
PAsyncSocket(h).socket.gotHandshake:
return
if PAsyncSocket(h).info != SockListening:
if PAsyncSocket(h).info != SockConnecting:
PAsyncSocket(h).handleRead(PAsyncSocket(h))
if AsyncSocket(h).info != SockListening:
if AsyncSocket(h).info != SockConnecting:
AsyncSocket(h).handleRead(AsyncSocket(h))
else:
PAsyncSocket(h).handleAccept(PAsyncSocket(h))
AsyncSocket(h).handleAccept(AsyncSocket(h))
proc close*(sock: PAsyncSocket) {.gcsafe.}
proc asyncSockHandleWrite(h: PObject) =
proc close*(sock: AsyncSocket) {.gcsafe.}
proc asyncSockHandleWrite(h: RootRef) =
when defined(ssl):
if PAsyncSocket(h).socket.isSSL and not
PAsyncSocket(h).socket.gotHandshake:
return
if PAsyncSocket(h).info == SockConnecting:
PAsyncSocket(h).handleConnect(PAsyncSocket(h))
PAsyncSocket(h).info = SockConnected
if AsyncSocket(h).info == SockConnecting:
AsyncSocket(h).handleConnect(AsyncSocket(h))
AsyncSocket(h).info = SockConnected
# Stop receiving write events if there is no handleWrite event.
if PAsyncSocket(h).handleWrite == nil:
PAsyncSocket(h).deleg.mode = fmRead
if AsyncSocket(h).handleWrite == nil:
AsyncSocket(h).deleg.mode = fmRead
else:
PAsyncSocket(h).deleg.mode = fmReadWrite
AsyncSocket(h).deleg.mode = fmReadWrite
else:
if PAsyncSocket(h).sendBuffer != "":
let sock = PAsyncSocket(h)
if AsyncSocket(h).sendBuffer != "":
let sock = AsyncSocket(h)
try:
let bytesSent = sock.socket.sendAsync(sock.sendBuffer)
if bytesSent == 0:
@@ -252,16 +252,16 @@ proc asyncSockHandleWrite(h: PObject) =
elif bytesSent == sock.sendBuffer.len:
sock.sendBuffer = ""
if PAsyncSocket(h).handleWrite != nil:
PAsyncSocket(h).handleWrite(PAsyncSocket(h))
except EOS:
if AsyncSocket(h).handleWrite != nil:
AsyncSocket(h).handleWrite(AsyncSocket(h))
except OSError:
# Most likely the socket closed before the full buffer could be sent to it.
sock.close() # TODO: Provide a handleError for users?
else:
if PAsyncSocket(h).handleWrite != nil:
PAsyncSocket(h).handleWrite(PAsyncSocket(h))
if AsyncSocket(h).handleWrite != nil:
AsyncSocket(h).handleWrite(AsyncSocket(h))
else:
PAsyncSocket(h).deleg.mode = fmRead
AsyncSocket(h).deleg.mode = fmRead
when defined(ssl):
proc asyncSockDoHandshake(h: PObject) {.gcsafe.} =
@@ -278,13 +278,13 @@ when defined(ssl):
discard PAsyncSocket(h).socket.handshake()
proc asyncSockTask(h: PObject) =
proc asyncSockTask(h: RootRef) =
when defined(ssl):
h.asyncSockDoHandshake()
PAsyncSocket(h).handleTask(PAsyncSocket(h))
AsyncSocket(h).handleTask(AsyncSocket(h))
proc toDelegate(sock: PAsyncSocket): PDelegate =
proc toDelegate(sock: AsyncSocket): Delegate =
result = newDelegate()
result.deleVal = sock
result.fd = getFD(sock.socket)
@@ -297,8 +297,8 @@ proc toDelegate(sock: PAsyncSocket): PDelegate =
#result.handleError = (proc (h: PObject) = assert(false))
result.hasDataBuffered =
proc (h: PObject): bool {.nimcall.} =
return PAsyncSocket(h).socket.hasDataBuffered()
proc (h: RootRef): bool {.nimcall.} =
return AsyncSocket(h).socket.hasDataBuffered()
sock.deleg = result
if sock.info notin {SockIdle, SockClosed}:
@@ -306,22 +306,22 @@ proc toDelegate(sock: PAsyncSocket): PDelegate =
else:
sock.deleg.open = false
proc connect*(sock: PAsyncSocket, name: string, port = TPort(0),
af: TDomain = AF_INET) =
proc connect*(sock: AsyncSocket, name: string, port = Port(0),
af: Domain = AF_INET) =
## Begins connecting ``sock`` to ``name``:``port``.
sock.socket.connectAsync(name, port, af)
sock.info = SockConnecting
if sock.deleg != nil:
sock.deleg.open = true
proc close*(sock: PAsyncSocket) =
proc close*(sock: AsyncSocket) =
## Closes ``sock``. Terminates any current connections.
sock.socket.close()
sock.info = SockClosed
if sock.deleg != nil:
sock.deleg.open = false
proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
proc bindAddr*(sock: AsyncSocket, port = Port(0), address = "") =
## Equivalent to ``sockets.bindAddr``.
sock.socket.bindAddr(port, address)
if sock.proto == IPPROTO_UDP:
@@ -329,14 +329,14 @@ proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
if sock.deleg != nil:
sock.deleg.open = true
proc listen*(sock: PAsyncSocket) =
proc listen*(sock: AsyncSocket) =
## Equivalent to ``sockets.listen``.
sock.socket.listen()
sock.info = SockListening
if sock.deleg != nil:
sock.deleg.open = true
proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
proc acceptAddr*(server: AsyncSocket, client: var AsyncSocket,
address: var string) =
## Equivalent to ``sockets.acceptAddr``. This procedure should be called in
## a ``handleAccept`` event handler **only** once.
@@ -344,7 +344,7 @@ proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
## **Note**: ``client`` needs to be initialised.
assert(client != nil)
client = newAsyncSocket()
var c: TSocket
var c: Socket
new(c)
when defined(ssl):
if server.socket.isSSL:
@@ -377,12 +377,12 @@ proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
client.sendBuffer = ""
client.info = SockConnected
proc accept*(server: PAsyncSocket, client: var PAsyncSocket) =
proc accept*(server: AsyncSocket, client: var AsyncSocket) =
## Equivalent to ``sockets.accept``.
var dummyAddr = ""
server.acceptAddr(client, dummyAddr)
proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket,
proc acceptAddr*(server: AsyncSocket): tuple[sock: AsyncSocket,
address: string] {.deprecated.} =
## Equivalent to ``sockets.acceptAddr``.
##
@@ -392,7 +392,7 @@ proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket,
acceptAddr(server, client, address)
return (client, address)
proc accept*(server: PAsyncSocket): PAsyncSocket {.deprecated.} =
proc accept*(server: AsyncSocket): AsyncSocket {.deprecated.} =
## Equivalent to ``sockets.accept``.
##
## **Deprecated since version 0.9.0:** Please use the function above.
@@ -400,54 +400,54 @@ proc accept*(server: PAsyncSocket): PAsyncSocket {.deprecated.} =
var address = ""
server.acceptAddr(result, address)
proc newDispatcher*(): PDispatcher =
proc newDispatcher*(): Dispatcher =
new(result)
result.delegates = @[]
proc register*(d: PDispatcher, deleg: PDelegate) =
proc register*(d: Dispatcher, deleg: Delegate) =
## Registers delegate ``deleg`` with dispatcher ``d``.
d.delegates.add(deleg)
proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} =
proc register*(d: Dispatcher, sock: AsyncSocket): Delegate {.discardable.} =
## Registers async socket ``sock`` with dispatcher ``d``.
result = sock.toDelegate()
d.register(result)
proc unregister*(d: PDispatcher, deleg: PDelegate) =
proc unregister*(d: Dispatcher, deleg: Delegate) =
## Unregisters deleg ``deleg`` from dispatcher ``d``.
for i in 0..len(d.delegates)-1:
if d.delegates[i] == deleg:
d.delegates.del(i)
return
raise newException(EInvalidIndex, "Could not find delegate.")
raise newException(IndexError, "Could not find delegate.")
proc isWriteable*(s: PAsyncSocket): bool =
proc isWriteable*(s: AsyncSocket): bool =
## Determines whether socket ``s`` is ready to be written to.
var writeSock = @[s.socket]
return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
converter getSocket*(s: PAsyncSocket): TSocket =
converter getSocket*(s: AsyncSocket): Socket =
return s.socket
proc isConnected*(s: PAsyncSocket): bool =
proc isConnected*(s: AsyncSocket): bool =
## Determines whether ``s`` is connected.
return s.info == SockConnected
proc isListening*(s: PAsyncSocket): bool =
proc isListening*(s: AsyncSocket): bool =
## Determines whether ``s`` is listening for incoming connections.
return s.info == SockListening
proc isConnecting*(s: PAsyncSocket): bool =
proc isConnecting*(s: AsyncSocket): bool =
## Determines whether ``s`` is connecting.
return s.info == SockConnecting
proc isClosed*(s: PAsyncSocket): bool =
proc isClosed*(s: AsyncSocket): bool =
## Determines whether ``s`` has been closed.
return s.info == SockClosed
proc isSendDataBuffered*(s: PAsyncSocket): bool =
proc isSendDataBuffered*(s: AsyncSocket): bool =
## Determines whether ``s`` has data waiting to be sent, i.e. whether this
## socket's sendBuffer contains data.
return s.sendBuffer.len != 0
proc setHandleWrite*(s: PAsyncSocket,
handleWrite: proc (s: PAsyncSocket) {.closure, gcsafe.}) =
proc setHandleWrite*(s: AsyncSocket,
handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.}) =
## Setter for the ``handleWrite`` event.
##
## To remove this event you should use the ``delHandleWrite`` function.
@@ -457,12 +457,12 @@ proc setHandleWrite*(s: PAsyncSocket,
s.deleg.mode = fmReadWrite
s.handleWrite = handleWrite
proc delHandleWrite*(s: PAsyncSocket) =
proc delHandleWrite*(s: AsyncSocket) =
## Removes the ``handleWrite`` event handler on ``s``.
s.handleWrite = nil
{.push warning[deprecated]: off.}
proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool {.deprecated.} =
proc recvLine*(s: AsyncSocket, line: var TaintedString): bool {.deprecated.} =
## Behaves similar to ``sockets.recvLine``, however it handles non-blocking
## sockets properly. This function guarantees that ``line`` is a full line,
## if this function can only retrieve some data; it will save this data and
@@ -495,7 +495,7 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool {.deprecated.} =
result = false
{.pop.}
proc readLine*(s: PAsyncSocket, line: var TaintedString): bool =
proc readLine*(s: AsyncSocket, line: var TaintedString): bool =
## Behaves similar to ``sockets.readLine``, however it handles non-blocking
## sockets properly. This function guarantees that ``line`` is a full line,
## if this function can only retrieve some data; it will save this data and
@@ -525,7 +525,7 @@ proc readLine*(s: PAsyncSocket, line: var TaintedString): bool =
of ReadDisconnected:
result = true
proc send*(sock: PAsyncSocket, data: string) =
proc send*(sock: AsyncSocket, data: string) =
## Sends ``data`` to socket ``sock``. This is basically a nicer implementation
## of ``sockets.sendAsync``.
##
@@ -545,7 +545,7 @@ proc send*(sock: PAsyncSocket, data: string) =
sock.sendBuffer.add(data[bytesSent .. -1])
sock.deleg.mode = fmReadWrite
proc timeValFromMilliseconds(timeout = 500): TimeVal =
proc timeValFromMilliseconds(timeout = 500): Timeval =
if timeout != -1:
var seconds = timeout div 1000
result.tv_sec = seconds.int32
@@ -568,9 +568,9 @@ proc pruneSocketSet(s: var seq[Delegate], fd: var FdSet) =
inc(i)
setLen(s, L)
proc select(readfds, writefds, exceptfds: var seq[PDelegate],
proc select(readfds, writefds, exceptfds: var seq[Delegate],
timeout = 500): int =
var tv {.noInit.}: TimeVal = timeValFromMilliseconds(timeout)
var tv {.noInit.}: Timeval = timeValFromMilliseconds(timeout)
var rd, wr, ex: FdSet
var m = 0
@@ -587,7 +587,7 @@ proc select(readfds, writefds, exceptfds: var seq[PDelegate],
pruneSocketSet(writefds, (wr))
pruneSocketSet(exceptfds, (ex))
proc poll*(d: PDispatcher, timeout: int = 500): bool =
proc poll*(d: Dispatcher, timeout: int = 500): bool =
## This function checks for events on all the delegates in the `PDispatcher`.
## It then proceeds to call the correct event handler.
##
@@ -600,7 +600,7 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool =
## only be executed after one or more file descriptors becomes readable or
## writeable.
result = true
var readDg, writeDg, errorDg: seq[PDelegate] = @[]
var readDg, writeDg, errorDg: seq[Delegate] = @[]
var len = d.delegates.len
var dc = 0
@@ -648,7 +648,7 @@ proc poll*(d: PDispatcher, timeout: int = 500): bool =
for i in items(d.delegates):
i.task(i.deleVal)
proc len*(disp: PDispatcher): int =
proc len*(disp: Dispatcher): int =
## Retrieves the amount of delegates in ``disp``.
return disp.delegates.len

View File

@@ -79,7 +79,7 @@ type
c: AsyncSocket
mode: ClientMode
dataLen: int
headers: PStringTable ## the parsed headers
headers: StringTableRef ## the parsed headers
input: string ## the input buffer
AsyncScgiStateObj = object
@@ -105,7 +105,7 @@ proc open*(s: var ScgiState, port = Port(4000), address = "127.0.0.1",
reuseAddr = false) =
## opens a connection.
s.bufLen = 4000
s.input = newString(s.buflen) # will be reused
s.input = newString(s.bufLen) # will be reused
s.server = socket()
if s.server == invalidSocket: raiseOSError(osLastError())
@@ -262,7 +262,7 @@ proc open*(handleRequest: proc (client: AsyncSocket,
## automatically unless it has already been closed.
var cres: AsyncScgiState
new(cres)
cres.asyncServer = AsyncSocket()
cres.asyncServer = asyncSocket()
cres.asyncServer.handleAccept = proc (s: AsyncSocket) = handleAccept(s, cres)
if reuseAddr:
cres.asyncServer.setSockOpt(OptReuseAddr, true)

View File

@@ -1,6 +1,6 @@
discard """
line: 18
errormsg: "type mismatch: got (proc (TScgi) | proc (PAsyncSocket, PStringTable, string){.gcsafe.})"
errormsg: "type mismatch: got (proc (TScgi) | proc (AsyncSocket, StringTableRef, string){.gcsafe.})"
"""
#bug #442