Moved the global dispatcher to asyncdispatch.

This commit is contained in:
Dominik Picheta
2014-03-23 18:24:11 +00:00
parent e855f6c073
commit d310b01db1
5 changed files with 174 additions and 144 deletions

View File

@@ -111,13 +111,13 @@ when defined(windows) or defined(nimdoc):
TCompletionKey = dword
TCompletionData* = object
sock: TSocketHandle
cb: proc (sock: TSocketHandle, bytesTransferred: DWORD,
sock: TAsyncFD
cb: proc (sock: TAsyncFD, bytesTransferred: DWORD,
errcode: TOSErrorCode) {.closure.}
PDispatcher* = ref object
ioPort: THandle
handles: TSet[TSocketHandle]
handles: TSet[TAsyncFD]
TCustomOverlapped = object
Internal*: DWORD
@@ -129,30 +129,42 @@ when defined(windows) or defined(nimdoc):
PCustomOverlapped = ptr TCustomOverlapped
proc hash(x: TSocketHandle): THash {.borrow.}
TAsyncFD* = distinct int
proc hash(x: TAsyncFD): THash {.borrow.}
proc `==`*(x: TAsyncFD, y: TAsyncFD): bool {.borrow.}
proc newDispatcher*(): PDispatcher =
## Creates a new Dispatcher instance.
new result
result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[TSocketHandle]()
result.handles = initSet[TAsyncFD]()
proc register*(p: PDispatcher, sock: TSocketHandle) =
## Registers ``sock`` with the dispatcher ``p``.
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
## Retrieves the global thread-local dispatcher.
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc register*(sock: TAsyncFD) =
## Registers ``sock`` with the dispatcher.
let p = getGlobalDispatcher()
if CreateIOCompletionPort(sock.THandle, p.ioPort,
cast[TCompletionKey](sock), 1) == 0:
OSError(OSLastError())
p.handles.incl(sock)
proc verifyPresence(p: PDispatcher, sock: TSocketHandle) =
proc verifyPresence(sock: TAsyncFD) =
## Ensures that socket has been registered with the dispatcher.
let p = getGlobalDispatcher()
if sock notin p.handles:
raise newException(EInvalidValue,
"Operation performed on a socket which has not been registered with" &
" the dispatcher yet.")
proc poll*(p: PDispatcher, timeout = 500) =
proc poll*(timeout = 500) =
## Waits for completion events and processes them.
let p = getGlobalDispatcher()
if p.handles.len == 0:
raise newException(EInvalidValue, "No handles registered in dispatcher.")
@@ -170,7 +182,7 @@ when defined(windows) or defined(nimdoc):
var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
if res:
# This is useful for ensuring the reliability of the overlapped struct.
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, TOSErrorCode(-1))
@@ -178,7 +190,7 @@ when defined(windows) or defined(nimdoc):
else:
let errCode = OSLastError()
if lpOverlapped != nil:
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, errCode)
dealloc(customOverlapped)
@@ -201,7 +213,7 @@ when defined(windows) or defined(nimdoc):
addr bytesRet, nil, nil) == 0
proc initAll() =
let dummySock = socket()
let dummySock = newRawSocket()
if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
OSError(OSLastError())
if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
@@ -253,20 +265,20 @@ when defined(windows) or defined(nimdoc):
dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
RemoteSockaddr, RemoteSockaddrLength)
proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
proc connect*(socket: TAsyncFD, address: string, port: TPort,
af = AF_INET): PFuture[void] =
## Connects ``socket`` to server at ``address:port``.
##
## Returns a ``PFuture`` which will complete when the connection succeeds
## or an error occurs.
verifyPresence(p, socket)
verifyPresence(socket)
var retFuture = newFuture[void]()
# Apparently ``ConnectEx`` expects the socket to be initially bound:
var saddr: Tsockaddr_in
saddr.sin_family = int16(toInt(af))
saddr.sin_port = 0
saddr.sin_addr.s_addr = INADDR_ANY
if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)),
if bindAddr(socket.TSocketHandle, cast[ptr TSockAddr](addr(saddr)),
sizeof(saddr).TSockLen) < 0'i32:
OSError(OSLastError())
@@ -279,7 +291,7 @@ when defined(windows) or defined(nimdoc):
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
retFuture.complete()
@@ -287,8 +299,9 @@ when defined(windows) or defined(nimdoc):
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
nil, 0, nil, cast[POverlapped](ol))
var ret = connectEx(socket.TSocketHandle, it.ai_addr,
sizeof(TSockAddrIn).cint, nil, 0, nil,
cast[POverlapped](ol))
if ret:
# Request to connect completed immediately.
success = true
@@ -313,14 +326,14 @@ when defined(windows) or defined(nimdoc):
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
return retFuture
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
proc recv*(socket: TAsyncFD, size: int,
flags: int = 0): PFuture[string] =
## Reads ``size`` bytes from ``socket``. Returned future will complete once
## all of the requested data is read. If socket is disconnected during the
## recv operation then the future may complete with only a part of the
## requested data read. If socket is disconnected and no data is available
## to be read then the future will complete with a value of ``""``.
verifyPresence(p, socket)
verifyPresence(socket)
var retFuture = newFuture[string]()
var dataBuf: TWSABuf
@@ -331,7 +344,7 @@ when defined(windows) or defined(nimdoc):
var flagsio = flags.dword
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
@@ -344,7 +357,7 @@ when defined(windows) or defined(nimdoc):
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
let ret = WSARecv(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived,
addr flagsio, cast[POverlapped](ol), nil)
if ret == -1:
let err = OSLastError()
@@ -373,10 +386,10 @@ when defined(windows) or defined(nimdoc):
# free ``ol``.
return retFuture
proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] =
proc send*(socket: TAsyncFD, data: string): PFuture[void] =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
verifyPresence(p, socket)
verifyPresence(socket)
var retFuture = newFuture[void]()
var dataBuf: TWSABuf
@@ -386,7 +399,7 @@ when defined(windows) or defined(nimdoc):
var bytesReceived, flags: DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
retFuture.complete()
@@ -394,7 +407,7 @@ when defined(windows) or defined(nimdoc):
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived,
let ret = WSASend(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived,
flags, cast[POverlapped](ol), nil)
if ret == -1:
let err = osLastError()
@@ -408,17 +421,17 @@ when defined(windows) or defined(nimdoc):
# free ``ol``.
return retFuture
proc acceptAddr*(p: PDispatcher, socket: TSocketHandle):
PFuture[tuple[address: string, client: TSocketHandle]] =
proc acceptAddr*(socket: TAsyncFD):
PFuture[tuple[address: string, client: TAsyncFD]] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
##
## The resulting client socket is automatically registered to dispatcher.
verifyPresence(p, socket)
var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
verifyPresence(socket)
var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]()
var clientSock = socket()
var clientSock = newRawSocket()
if clientSock == OSInvalidSocket: osError(osLastError())
const lpOutputLen = 1024
@@ -441,16 +454,16 @@ when defined(windows) or defined(nimdoc):
dwLocalAddressLength, dwRemoteAddressLength,
addr LocalSockaddr, addr localLen,
addr RemoteSockaddr, addr remoteLen)
p.register(clientSock)
register(clientSock.TAsyncFD)
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
retFuture.complete(
(address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr),
client: clientSock)
client: clientSock.TAsyncFD)
)
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
completeAccept()
@@ -459,7 +472,7 @@ when defined(windows) or defined(nimdoc):
)
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0],
let ret = acceptEx(socket.TSocketHandle, clientSock, addr lpOutputBuf[0],
dwReceiveDataLength,
dwLocalAddressLength,
dwRemoteAddressLength,
@@ -478,73 +491,87 @@ when defined(windows) or defined(nimdoc):
return retFuture
proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
proc newAsyncRawSocket*(domain: TDomain = AF_INET,
typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
protocol: TProtocol = IPPROTO_TCP): TAsyncFD =
## Creates a new socket and registers it with the dispatcher implicitly.
result = socket(domain, typ, protocol)
result.setBlocking(false)
disp.register(result)
result = newRawSocket(domain, typ, protocol).TAsyncFD
result.TSocketHandle.setBlocking(false)
register(result)
proc close*(disp: PDispatcher, socket: TSocketHandle) =
proc close*(socket: TAsyncFD) =
## Closes a socket and ensures that it is unregistered.
socket.close()
disp.handles.excl(socket)
socket.TSocketHandle.close()
getGlobalDispatcher().handles.excl(socket)
initAll()
else:
import selectors
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
type
TCallback = proc (sock: TSocketHandle): bool {.closure.}
TAsyncFD* = distinct cint
TCallback = proc (sock: TAsyncFD): bool {.closure.}
PData* = ref object of PObject
sock: TSocketHandle
sock: TAsyncFD
readCBs: seq[TCallback]
writeCBs: seq[TCallback]
PDispatcher* = ref object
selector: PSelector
proc `==`*(x, y: TAsyncFD): bool {.borrow.}
proc newDispatcher*(): PDispatcher =
new result
result.selector = newSelector()
proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
assert sock in p.selector
discard p.selector.update(sock, events)
var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
proc getGlobalDispatcher*(): PDispatcher =
if gDisp.isNil: gDisp = newDispatcher()
result = gDisp
proc register(p: PDispatcher, sock: TSocketHandle) =
proc update(sock: TAsyncFD, events: set[TEvent]) =
let p = getGlobalDispatcher()
assert sock.TSocketHandle in p.selector
discard p.selector.update(sock.TSocketHandle, events)
proc register(sock: TAsyncFD) =
let p = getGlobalDispatcher()
var data = PData(sock: sock, readCBs: @[], writeCBs: @[])
p.selector.register(sock, {}, data.PObject)
p.selector.register(sock.TSocketHandle, {}, data.PObject)
proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
proc newAsyncRawSocket*(domain: TDomain = AF_INET,
typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
result = socket(domain, typ, protocol)
result.setBlocking(false)
disp.register(result)
protocol: TProtocol = IPPROTO_TCP): TAsyncFD =
result = newRawSocket(domain, typ, protocol).TAsyncFD
result.TSocketHandle.setBlocking(false)
register(result)
proc close*(disp: PDispatcher, sock: TSocketHandle) =
sock.close()
disp.selector.unregister(sock)
proc close*(sock: TAsyncFD) =
let disp = getGlobalDispatcher()
sock.TSocketHandle.close()
disp.selector.unregister(sock.TSocketHandle)
proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
if sock notin p.selector:
proc addRead(sock: TAsyncFD, cb: TCallback) =
let p = getGlobalDispatcher()
if sock.TSocketHandle notin p.selector:
raise newException(EInvalidValue, "File descriptor not registered.")
p.selector[sock].data.PData.readCBs.add(cb)
p.update(sock, p.selector[sock].events + {EvRead})
p.selector[sock.TSocketHandle].data.PData.readCBs.add(cb)
update(sock, p.selector[sock.TSocketHandle].events + {EvRead})
proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
if sock notin p.selector:
proc addWrite(sock: TAsyncFD, cb: TCallback) =
let p = getGlobalDispatcher()
if sock.TSocketHandle notin p.selector:
raise newException(EInvalidValue, "File descriptor not registered.")
p.selector[sock].data.PData.writeCBs.add(cb)
p.update(sock, p.selector[sock].events + {EvWrite})
p.selector[sock.TSocketHandle].data.PData.writeCBs.add(cb)
update(sock, p.selector[sock.TSocketHandle].events + {EvWrite})
proc poll*(p: PDispatcher, timeout = 500) =
proc poll*(timeout = 500) =
let p = getGlobalDispatcher()
for info in p.selector.select(timeout):
let data = PData(info.key.data)
assert data.sock == info.key.fd
assert data.sock == info.key.fd.TAsyncFD
#echo("In poll ", data.sock.cint)
if EvRead in info.events:
# Callback may add items to ``data.readCBs`` which causes issues if
@@ -570,17 +597,16 @@ else:
if data.readCBs.len != 0: newEvents = {EvRead}
if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
if newEvents != info.key.events:
echo(info.key.events, " -> ", newEvents)
p.update(data.sock, newEvents)
update(data.sock, newEvents)
else:
# FD no longer a part of the selector. Likely been closed
# (e.g. socket disconnected).
proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
proc connect*(socket: TAsyncFD, address: string, port: TPort,
af = AF_INET): PFuture[void] =
var retFuture = newFuture[void]()
proc cb(sock: TSocketHandle): bool =
proc cb(sock: TAsyncFD): bool =
# We have connected.
retFuture.complete()
return true
@@ -590,7 +616,7 @@ else:
var lastError: TOSErrorCode
var it = aiList
while it != nil:
var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen)
var ret = connect(socket.TSocketHandle, it.ai_addr, it.ai_addrlen.TSocklen)
if ret == 0:
# Request to connect completed immediately.
success = true
@@ -600,7 +626,7 @@ else:
lastError = osLastError()
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
success = true
addWrite(p, socket, cb)
addWrite(socket, cb)
break
else:
success = false
@@ -611,17 +637,18 @@ else:
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
return retFuture
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
proc recv*(socket: TAsyncFD, size: int,
flags: int = 0): PFuture[string] =
var retFuture = newFuture[string]()
var readBuffer = newString(size)
var sizeRead = 0
proc cb(sock: TSocketHandle): bool =
proc cb(sock: TAsyncFD): bool =
result = true
let netSize = size - sizeRead
let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint)
let res = recv(sock.TSocketHandle, addr readBuffer[sizeRead], netSize,
flags.cint)
#echo("recv cb res: ", res)
if res < 0:
let lastError = osLastError()
@@ -645,19 +672,19 @@ else:
retFuture.complete(readBuffer)
#echo("Recv cb result: ", result)
addRead(p, socket, cb)
addRead(socket, cb)
return retFuture
proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] =
proc send*(socket: TAsyncFD, data: string): PFuture[void] =
var retFuture = newFuture[void]()
var written = 0
proc cb(sock: TSocketHandle): bool =
proc cb(sock: TAsyncFD): bool =
result = true
let netSize = data.len-written
var d = data.cstring
let res = send(sock, addr d[written], netSize, 0.cint)
let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
@@ -670,18 +697,18 @@ else:
result = false # We still have data to send.
else:
retFuture.complete()
addWrite(p, socket, cb)
addWrite(socket, cb)
return retFuture
proc acceptAddr*(p: PDispatcher, socket: TSocketHandle):
PFuture[tuple[address: string, client: TSocketHandle]] =
var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
proc cb(sock: TSocketHandle): bool =
proc acceptAddr*(socket: TAsyncFD):
PFuture[tuple[address: string, client: TAsyncFD]] =
var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]()
proc cb(sock: TAsyncFD): bool =
result = true
var sockAddress: Tsockaddr_in
var addrLen = sizeof(sockAddress).TSocklen
var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)),
addr(addrLen))
var client = accept(sock.TSocketHandle,
cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen))
if client == osInvalidSocket:
let lastError = osLastError()
assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
@@ -690,19 +717,19 @@ else:
else:
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
else:
p.register(client)
retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client))
addRead(p, socket, cb)
register(client.TAsyncFD)
retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD))
addRead(socket, cb)
return retFuture
proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] =
proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection.
## The future will complete when the connection is successfully accepted.
var retFut = newFuture[TSocketHandle]()
var fut = p.acceptAddr(socket)
var retFut = newFuture[TAsyncFD]()
var fut = acceptAddr(socket)
fut.callback =
proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
assert future.finished
if future.failed:
retFut.fail(future.error)
@@ -891,7 +918,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
echo(toStrLit(result))
proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} =
proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} =
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.
##
@@ -912,28 +939,24 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
result = ""
var c = ""
while true:
c = await p.recv(socket, 1)
c = await recv(socket, 1)
if c.len == 0:
return ""
if c == "\r":
c = await p.recv(socket, 1, MSG_PEEK)
c = await recv(socket, 1, MSG_PEEK)
if c.len > 0 and c == "\L":
discard await p.recv(socket, 1)
discard await recv(socket, 1)
addNLIfEmpty()
return
elif c == "\L":
addNLIfEmpty()
return
add(result.string, c)
var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher
gDisp = newDispatcher()
add(result, c)
proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
gDisp.poll()
poll()
when isMainModule:

View File

@@ -1,3 +1,11 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2014 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
import asyncdispatch
import rawsockets
import net
@@ -7,7 +15,7 @@ when defined(ssl):
type
TAsyncSocket = object ## socket type
fd: TSocketHandle
fd: TAsyncFD
case isBuffered: bool # determines whether this socket is buffered.
of true:
buffer: array[0..BufferSize, char]
@@ -28,18 +36,18 @@ type
# TODO: Save AF, domain etc info and reuse it in procs which need it like connect.
proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket =
assert fd != osInvalidSocket
proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket =
assert fd != osInvalidSocket.TAsyncFD
new(result)
result.fd = fd
result.isBuffered = isBuff
if isBuff:
result.currPos = 0
proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
proc newAsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket =
## Creates a new asynchronous socket.
result = newSocket(gDisp.socket(domain, typ, protocol), buffered)
result = newSocket(newAsyncRawSocket(domain, typ, protocol), buffered)
proc connect*(socket: PAsyncSocket, address: string, port: TPort,
af = AF_INET): PFuture[void] =
@@ -47,7 +55,7 @@ proc connect*(socket: PAsyncSocket, address: string, port: TPort,
##
## Returns a ``PFuture`` which will complete when the connection succeeds
## or an error occurs.
result = gDisp.connect(socket.fd, address, port, af)
result = connect(socket.fd, address, port, af)
proc recv*(socket: PAsyncSocket, size: int,
flags: int = 0): PFuture[string] =
@@ -56,12 +64,12 @@ proc recv*(socket: PAsyncSocket, size: int,
## recv operation then the future may complete with only a part of the
## requested data read. If socket is disconnected and no data is available
## to be read then the future will complete with a value of ``""``.
result = gDisp.recv(socket.fd, size, flags)
result = recv(socket.fd, size, flags)
proc send*(socket: PAsyncSocket, data: string): PFuture[void] =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
result = gDisp.send(socket.fd, data)
result = send(socket.fd, data)
proc acceptAddr*(socket: PAsyncSocket):
PFuture[tuple[address: string, client: PAsyncSocket]] =
@@ -69,9 +77,9 @@ proc acceptAddr*(socket: PAsyncSocket):
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]()
var fut = gDisp.acceptAddr(socket.fd)
var fut = acceptAddr(socket.fd)
fut.callback =
proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
assert future.finished
if future.failed:
retFuture.fail(future.readError)
@@ -133,7 +141,7 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
when isMainModule:
proc main() {.async.} =
var sock = AsyncSocket()
var sock = newAsyncSocket()
await sock.connect("irc.freenode.net", TPort(6667))
while true:
let line = await sock.recvLine()

View File

@@ -347,7 +347,7 @@ type
ETimeout* = object of ESynch
proc newSocket(fd: TSocketHandle, isBuff: bool): PSocket =
proc createSocket(fd: TSocketHandle, isBuff: bool): PSocket =
assert fd != osInvalidSocket
new(result)
result.fd = fd
@@ -355,15 +355,15 @@ proc newSocket(fd: TSocketHandle, isBuff: bool): PSocket =
if isBuff:
result.currPos = 0
proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
proc newSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP, buffered = true): PSocket =
## Creates a new socket.
##
## If an error occurs EOS will be raised.
let fd = rawsockets.socket(domain, typ, protocol)
let fd = newRawSocket(domain, typ, protocol)
if fd == osInvalidSocket:
osError(osLastError())
result = newSocket(fd, buffered)
result = createSocket(fd, buffered)
when defined(ssl):
CRYPTO_malloc_init()

View File

@@ -143,7 +143,7 @@ else:
result = cint(ord(p))
proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
proc newRawSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
## Creates a new socket; returns `InvalidSocket` if an error occurs.
socket(toInt(domain), toInt(typ), toInt(protocol))

View File

@@ -5,7 +5,6 @@ discard """
"""
import asyncdispatch, rawsockets, net, strutils, os
var disp = newDispatcher()
var msgCount = 0
const
@@ -14,31 +13,31 @@ const
var clientCount = 0
proc sendMessages(disp: PDispatcher, client: TSocketHandle) {.async.} =
proc sendMessages(client: TAsyncFD) {.async.} =
for i in 0 .. <messagesToSend:
await disp.send(client, "Message " & $i & "\c\L")
await send(client, "Message " & $i & "\c\L")
proc launchSwarm(disp: PDispatcher, port: TPort) {.async.} =
proc launchSwarm(port: TPort) {.async.} =
for i in 0 .. <swarmSize:
var sock = disp.socket()
var sock = newAsyncRawSocket()
#disp.register(sock)
await disp.connect(sock, "localhost", port)
await connect(sock, "localhost", port)
when true:
await sendMessages(disp, sock)
disp.close(sock)
await sendMessages(sock)
close(sock)
else:
# Issue #932: https://github.com/Araq/Nimrod/issues/932
var msgFut = sendMessages(disp, sock)
var msgFut = sendMessages(sock)
msgFut.callback =
proc () =
disp.close(sock)
close(sock)
proc readMessages(disp: PDispatcher, client: TSocketHandle) {.async.} =
proc readMessages(client: TAsyncFD) {.async.} =
while true:
var line = await disp.recvLine(client)
var line = await recvLine(client)
if line == "":
disp.close(client)
close(client)
clientCount.inc
break
else:
@@ -47,8 +46,8 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle) {.async.} =
else:
doAssert false
proc createServer(disp: PDispatcher, port: TPort) {.async.} =
var server = disp.socket()
proc createServer(port: TPort) {.async.} =
var server = newAsyncRawSocket()
#disp.register(server)
block:
var name: TSockaddr_in
@@ -58,20 +57,20 @@ proc createServer(disp: PDispatcher, port: TPort) {.async.} =
name.sin_family = toInt(AF_INET)
name.sin_port = htons(int16(port))
name.sin_addr.s_addr = htonl(INADDR_ANY)
if bindAddr(server, cast[ptr TSockAddr](addr(name)),
sizeof(name).TSocklen) < 0'i32:
if bindAddr(server.TSocketHandle, cast[ptr TSockAddr](addr(name)),
sizeof(name).TSocklen) < 0'i32:
osError(osLastError())
discard server.listen()
discard server.TSocketHandle.listen()
while true:
var client = await disp.accept(server)
readMessages(disp, client)
var client = await accept(server)
readMessages(client)
# TODO: Test: readMessages(disp, await disp.accept(server))
disp.createServer(TPort(10335))
disp.launchSwarm(TPort(10335))
createServer(TPort(10335))
launchSwarm(TPort(10335))
while true:
disp.poll()
poll()
if clientCount == swarmSize: break
assert msgCount == swarmSize * messagesToSend