Many renames. Created high level asyncnet module.

This commit is contained in:
Dominik Picheta
2014-03-22 22:33:02 +00:00
parent 2ce9f1c77f
commit 192e11e7b7
4 changed files with 175 additions and 12 deletions

View File

@@ -9,14 +9,14 @@
import os, oids, tables, strutils, macros
import sockets2
import rawsockets
## Asyncio2
## AsyncDispatch
## --------
##
## This module implements a brand new asyncio module based on Futures.
## IOCP is used under the hood on Windows and the selectors module is used for
## other operating systems.
## This module implements a brand new dispatcher based on Futures.
## On Windows IOCP is used and on other operating systems the selectors module
## is used instead.
# -- Futures
@@ -27,7 +27,7 @@ type
PFuture*[T] = ref object of PFutureBase
value: T
error: ref EBase
error*: ref EBase # TODO: This shouldn't be necessary, generics bug?
proc newFuture*[T](): PFuture[T] =
## Creates a new future.
@@ -90,6 +90,11 @@ proc read*[T](future: PFuture[T]): T =
# TODO: Make a custom exception type for this?
raise newException(EInvalidValue, "Future still in progress.")
proc readError*[T](future: PFuture[T]): ref EBase =
if future.error != nil: return future.error
else:
raise newException(EInvalidValue, "No error in future.")
proc finished*[T](future: PFuture[T]): bool =
## Determines whether ``future`` has completed.
##
@@ -478,6 +483,7 @@ when defined(windows) or defined(nimdoc):
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
## Creates a new socket and registers it with the dispatcher implicitly.
result = socket(domain, typ, protocol)
result.setBlocking(false)
disp.register(result)
proc close*(disp: PDispatcher, socket: TSocketHandle) =
@@ -516,6 +522,7 @@ else:
typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
result = socket(domain, typ, protocol)
result.setBlocking(false)
disp.register(result)
proc close*(disp: PDispatcher, sock: TSocketHandle) =
@@ -919,6 +926,15 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
return
add(result.string, c)
var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher
gDisp = newDispatcher()
proc runForever*() =
## Begins a never ending global dispatcher poll loop.
while true:
gDisp.poll()
when isMainModule:
var p = newDispatcher()

147
lib/pure/asyncnet.nim Normal file
View File

@@ -0,0 +1,147 @@
import asyncdispatch
import rawsockets
import net
when defined(ssl):
import openssl
type
TAsyncSocket = object ## socket type
fd: TSocketHandle
case isBuffered: bool # determines whether this socket is buffered.
of true:
buffer: array[0..BufferSize, char]
currPos: int # current index in buffer
bufLen: int # current length of buffer
of false: nil
when defined(ssl):
case isSsl: bool
of true:
sslHandle: PSSL
sslContext: PSSLContext
sslNoHandshake: bool # True if needs handshake.
sslHasPeekChar: bool
sslPeekChar: char
of false: nil
PAsyncSocket* = ref TAsyncSocket
# TODO: Save AF, domain etc info and reuse it in procs which need it like connect.
proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket =
assert fd != osInvalidSocket
new(result)
result.fd = fd
result.isBuffered = isBuff
if isBuff:
result.currPos = 0
proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket =
## Creates a new asynchronous socket.
result = newSocket(gDisp.socket(domain, typ, protocol), buffered)
proc connect*(socket: PAsyncSocket, address: string, port: TPort,
af = AF_INET): PFuture[void] =
## Connects ``socket`` to server at ``address:port``.
##
## Returns a ``PFuture`` which will complete when the connection succeeds
## or an error occurs.
result = gDisp.connect(socket.fd, address, port, af)
proc recv*(socket: PAsyncSocket, size: int,
flags: int = 0): PFuture[string] =
## Reads ``size`` bytes from ``socket``. Returned future will complete once
## all of the requested data is read. If socket is disconnected during the
## recv operation then the future may complete with only a part of the
## requested data read. If socket is disconnected and no data is available
## to be read then the future will complete with a value of ``""``.
result = gDisp.recv(socket.fd, size, flags)
proc send*(socket: PAsyncSocket, data: string): PFuture[void] =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
result = gDisp.send(socket.fd, data)
proc acceptAddr*(socket: PAsyncSocket):
PFuture[tuple[address: string, client: PAsyncSocket]] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]()
var fut = gDisp.acceptAddr(socket.fd)
fut.callback =
proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
assert future.finished
if future.failed:
retFuture.fail(future.readError)
else:
let resultTup = (future.read.address,
newSocket(future.read.client, socket.isBuffered))
retFuture.complete(resultTup)
return retFuture
proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection.
## The future will complete when the connection is successfully accepted.
var retFut = newFuture[PAsyncSocket]()
var fut = acceptAddr(socket)
fut.callback =
proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) =
assert future.finished
if future.failed:
retFut.fail(future.readError)
else:
retFut.complete(future.read.client)
return retFut
proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.
##
## If a full line is read ``\r\L`` is not
## added to ``line``, however if solely ``\r\L`` is read then ``line``
## will be set to it.
##
## If the socket is disconnected, ``line`` will be set to ``""``.
##
## If the socket is disconnected in the middle of a line (before ``\r\L``
## is read) then line will be set to ``""``.
## The partial line **will be lost**.
template addNLIfEmpty(): stmt =
if result.len == 0:
result.add("\c\L")
result = ""
var c = ""
while true:
c = await recv(socket, 1)
if c.len == 0:
return ""
if c == "\r":
c = await recv(socket, 1, MSG_PEEK)
if c.len > 0 and c == "\L":
discard await recv(socket, 1)
addNLIfEmpty()
return
elif c == "\L":
addNLIfEmpty()
return
add(result.string, c)
when isMainModule:
proc main() {.async.} =
var sock = AsyncSocket()
await sock.connect("irc.freenode.net", TPort(6667))
while true:
let line = await sock.recvLine()
if line == "":
echo("Disconnected")
break
else:
echo("Got line: ", line)
main()
runForever()

View File

@@ -10,7 +10,7 @@
## This module implements a high-level cross-platform sockets interface.
{.deadCodeElim: on.}
import sockets2, os, strutils, unsigned, parseutils, times
import rawsockets, os, strutils, unsigned, parseutils, times
type
IpAddressFamily* {.pure.} = enum ## Describes the type of an IP address
@@ -360,7 +360,7 @@ proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
## Creates a new socket.
##
## If an error occurs EOS will be raised.
let fd = sockets2.socket(domain, typ, protocol)
let fd = rawsockets.socket(domain, typ, protocol)
if fd == osInvalidSocket:
osError(osLastError())
result = newSocket(fd, buffered)

View File

@@ -209,13 +209,13 @@ proc htonl*(x: int32): int32 =
## Converts 32-bit integers from host to network byte order. On machines
## where the host byte order is the same as network byte order, this is
## a no-op; otherwise, it performs a 4-byte swap operation.
result = sockets2.ntohl(x)
result = rawsockets.ntohl(x)
proc htons*(x: int16): int16 =
## Converts 16-bit positive integers from host to network byte order.
## On machines where the host byte order is the same as network byte
## order, this is a no-op; otherwise, it performs a 2-byte swap operation.
result = sockets2.ntohs(x)
result = rawsockets.ntohs(x)
proc getServByName*(name, proto: string): TServent {.tags: [FReadIO].} =
## Searches the database from the beginning and finds the first entry for
@@ -256,7 +256,7 @@ proc getHostByAddr*(ip: string): Thostent {.tags: [FReadIO].} =
when defined(windows):
var s = winlean.gethostbyaddr(addr(myaddr), sizeof(myaddr).cuint,
cint(sockets2.AF_INET))
cint(rawsockets.AF_INET))
if s == nil: osError(osLastError())
else:
var s = posix.gethostbyaddr(addr(myaddr), sizeof(myaddr).TSocklen,
@@ -312,7 +312,7 @@ proc getSockName*(socket: TSocketHandle): TPort =
if getsockname(socket, cast[ptr TSockAddr](addr(name)),
addr(namelen)) == -1'i32:
osError(osLastError())
result = TPort(sockets2.ntohs(name.sin_port))
result = TPort(rawsockets.ntohs(name.sin_port))
proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {.
tags: [FReadIO].} =