Merge branch 'newasync' of https://github.com/Araq/Nimrod into devel

This commit is contained in:
Araq
2014-02-09 09:37:24 +01:00
6 changed files with 1141 additions and 0 deletions

88
lib/posix/epoll.nim Normal file
View File

@@ -0,0 +1,88 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2013 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
const
EPOLLIN* = 0x00000001
EPOLLPRI* = 0x00000002
EPOLLOUT* = 0x00000004
EPOLLERR* = 0x00000008
EPOLLHUP* = 0x00000010
EPOLLRDNORM* = 0x00000040
EPOLLRDBAND* = 0x00000080
EPOLLWRNORM* = 0x00000100
EPOLLWRBAND* = 0x00000200
EPOLLMSG* = 0x00000400
EPOLLRDHUP* = 0x00002000
EPOLLWAKEUP* = 1 shl 29
EPOLLONESHOT* = 1 shl 30
EPOLLET* = 1 shl 31
# Valid opcodes ( "op" parameter ) to issue to epoll_ctl().
const
EPOLL_CTL_ADD* = 1 # Add a file descriptor to the interface.
EPOLL_CTL_DEL* = 2 # Remove a file descriptor from the interface.
EPOLL_CTL_MOD* = 3 # Change file descriptor epoll_event structure.
type
epoll_data* {.importc: "union epoll_data",
header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union.
thePtr* {.importc: "ptr".}: pointer # \
#fd*: cint
#u32*: uint32
#u64*: uint64
epoll_event* {.importc: "struct epoll_event", header: "<sys/epoll.h>", pure, final.} = object
events*: uint32 # Epoll events
data*: epoll_data # User data variable
proc epoll_create*(size: cint): cint {.importc: "epoll_create",
header: "<sys/epoll.h>".}
## Creates an epoll instance. Returns an fd for the new instance.
## The "size" parameter is a hint specifying the number of file
## descriptors to be associated with the new instance. The fd
## returned by epoll_create() should be closed with close().
proc epoll_create1*(flags: cint): cint {.importc: "epoll_create1",
header: "<sys/epoll.h>".}
## Same as epoll_create but with an FLAGS parameter. The unused SIZE
## parameter has been dropped.
proc epoll_ctl*(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint {.
importc: "epoll_ctl", header: "<sys/epoll.h>".}
## Manipulate an epoll instance "epfd". Returns 0 in case of success,
## -1 in case of error ( the "errno" variable will contain the
## specific error code ) The "op" parameter is one of the EPOLL_CTL_*
## constants defined above. The "fd" parameter is the target of the
## operation. The "event" parameter describes which events the caller
## is interested in and any associated user data.
proc epoll_wait*(epfd: cint; events: ptr epoll_event; maxevents: cint;
timeout: cint): cint {.importc: "epoll_wait",
header: "<sys/epoll.h>".}
## Wait for events on an epoll instance "epfd". Returns the number of
## triggered events returned in "events" buffer. Or -1 in case of
## error with the "errno" variable set to the specific error code. The
## "events" parameter is a buffer that will contain triggered
## events. The "maxevents" is the maximum number of events to be
## returned ( usually size of "events" ). The "timeout" parameter
## specifies the maximum wait time in milliseconds (-1 == infinite).
##
## This function is a cancellation point and therefore not marked with
## __THROW.
#proc epoll_pwait*(epfd: cint; events: ptr epoll_event; maxevents: cint;
# timeout: cint; ss: ptr sigset_t): cint {.
# importc: "epoll_pwait", header: "<sys/epoll.h>".}
# Same as epoll_wait, but the thread's signal mask is temporarily
# and atomically replaced with the one provided as parameter.
#
# This function is a cancellation point and therefore not marked with
# __THROW.

485
lib/pure/asyncio2.nim Normal file
View File

@@ -0,0 +1,485 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2014 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
import os, oids, tables, strutils
import winlean
import sockets2, net
## Asyncio2
## --------
##
## This module implements a brand new asyncio module based on Futures.
## IOCP is used under the hood on Windows and the selectors module is used for
## other operating systems.
# -- Futures
type
PFutureVoid* = ref object of PObject
cbVoid: proc () {.closure.}
finished: bool
PFuture*[T] = ref object of PFutureVoid
value: T
error: ref EBase
cb: proc (future: PFuture[T]) {.closure.}
proc newFuture*[T](): PFuture[T] =
## Creates a new future.
new(result)
result.finished = false
proc complete*[T](future: PFuture[T], val: T) =
## Completes ``future`` with value ``val``.
assert(not future.finished)
assert(future.error == nil)
future.value = val
future.finished = true
if future.cb != nil:
future.cb(future)
if future.cbVoid != nil:
future.cbVoid()
proc fail*[T](future: PFuture[T], error: ref EBase) =
## Completes ``future`` with ``error``.
assert(not future.finished)
future.finished = true
future.error = error
if future.cb != nil:
future.cb(future)
proc `callback=`*[T](future: PFuture[T],
cb: proc (future: PFuture[T]) {.closure.}) =
## Sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
future.cb = cb
if future.finished:
future.cb(future)
proc `callbackVoid=`*(future: PFutureVoid, cb: proc () {.closure.}) =
## Sets the **void** callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
##
## **Note**: This is used for the ``await`` functionality, you most likely
## want to use ``callback``.
future.cbVoid = cb
if future.finished:
future.cbVoid()
proc read*[T](future: PFuture[T]): T =
## Retrieves the value of ``future``. Future must be finished otherwise
## this function will fail with a ``EInvalidValue`` exception.
##
## If the result of the future is an error then that error will be raised.
if future.finished:
if future.error != nil: raise future.error
return future.value
else:
# TODO: Make a custom exception type for this?
raise newException(EInvalidValue, "Future still in progress.")
proc finished*[T](future: PFuture[T]): bool =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``hasError`` to distinguish.
future.finished
proc failed*[T](future: PFuture[T]): bool =
## Determines whether ``future`` completed with an error.
future.error != nil
when defined(windows):
type
TCompletionKey = dword
TCompletionData* = object
sock: TSocketHandle
cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) {.closure.}
PDispatcher* = ref object
ioPort: THandle
TCustomOverlapped = object
Internal*: DWORD
InternalHigh*: DWORD
Offset*: DWORD
OffsetHigh*: DWORD
hEvent*: THANDLE
data*: TCompletionData
PCustomOverlapped = ptr TCustomOverlapped
proc newDispatcher*(): PDispatcher =
## Creates a new Dispatcher instance.
new result
result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
proc register*(p: PDispatcher, sock: TSocketHandle) =
## Registers ``sock`` with the dispatcher ``p``.
if CreateIOCompletionPort(sock.THandle, p.ioPort,
cast[TCompletionKey](sock), 1) == 0:
OSError(OSLastError())
proc poll*(p: PDispatcher, timeout = 500) =
## Waits for completion events and processes them.
let llTimeout =
if timeout == -1: winlean.INFINITE
else: timeout.int32
var lpNumberOfBytesTransferred: DWORD
var lpCompletionKey: ULONG
var lpOverlapped: POverlapped
let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred,
addr lpCompletionKey, addr lpOverlapped, llTimeout).bool
# http://stackoverflow.com/a/12277264/492186
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
if res:
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
customOverlapped.data.cb(customOverlapped.data.sock, TOSErrorCode(-1))
dealloc(customOverlapped)
else:
let errCode = OSLastError()
if lpOverlapped != nil:
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
dealloc(customOverlapped)
customOverlapped.data.cb(customOverlapped.data.sock, errCode)
else:
if errCode.int32 == WAIT_TIMEOUT:
# Timed out
discard
else: OSError(errCode)
var connectExPtr: pointer = nil
var acceptExPtr: pointer = nil
var getAcceptExSockAddrsPtr: pointer = nil
proc initPointer(s: TSocketHandle, func: var pointer, guid: var TGUID): bool =
# Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
var bytesRet: DWord
func = nil
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD,
addr bytesRet, nil, nil) == 0
proc initAll() =
let dummySock = socket()
if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
OSError(OSLastError())
if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
OSError(OSLastError())
if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS):
OSError(OSLastError())
proc connectEx(s: TSocketHandle, name: ptr TSockAddr, namelen: cint,
lpSendBuffer: pointer, dwSendDataLength: dword,
lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool =
if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().")
let func =
cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint,
lpSendBuffer: pointer, dwSendDataLength: dword,
lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr)
result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
lpOverlapped)
proc acceptEx(listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD,
lpOverlapped: POverlapped): bool =
if acceptExPtr.isNil: raise newException(EInvalidValue, "Need to initialise AcceptEx().")
let func =
cast[proc (listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD,
lpOverlapped: POverlapped): bool {.stdcall.}](acceptExPtr)
result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
lpOverlapped)
proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD,
LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint,
RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) =
if getAcceptExSockAddrsPtr.isNil:
raise newException(EInvalidValue, "Need to initialise getAcceptExSockAddrs().")
let func =
cast[proc (lpOutputBuffer: pointer,
dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr,
LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr,
RemoteSockaddrLength: lpint) {.stdcall.}](getAcceptExSockAddrsPtr)
func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
RemoteSockaddr, RemoteSockaddrLength)
proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
af = AF_INET): PFuture[int] =
## Connects ``socket`` to server at ``address:port``.
##
## Returns a ``PFuture`` which will complete when the connection succeeds
## or an error occurs.
var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed.
# Apparently ``ConnectEx`` expects the socket to be initially bound:
var saddr: Tsockaddr_in
saddr.sin_family = int16(toInt(af))
saddr.sin_port = 0
saddr.sin_addr.s_addr = INADDR_ANY
if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)),
sizeof(saddr).TSockLen) < 0'i32:
OSError(OSLastError())
var aiList = getAddrInfo(address, port, af)
var success = false
var lastError: TOSErrorCode
var it = aiList
while it != nil:
# "the OVERLAPPED structure must remain valid until the I/O completes"
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
nil, 0, nil, cast[POverlapped](ol))
if ret:
# Request to connect completed immediately.
success = true
retFuture.complete(0)
dealloc(ol)
break
else:
lastError = OSLastError()
if lastError.int32 == ERROR_IO_PENDING:
# In this case ``ol`` will be deallocated in ``poll``.
success = true
break
else:
dealloc(ol)
success = false
it = it.ai_next
dealloc(aiList)
if not success:
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
return retFuture
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int): PFuture[string] =
## Reads ``size`` bytes from ``socket``. Returned future will complete once
## all of the requested data is read.
var retFuture = newFuture[string]()
var dataBuf: TWSABuf
dataBuf.buf = newString(size)
dataBuf.len = size
var bytesReceived, flags: DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
addr flags, cast[POverlapped](ol), nil)
if ret == -1:
let err = OSLastError()
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
else:
# Request to read completed immediately.
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
dealloc(ol)
return retFuture
proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
var retFuture = newFuture[int]()
var dataBuf: TWSABuf
dataBuf.buf = data
dataBuf.len = data.len
var bytesReceived, flags: DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived,
flags, cast[POverlapped](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
else:
retFuture.complete(0)
dealloc(ol)
return retFuture
proc acceptAddr*(p: PDispatcher, socket: TSocketHandle):
PFuture[tuple[address: string, client: TSocketHandle]] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
var clientSock = socket()
if clientSock == OSInvalidSocket: osError(osLastError())
const lpOutputLen = 1024
var lpOutputBuf = newString(lpOutputLen)
var dwBytesReceived: DWORD
let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
let dwLocalAddressLength = DWORD(sizeof (TSockaddr_in) + 16)
let dwRemoteAddressLength = DWORD(sizeof(TSockaddr_in) + 16)
template completeAccept(): stmt {.immediate, dirty.} =
var listenSock = socket
let setoptRet = setsockopt(clientSock, SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
sizeof(listenSock).TSockLen)
if setoptRet != 0: osError(osLastError())
var LocalSockaddr, RemoteSockaddr: ptr TSockAddr
var localLen, remoteLen: int32
getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
dwLocalAddressLength, dwRemoteAddressLength,
addr LocalSockaddr, addr localLen,
addr RemoteSockaddr, addr remoteLen)
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
retFuture.complete(
(address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr),
client: clientSock)
)
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
completeAccept()
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0],
dwReceiveDataLength,
dwLocalAddressLength,
dwRemoteAddressLength,
addr dwBytesReceived, cast[POverlapped](ol))
if not ret:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
else:
completeAccept()
dealloc(ol)
return retFuture
proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection.
## The future will complete when the connection is successfully accepted.
var retFut = newFuture[TSocketHandle]()
var fut = p.acceptAddr(socket)
fut.callback =
proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
assert future.finished
if future.failed:
retFut.fail(future.error)
else:
retFut.complete(future.read.client)
return retFut
initAll()
else:
# TODO: Selectors.
when isMainModule:
var p = newDispatcher()
var sock = socket()
#sock.setBlocking false
p.register(sock)
when true:
var f = p.connect(sock, "irc.freenode.org", TPort(6667))
f.callback =
proc (future: PFuture[int]) =
echo("Connected in future!")
echo(future.read)
for i in 0 .. 50:
var recvF = p.recv(sock, 10)
recvF.callback =
proc (future: PFuture[string]) =
echo("Read: ", future.read)
else:
sock.bindAddr(TPort(6667))
sock.listen()
proc onAccept(future: PFuture[TSocketHandle]) =
echo "Accepted"
var t = p.send(future.read, "test\c\L")
t.callback =
proc (future: PFuture[int]) =
echo(future.read)
var f = p.accept(sock)
f.callback = onAccept
var f = p.accept(sock)
f.callback = onAccept
while true:
p.poll()
echo "polled"

40
lib/pure/net.nim Normal file
View File

@@ -0,0 +1,40 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2014 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements a high-level cross-platform sockets interface.
import sockets2, os
type
TSocket* = TSocketHandle
proc bindAddr*(socket: TSocket, port = TPort(0), address = "") {.
tags: [FReadIO].} =
## binds an address/port number to a socket.
## Use address string in dotted decimal form like "a.b.c.d"
## or leave "" for any address.
if address == "":
var name: TSockaddr_in
when defined(windows):
name.sin_family = toInt(AF_INET).int16
else:
name.sin_family = toInt(AF_INET)
name.sin_port = htons(int16(port))
name.sin_addr.s_addr = htonl(INADDR_ANY)
if bindAddr(socket, cast[ptr TSockAddr](addr(name)),
sizeof(name).TSocklen) < 0'i32:
osError(osLastError())
else:
var aiList = getAddrInfo(address, port, AF_INET)
if bindAddr(socket, aiList.ai_addr, aiList.ai_addrlen.TSocklen) < 0'i32:
dealloc(aiList)
osError(osLastError())
dealloc(aiList)

249
lib/pure/selectors.nim Normal file
View File

@@ -0,0 +1,249 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2013 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
# TODO: Docs.
import tables, os, unsigned
when defined(windows):
import winlean
else:
import posix
type
TEvent* = enum
EvRead, EvWrite
TSelectorKey* = object
fd: cint
events: set[TEvent]
data: PObject
TReadyInfo* = tuple[key: TSelectorKey, events: set[TEvent]]
PSelector* = ref object of PObject ## Selector interface.
fds*: TTable[cint, TSelectorKey]
registerImpl*: proc (s: PSelector, fd: cint, events: set[TEvent],
data: PObject): TSelectorKey {.nimcall, tags: [FWriteIO].}
unregisterImpl*: proc (s: PSelector, fd: cint): TSelectorKey {.nimcall, tags: [FWriteIO].}
selectImpl*: proc (s: PSelector, timeout: int): seq[TReadyInfo] {.nimcall, tags: [FReadIO].}
closeImpl*: proc (s: PSelector) {.nimcall.}
template initSelector(r: expr) =
new r
r.fds = initTable[cint, TSelectorKey]()
proc register*(s: PSelector, fd: cint, events: set[TEvent], data: PObject):
TSelectorKey =
if not s.registerImpl.isNil: result = s.registerImpl(s, fd, events, data)
proc unregister*(s: PSelector, fd: cint): TSelectorKey =
##
## **Note:** For the ``epoll`` implementation the resulting ``TSelectorKey``
## will only have the ``fd`` field set. This is an optimisation and may
## change in the future if a viable use case is presented.
if not s.unregisterImpl.isNil: result = s.unregisterImpl(s, fd)
proc select*(s: PSelector, timeout = 500): seq[TReadyInfo] =
##
## The ``events`` field of the returned ``key`` contains the original events
## for which the ``fd`` was bound. This is contrary to the ``events`` field
## of the ``TReadyInfo`` tuple which determines which events are ready
## on the ``fd``.
if not s.selectImpl.isNil: result = s.selectImpl(s, timeout)
proc close*(s: PSelector) =
if not s.closeImpl.isNil: s.closeImpl(s)
# ---- Select() ----------------------------------------------------------------
type
PSelectSelector* = ref object of PSelector ## Implementation of select()
proc ssRegister(s: PSelector, fd: cint, events: set[TEvent],
data: PObject): TSelectorKey =
if s.fds.hasKey(fd):
raise newException(EInvalidValue, "FD already exists in selector.")
var sk = TSelectorKey(fd: fd, events: events, data: data)
s.fds[fd] = sk
result = sk
proc ssUnregister(s: PSelector, fd: cint): TSelectorKey =
result = s.fds[fd]
s.fds.del(fd)
proc ssClose(s: PSelector) = nil
proc timeValFromMilliseconds(timeout: int): TTimeVal =
if timeout != -1:
var seconds = timeout div 1000
result.tv_sec = seconds.int32
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
proc createFdSet(rd, wr: var TFdSet, fds: TTable[cint, TSelectorKey],
m: var int) =
FD_ZERO(rd); FD_ZERO(wr)
for k, v in pairs(fds):
if EvRead in v.events:
m = max(m, int(k))
FD_SET(k, rd)
if EvWrite in v.events:
m = max(m, int(k))
FD_SET(k, wr)
proc getReadyFDs(rd, wr: var TFdSet, fds: TTable[cint, TSelectorKey]):
seq[TReadyInfo] =
result = @[]
for k, v in pairs(fds):
var events: set[TEvent] = {}
if FD_ISSET(k, rd) != 0'i32:
events = events + {EvRead}
if FD_ISSET(k, wr) != 0'i32:
events = events + {EvWrite}
result.add((v, events))
proc select(fds: TTable[cint, TSelectorKey], timeout = 500):
seq[TReadyInfo] =
var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout)
var rd, wr: TFdSet
var m = 0
createFdSet(rd, wr, fds, m)
var retCode = 0
if timeout != -1:
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, addr(tv)))
else:
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, nil))
if retCode < 0:
OSError(OSLastError())
elif retCode == 0:
return @[]
else:
return getReadyFDs(rd, wr, fds)
proc ssSelect(s: PSelector, timeout: int): seq[TReadyInfo] =
result = select(s.fds, timeout)
proc newSelectSelector*(): PSelectSelector =
initSelector(result)
result.registerImpl = ssRegister
result.unregisterImpl = ssUnregister
result.selectImpl = ssSelect
result.closeImpl = ssClose
# ---- Epoll -------------------------------------------------------------------
when defined(linux):
import epoll
type
PEpollSelector* = ref object of PSelector
epollFD: cint
events: array[64, ptr epoll_event]
TDataWrapper = object
fd: cint
boundEvents: set[TEvent] ## The events which ``fd`` listens for.
data: PObject ## User object.
proc esRegister(s: PSelector, fd: cint, events: set[TEvent],
data: PObject): TSelectorKey =
var es = PEpollSelector(s)
var event: epoll_event
if EvRead in events:
event.events = EPOLLIN
if EvWrite in events:
event.events = event.events or EPOLLOUT
var dw = cast[ptr TDataWrapper](alloc0(sizeof(TDataWrapper))) # TODO: This needs to be dealloc'd
dw.fd = fd
dw.boundEvents = events
dw.data = data
event.data.thePtr = dw
if epoll_ctl(es.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
OSError(OSLastError())
result = TSelectorKey(fd: fd, events: events, data: data)
proc esUnregister(s: PSelector, fd: cint): TSelectorKey =
# We cannot find out the information about this ``fd`` from the epoll
# context. As such I will simply return an almost empty TSelectorKey.
var es = PEpollSelector(s)
if epoll_ctl(es.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
OSError(OSLastError())
# We could fill in the ``fds`` TTable to get the info, but that wouldn't
# be nice for our memory.
result = TSelectorKey(fd: fd, events: {}, data: nil)
proc esClose(s: PSelector) =
var es = PEpollSelector(s)
if es.epollFD.close() != 0: OSError(OSLastError())
dealloc(addr es.events) # TODO: Test this
proc esSelect(s: PSelector, timeout: int): seq[TReadyInfo] =
result = @[]
var es = PEpollSelector(s)
let evNum = epoll_wait(es.epollFD, es.events[0], 64.cint, timeout.cint)
if evNum < 0: OSError(OSLastError())
if evNum == 0: return @[]
for i in 0 .. <evNum:
var evSet: set[TEvent] = {}
if (es.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
if (es.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
let dw = cast[ptr TDataWrapper](es.events[i].data.thePtr)
let selectorKey = TSelectorKey(fd: dw.fd, events: dw.boundEvents,
data: dw.data)
result.add((selectorKey, evSet))
proc newEpollSelector*(): PEpollSelector =
new result
result.epollFD = epoll_create(64)
result.events = cast[array[64, ptr epoll_event]](alloc0(sizeof(epoll_event)*64))
if result.epollFD < 0:
OSError(OSLastError())
result.registerImpl = esRegister
result.unregisterImpl = esUnregister
result.closeImpl = esClose
result.selectImpl = esSelect
when isMainModule:
# Select()
import sockets
type
PSockWrapper = ref object of PObject
sock: TSocket
var sock = socket()
sock.connect("irc.freenode.net", TPort(6667))
var selector = newEpollSelector()
var data = PSockWrapper(sock: sock)
let key = selector.register(sock.getFD.cint, {EvRead}, data)
var i = 0
while true:
let ready = selector.select(1000)
echo ready.len
if ready.len > 0: echo ready[0].events
i.inc
if i == 6:
selector.close()
break

202
lib/pure/sockets2.nim Normal file
View File

@@ -0,0 +1,202 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2014 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements a low-level cross-platform sockets interface. Look
## at the ``net`` module for the higher-level version.
import unsigned, os
when hostos == "solaris":
{.passl: "-lsocket -lnsl".}
when defined(Windows):
import winlean
else:
import posix
export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen,
inet_ntoa
type
TPort* = distinct uint16 ## port type
TDomain* = enum ## domain, which specifies the protocol family of the
## created socket. Other domains than those that are listed
## here are unsupported.
AF_UNIX, ## for local socket (using a file). Unsupported on Windows.
AF_INET = 2, ## for network protocol IPv4 or
AF_INET6 = 23 ## for network protocol IPv6.
TType* = enum ## second argument to `socket` proc
SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets
SOCK_DGRAM = 2, ## datagram service or Datagram Sockets
SOCK_RAW = 3, ## raw protocols atop the network layer.
SOCK_SEQPACKET = 5 ## reliable sequenced packet service
TProtocol* = enum ## third argument to `socket` proc
IPPROTO_TCP = 6, ## Transmission control protocol.
IPPROTO_UDP = 17, ## User datagram protocol.
IPPROTO_IP, ## Internet protocol. Unsupported on Windows.
IPPROTO_IPV6, ## Internet Protocol Version 6. Unsupported on Windows.
IPPROTO_RAW, ## Raw IP Packets Protocol. Unsupported on Windows.
IPPROTO_ICMP ## Control message protocol. Unsupported on Windows.
TServent* {.pure, final.} = object ## information about a service
name*: string
aliases*: seq[string]
port*: TPort
proto*: string
Thostent* {.pure, final.} = object ## information about a given host
name*: string
aliases*: seq[string]
addrtype*: TDomain
length*: int
addrList*: seq[string]
when defined(windows):
let
OSInvalidSocket* = winlean.INVALID_SOCKET
else:
let
OSInvalidSocket* = posix.INVALID_SOCKET
proc `==`*(a, b: TPort): bool {.borrow.}
## ``==`` for ports.
proc `$`*(p: TPort): string {.borrow.}
## returns the port number as a string
proc toInt*(domain: TDomain): cint
## Converts the TDomain enum to a platform-dependent ``cint``.
proc toInt*(typ: TType): cint
## Converts the TType enum to a platform-dependent ``cint``.
proc toInt*(p: TProtocol): cint
## Converts the TProtocol enum to a platform-dependent ``cint``.
when defined(posix):
proc toInt(domain: TDomain): cint =
case domain
of AF_UNIX: result = posix.AF_UNIX
of AF_INET: result = posix.AF_INET
of AF_INET6: result = posix.AF_INET6
else: nil
proc toInt(typ: TType): cint =
case typ
of SOCK_STREAM: result = posix.SOCK_STREAM
of SOCK_DGRAM: result = posix.SOCK_DGRAM
of SOCK_SEQPACKET: result = posix.SOCK_SEQPACKET
of SOCK_RAW: result = posix.SOCK_RAW
else: nil
proc toInt(p: TProtocol): cint =
case p
of IPPROTO_TCP: result = posix.IPPROTO_TCP
of IPPROTO_UDP: result = posix.IPPROTO_UDP
of IPPROTO_IP: result = posix.IPPROTO_IP
of IPPROTO_IPV6: result = posix.IPPROTO_IPV6
of IPPROTO_RAW: result = posix.IPPROTO_RAW
of IPPROTO_ICMP: result = posix.IPPROTO_ICMP
else: nil
else:
proc toInt(domain: TDomain): cint =
result = toU16(ord(domain))
proc toInt(typ: TType): cint =
result = cint(ord(typ))
proc toInt(p: TProtocol): cint =
result = cint(ord(p))
proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
## Creates a new socket; returns `InvalidSocket` if an error occurs.
# TODO: The function which will use this will raise EOS.
socket(toInt(domain), toInt(typ), toInt(protocol))
proc close*(socket: TSocketHandle) =
## closes a socket.
when defined(windows):
discard winlean.closeSocket(socket)
else:
discard posix.close(socket)
# TODO: These values should not be discarded. An EOS should be raised.
# http://stackoverflow.com/questions/12463473/what-happens-if-you-call-close-on-a-bsd-socket-multiple-times
proc bindAddr*(socket: TSocketHandle, name: ptr TSockAddr, namelen: TSockLen): cint =
result = bindSocket(socket, name, namelen)
proc listen*(socket: TSocketHandle, backlog = SOMAXCONN) {.tags: [FReadIO].} =
## Marks ``socket`` as accepting connections.
## ``Backlog`` specifies the maximum length of the
## queue of pending connections.
when defined(windows):
if winlean.listen(socket, cint(backlog)) < 0'i32: osError(osLastError())
else:
if posix.listen(socket, cint(backlog)) < 0'i32: osError(osLastError())
proc getAddrInfo*(address: string, port: TPort, af: TDomain = AF_INET, typ: TType = SOCK_STREAM,
prot: TProtocol = IPPROTO_TCP): ptr TAddrInfo =
##
##
## **Warning**: The resulting ``ptr TAddrInfo`` must be freed using ``dealloc``!
var hints: TAddrInfo
result = nil
hints.ai_family = toInt(af)
hints.ai_socktype = toInt(typ)
hints.ai_protocol = toInt(prot)
var gaiResult = getAddrInfo(address, $port, addr(hints), result)
if gaiResult != 0'i32:
when defined(windows):
OSError(OSLastError())
else:
raise newException(EOS, $gai_strerror(gaiResult))
proc dealloc*(ai: ptr TAddrInfo) =
freeaddrinfo(ai)
proc ntohl*(x: int32): int32 =
## Converts 32-bit integers from network to host byte order.
## On machines where the host byte order is the same as network byte order,
## this is a no-op; otherwise, it performs a 4-byte swap operation.
when cpuEndian == bigEndian: result = x
else: result = (x shr 24'i32) or
(x shr 8'i32 and 0xff00'i32) or
(x shl 8'i32 and 0xff0000'i32) or
(x shl 24'i32)
proc ntohs*(x: int16): int16 =
## Converts 16-bit integers from network to host byte order. On machines
## where the host byte order is the same as network byte order, this is
## a no-op; otherwise, it performs a 2-byte swap operation.
when cpuEndian == bigEndian: result = x
else: result = (x shr 8'i16) or (x shl 8'i16)
proc htonl*(x: int32): int32 =
## Converts 32-bit integers from host to network byte order. On machines
## where the host byte order is the same as network byte order, this is
## a no-op; otherwise, it performs a 4-byte swap operation.
result = sockets2.ntohl(x)
proc htons*(x: int16): int16 =
## Converts 16-bit positive integers from host to network byte order.
## On machines where the host byte order is the same as network byte
## order, this is a no-op; otherwise, it performs a 2-byte swap operation.
result = sockets2.ntohs(x)
when defined(Windows):
var wsa: TWSADATA
if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError())

View File

@@ -16,8 +16,12 @@ const
type
THandle* = int
LONG* = int32
ULONG* = int
PULONG* = ptr int
WINBOOL* = int32
DWORD* = int32
PDWORD* = ptr DWORD
LPINT* = ptr int32
HDC* = THandle
HGLRC* = THandle
@@ -632,3 +636,76 @@ when not useWinUnicode:
proc unmapViewOfFile*(lpBaseAddress: pointer): WINBOOL {.stdcall,
dynlib: "kernel32", importc: "UnmapViewOfFile".}
type
TOVERLAPPED* {.final, pure.} = object
Internal*: DWORD
InternalHigh*: DWORD
Offset*: DWORD
OffsetHigh*: DWORD
hEvent*: THANDLE
POVERLAPPED* = ptr TOVERLAPPED
POVERLAPPED_COMPLETION_ROUTINE* = proc (para1: DWORD, para2: DWORD,
para3: POVERLAPPED){.stdcall.}
TGUID* {.final, pure.} = object
D1*: int32
D2*: int16
D3*: int16
D4*: array [0..7, int8]
const
ERROR_IO_PENDING* = 997
proc CreateIoCompletionPort*(FileHandle: THANDLE, ExistingCompletionPort: THANDLE,
CompletionKey: DWORD,
NumberOfConcurrentThreads: DWORD): THANDLE{.stdcall,
dynlib: "kernel32", importc: "CreateIoCompletionPort".}
proc GetQueuedCompletionStatus*(CompletionPort: THandle,
lpNumberOfBytesTransferred: PDWORD, lpCompletionKey: PULONG,
lpOverlapped: ptr POverlapped,
dwMilliseconds: DWORD): WINBOOL{.stdcall,
dynlib: "kernel32", importc: "GetQueuedCompletionStatus".}
const
IOC_OUT* = 0x40000000
IOC_IN* = 0x80000000
IOC_WS2* = 0x08000000
IOC_INOUT* = IOC_IN or IOC_OUT
template WSAIORW*(x,y): expr = (IOC_INOUT or x or y)
const
SIO_GET_EXTENSION_FUNCTION_POINTER* = WSAIORW(IOC_WS2,6).DWORD
SO_UPDATE_ACCEPT_CONTEXT* = 0x700B
var
WSAID_CONNECTEX*: TGUID = TGUID(D1: 0x25a207b9, D2: 0xddf3'i16, D3: 0x4660, D4: [
0x8e'i8, 0xe9'i8, 0x76'i8, 0xe5'i8, 0x8c'i8, 0x74'i8, 0x06'i8, 0x3e'i8])
WSAID_ACCEPTEX*: TGUID = TGUID(D1: 0xb5367df1'i32, D2: 0xcbac'i16, D3: 0x11cf, D4: [
0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8, 0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8])
WSAID_GETACCEPTEXSOCKADDRS*: TGUID = TGUID(D1: 0xb5367df2'i32, D2: 0xcbac'i16, D3: 0x11cf, D4: [
0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8, 0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8])
proc WSAIoctl*(s: TSocketHandle, dwIoControlCode: dword, lpvInBuffer: pointer,
cbInBuffer: dword, lpvOutBuffer: pointer, cbOutBuffer: dword,
lpcbBytesReturned: PDword, lpOverlapped: POVERLAPPED,
lpCompletionRoutine: POVERLAPPED_COMPLETION_ROUTINE): cint
{.stdcall, importc: "WSAIoctl", dynlib: "Ws2_32.dll".}
type
TWSABuf* {.importc: "WSABUF", header: "winsock2.h".} = object
len*: ULONG
buf*: cstring
proc WSARecv*(s: TSocketHandle, buf: ptr TWSABuf, bufCount: DWORD,
bytesReceived, flags: PDWORD, lpOverlapped: POverlapped,
completionProc: POVERLAPPED_COMPLETION_ROUTINE): cint {.
stdcall, importc: "WSARecv", dynlib: "Ws2_32.dll".}
proc WSASend*(s: TSocketHandle, buf: ptr TWSABuf, bufCount: DWORD,
bytesSent: PDWord, flags: DWORD, lpOverlapped: POverlapped,
completionProc: POVERLAPPED_COMPLETION_ROUTINE): cint {.
stdcall, importc: "WSASend", dynlib: "Ws2_32.dll".}