mirror of
https://github.com/nim-lang/Nim.git
synced 2026-02-26 04:45:08 +00:00
Implemented selector support for asyncio2.
This commit is contained in:
@@ -7,6 +7,8 @@
|
||||
# distribution, for details about the copyright.
|
||||
#
|
||||
|
||||
from posix import TSocketHandle
|
||||
|
||||
const
|
||||
EPOLLIN* = 0x00000001
|
||||
EPOLLPRI* = 0x00000002
|
||||
@@ -33,8 +35,8 @@ const
|
||||
type
|
||||
epoll_data* {.importc: "union epoll_data",
|
||||
header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union.
|
||||
thePtr* {.importc: "ptr".}: pointer # \
|
||||
#fd*: cint
|
||||
#thePtr* {.importc: "ptr".}: pointer
|
||||
fd*: cint # \
|
||||
#u32*: uint32
|
||||
#u64*: uint64
|
||||
|
||||
@@ -54,7 +56,7 @@ proc epoll_create1*(flags: cint): cint {.importc: "epoll_create1",
|
||||
## Same as epoll_create but with an FLAGS parameter. The unused SIZE
|
||||
## parameter has been dropped.
|
||||
|
||||
proc epoll_ctl*(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint {.
|
||||
proc epoll_ctl*(epfd: cint; op: cint; fd: cint | TSocketHandle; event: ptr epoll_event): cint {.
|
||||
importc: "epoll_ctl", header: "<sys/epoll.h>".}
|
||||
## Manipulate an epoll instance "epfd". Returns 0 in case of success,
|
||||
## -1 in case of error ( the "errno" variable will contain the
|
||||
|
||||
@@ -2356,7 +2356,7 @@ proc FD_ZERO*(a1: var TFdSet) {.importc, header: "<sys/select.h>".}
|
||||
|
||||
proc pselect*(a1: cint, a2, a3, a4: ptr TFdSet, a5: ptr Ttimespec,
|
||||
a6: var Tsigset): cint {.importc, header: "<sys/select.h>".}
|
||||
proc select*(a1: cint, a2, a3, a4: ptr TFdSet, a5: ptr Ttimeval): cint {.
|
||||
proc select*(a1: cint | TSocketHandle, a2, a3, a4: ptr TFdSet, a5: ptr Ttimeval): cint {.
|
||||
importc, header: "<sys/select.h>".}
|
||||
|
||||
when hasSpawnH:
|
||||
|
||||
@@ -9,8 +9,6 @@
|
||||
|
||||
import os, oids, tables, strutils, macros
|
||||
|
||||
import winlean
|
||||
|
||||
import sockets2, net
|
||||
|
||||
## Asyncio2
|
||||
@@ -93,7 +91,10 @@ proc failed*[T](future: PFuture[T]): bool =
|
||||
## Determines whether ``future`` completed with an error.
|
||||
future.error != nil
|
||||
|
||||
when defined(windows):
|
||||
# TODO: Get rid of register. Do it implicitly.
|
||||
|
||||
when defined(windows) or defined(nimdoc):
|
||||
import winlean
|
||||
type
|
||||
TCompletionKey = dword
|
||||
|
||||
@@ -293,7 +294,10 @@ when defined(windows):
|
||||
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
|
||||
flags: int = 0): PFuture[string] =
|
||||
## Reads ``size`` bytes from ``socket``. Returned future will complete once
|
||||
## all of the requested data is read.
|
||||
## all of the requested data is read. If socket is disconnected during the
|
||||
## recv operation then the future may complete with only a part of the
|
||||
## requested data read. If socket is disconnected and no data is available
|
||||
## to be read then the future will complete with a value of ``""``.
|
||||
|
||||
var retFuture = newFuture[string]()
|
||||
|
||||
@@ -448,24 +452,206 @@ when defined(windows):
|
||||
|
||||
return retFuture
|
||||
|
||||
proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] =
|
||||
## Accepts a new connection. Returns a future containing the client socket
|
||||
## corresponding to that connection.
|
||||
## The future will complete when the connection is successfully accepted.
|
||||
var retFut = newFuture[TSocketHandle]()
|
||||
var fut = p.acceptAddr(socket)
|
||||
fut.callback =
|
||||
proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
|
||||
assert future.finished
|
||||
if future.failed:
|
||||
retFut.fail(future.error)
|
||||
else:
|
||||
retFut.complete(future.read.client)
|
||||
return retFut
|
||||
|
||||
initAll()
|
||||
else:
|
||||
# TODO: Selectors.
|
||||
import selectors
|
||||
from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
|
||||
type
|
||||
TCallback = proc (sock: TSocketHandle): bool {.closure.}
|
||||
|
||||
PData* = ref object of PObject
|
||||
sock: TSocketHandle
|
||||
readCBs: seq[TCallback]
|
||||
writeCBs: seq[TCallback]
|
||||
|
||||
PDispatcher* = ref object
|
||||
selector: PSelector
|
||||
|
||||
proc newDispatcher*(): PDispatcher =
|
||||
new result
|
||||
result.selector = newSelector()
|
||||
|
||||
proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
|
||||
assert sock in p.selector
|
||||
echo("Update: ", events)
|
||||
if events == {}:
|
||||
discard p.selector.unregister(sock)
|
||||
else:
|
||||
discard p.selector.update(sock, events)
|
||||
|
||||
proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
|
||||
if sock notin p.selector:
|
||||
var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[])
|
||||
p.selector.register(sock, {EvRead}, data.PObject)
|
||||
else:
|
||||
p.selector[sock].data.PData.readCBs.add(cb)
|
||||
p.update(sock, p.selector[sock].events + {EvRead})
|
||||
|
||||
proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
|
||||
if sock notin p.selector:
|
||||
var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb])
|
||||
p.selector.register(sock, {EvWrite}, data.PObject)
|
||||
else:
|
||||
p.selector[sock].data.PData.writeCBs.add(cb)
|
||||
p.update(sock, p.selector[sock].events + {EvWrite})
|
||||
|
||||
proc poll*(p: PDispatcher, timeout = 500) =
|
||||
for info in p.selector.select(timeout):
|
||||
let data = PData(info.key.data)
|
||||
assert data.sock == info.key.fd
|
||||
echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events)
|
||||
|
||||
if EvRead in info.events:
|
||||
var newReadCBs: seq[TCallback] = @[]
|
||||
for cb in data.readCBs:
|
||||
if not cb(data.sock):
|
||||
# Callback wants to be called again.
|
||||
newReadCBs.add(cb)
|
||||
data.readCBs = newReadCBs
|
||||
|
||||
if EvWrite in info.events:
|
||||
var newWriteCBs: seq[TCallback] = @[]
|
||||
for cb in data.writeCBs:
|
||||
if not cb(data.sock):
|
||||
# Callback wants to be called again.
|
||||
newWriteCBs.add(cb)
|
||||
data.writeCBs = newWriteCBs
|
||||
|
||||
var newEvents: set[TEvent]
|
||||
if data.readCBs.len != 0: newEvents = {EvRead}
|
||||
if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
|
||||
p.update(data.sock, newEvents)
|
||||
|
||||
proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
|
||||
af = AF_INET): PFuture[int] =
|
||||
var retFuture = newFuture[int]()
|
||||
|
||||
proc cb(sock: TSocketHandle): bool =
|
||||
# We have connected.
|
||||
retFuture.complete(0)
|
||||
return true
|
||||
|
||||
var aiList = getAddrInfo(address, port, af)
|
||||
var success = false
|
||||
var lastError: TOSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen)
|
||||
if ret == 0:
|
||||
# Request to connect completed immediately.
|
||||
success = true
|
||||
retFuture.complete(0)
|
||||
break
|
||||
else:
|
||||
lastError = osLastError()
|
||||
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
|
||||
success = true
|
||||
addWrite(p, socket, cb)
|
||||
break
|
||||
else:
|
||||
success = false
|
||||
it = it.ai_next
|
||||
|
||||
dealloc(aiList)
|
||||
if not success:
|
||||
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
|
||||
return retFuture
|
||||
|
||||
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
|
||||
flags: int = 0): PFuture[string] =
|
||||
var retFuture = newFuture[string]()
|
||||
|
||||
var readBuffer = newString(size)
|
||||
var sizeRead = 0
|
||||
|
||||
proc cb(sock: TSocketHandle): bool =
|
||||
result = true
|
||||
let netSize = size - sizeRead
|
||||
let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint)
|
||||
if res < 0:
|
||||
let lastError = osLastError()
|
||||
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
|
||||
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
|
||||
else:
|
||||
result = false # We still want this callback to be called.
|
||||
elif res == 0:
|
||||
# Disconnected
|
||||
if sizeRead == 0:
|
||||
retFuture.complete("")
|
||||
else:
|
||||
readBuffer.setLen(sizeRead)
|
||||
retFuture.complete(readBuffer)
|
||||
else:
|
||||
sizeRead.inc(res)
|
||||
if res != netSize:
|
||||
result = false # We want to read all the data requested.
|
||||
else:
|
||||
retFuture.complete(readBuffer)
|
||||
|
||||
addRead(p, socket, cb)
|
||||
return retFuture
|
||||
|
||||
proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] =
|
||||
var retFuture = newFuture[int]()
|
||||
|
||||
var written = 0
|
||||
|
||||
proc cb(sock: TSocketHandle): bool =
|
||||
result = true
|
||||
let netSize = data.len-written
|
||||
var d = data.cstring
|
||||
let res = send(sock, addr d[written], netSize, 0.cint)
|
||||
if res < 0:
|
||||
let lastError = osLastError()
|
||||
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
|
||||
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
|
||||
else:
|
||||
result = false # We still want this callback to be called.
|
||||
else:
|
||||
written.inc(res)
|
||||
if res != netSize:
|
||||
result = false # We still have data to send.
|
||||
else:
|
||||
retFuture.complete(0)
|
||||
addWrite(p, socket, cb)
|
||||
return retFuture
|
||||
|
||||
|
||||
proc acceptAddr*(p: PDispatcher, socket: TSocketHandle):
|
||||
PFuture[tuple[address: string, client: TSocketHandle]] =
|
||||
var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
|
||||
proc cb(sock: TSocketHandle): bool =
|
||||
result = true
|
||||
var sockAddress: Tsockaddr_in
|
||||
var addrLen = sizeof(sockAddress).TSocklen
|
||||
var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)),
|
||||
addr(addrLen))
|
||||
if client == osInvalidSocket:
|
||||
let lastError = osLastError()
|
||||
assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
|
||||
if lastError.int32 == EINTR:
|
||||
return false
|
||||
else:
|
||||
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
|
||||
else:
|
||||
retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client))
|
||||
addRead(p, socket, cb)
|
||||
return retFuture
|
||||
|
||||
proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] =
|
||||
## Accepts a new connection. Returns a future containing the client socket
|
||||
## corresponding to that connection.
|
||||
## The future will complete when the connection is successfully accepted.
|
||||
var retFut = newFuture[TSocketHandle]()
|
||||
var fut = p.acceptAddr(socket)
|
||||
fut.callback =
|
||||
proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
|
||||
assert future.finished
|
||||
if future.failed:
|
||||
retFut.fail(future.error)
|
||||
else:
|
||||
retFut.complete(future.read.client)
|
||||
return retFut
|
||||
|
||||
# -- Await Macro
|
||||
|
||||
@@ -665,8 +851,7 @@ when isMainModule:
|
||||
|
||||
var p = newDispatcher()
|
||||
var sock = socket()
|
||||
#sock.setBlocking false
|
||||
p.register(sock)
|
||||
sock.setBlocking false
|
||||
|
||||
|
||||
when false:
|
||||
@@ -706,7 +891,7 @@ when isMainModule:
|
||||
var recvF = p.recv(sock, 10)
|
||||
recvF.callback =
|
||||
proc (future: PFuture[string]) =
|
||||
echo("Read: ", future.read)
|
||||
echo("Read ", future.read.len, ": ", future.read.repr)
|
||||
|
||||
else:
|
||||
|
||||
|
||||
@@ -37,4 +37,19 @@ proc bindAddr*(socket: TSocket, port = TPort(0), address = "") {.
|
||||
if bindAddr(socket, aiList.ai_addr, aiList.ai_addrlen.TSocklen) < 0'i32:
|
||||
dealloc(aiList)
|
||||
osError(osLastError())
|
||||
dealloc(aiList)
|
||||
dealloc(aiList)
|
||||
|
||||
proc setBlocking*(s: TSocket, blocking: bool) {.tags: [].} =
|
||||
## Sets blocking mode on socket
|
||||
when defined(Windows):
|
||||
var mode = clong(ord(not blocking)) # 1 for non-blocking, 0 for blocking
|
||||
if ioctlsocket(s, FIONBIO, addr(mode)) == -1:
|
||||
osError(osLastError())
|
||||
else: # BSD sockets
|
||||
var x: int = fcntl(s, F_GETFL, 0)
|
||||
if x == -1:
|
||||
osError(osLastError())
|
||||
else:
|
||||
var mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK
|
||||
if fcntl(s, F_SETFL, mode) == -1:
|
||||
osError(osLastError())
|
||||
@@ -1,7 +1,7 @@
|
||||
#
|
||||
#
|
||||
# Nimrod's Runtime Library
|
||||
# (c) Copyright 2013 Dominik Picheta
|
||||
# (c) Copyright 2014 Dominik Picheta
|
||||
#
|
||||
# See the file "copying.txt", included in this
|
||||
# distribution, for details about the copyright.
|
||||
@@ -9,212 +9,211 @@
|
||||
|
||||
# TODO: Docs.
|
||||
|
||||
import tables, os, unsigned
|
||||
when defined(windows):
|
||||
import winlean
|
||||
else:
|
||||
import posix
|
||||
import tables, os, unsigned, hashes
|
||||
|
||||
when defined(linux): import posix, epoll
|
||||
elif defined(windows): import winlean
|
||||
|
||||
proc hash*(x: TSocketHandle): THash {.borrow.}
|
||||
|
||||
type
|
||||
TEvent* = enum
|
||||
EvRead, EvWrite
|
||||
|
||||
TSelectorKey* = object
|
||||
fd: cint
|
||||
events: set[TEvent]
|
||||
data: PObject
|
||||
PSelectorKey* = ref object
|
||||
fd*: TSocketHandle
|
||||
events*: set[TEvent] ## The events which ``fd`` listens for.
|
||||
data*: PObject ## User object.
|
||||
|
||||
TReadyInfo* = tuple[key: TSelectorKey, events: set[TEvent]]
|
||||
TReadyInfo* = tuple[key: PSelectorKey, events: set[TEvent]]
|
||||
|
||||
PSelector* = ref object of PObject ## Selector interface.
|
||||
fds*: TTable[cint, TSelectorKey]
|
||||
registerImpl*: proc (s: PSelector, fd: cint, events: set[TEvent],
|
||||
data: PObject): TSelectorKey {.nimcall, tags: [FWriteIO].}
|
||||
unregisterImpl*: proc (s: PSelector, fd: cint): TSelectorKey {.nimcall, tags: [FWriteIO].}
|
||||
selectImpl*: proc (s: PSelector, timeout: int): seq[TReadyInfo] {.nimcall, tags: [FReadIO].}
|
||||
closeImpl*: proc (s: PSelector) {.nimcall.}
|
||||
|
||||
template initSelector(r: expr) =
|
||||
new r
|
||||
r.fds = initTable[cint, TSelectorKey]()
|
||||
|
||||
proc register*(s: PSelector, fd: cint, events: set[TEvent], data: PObject):
|
||||
TSelectorKey =
|
||||
if not s.registerImpl.isNil: result = s.registerImpl(s, fd, events, data)
|
||||
|
||||
proc unregister*(s: PSelector, fd: cint): TSelectorKey =
|
||||
##
|
||||
## **Note:** For the ``epoll`` implementation the resulting ``TSelectorKey``
|
||||
## will only have the ``fd`` field set. This is an optimisation and may
|
||||
## change in the future if a viable use case is presented.
|
||||
if not s.unregisterImpl.isNil: result = s.unregisterImpl(s, fd)
|
||||
|
||||
proc select*(s: PSelector, timeout = 500): seq[TReadyInfo] =
|
||||
##
|
||||
## The ``events`` field of the returned ``key`` contains the original events
|
||||
## for which the ``fd`` was bound. This is contrary to the ``events`` field
|
||||
## of the ``TReadyInfo`` tuple which determines which events are ready
|
||||
## on the ``fd``.
|
||||
|
||||
if not s.selectImpl.isNil: result = s.selectImpl(s, timeout)
|
||||
|
||||
proc close*(s: PSelector) =
|
||||
if not s.closeImpl.isNil: s.closeImpl(s)
|
||||
|
||||
# ---- Select() ----------------------------------------------------------------
|
||||
|
||||
type
|
||||
PSelectSelector* = ref object of PSelector ## Implementation of select()
|
||||
|
||||
proc ssRegister(s: PSelector, fd: cint, events: set[TEvent],
|
||||
data: PObject): TSelectorKey =
|
||||
if s.fds.hasKey(fd):
|
||||
raise newException(EInvalidValue, "FD already exists in selector.")
|
||||
var sk = TSelectorKey(fd: fd, events: events, data: data)
|
||||
s.fds[fd] = sk
|
||||
result = sk
|
||||
|
||||
proc ssUnregister(s: PSelector, fd: cint): TSelectorKey =
|
||||
result = s.fds[fd]
|
||||
s.fds.del(fd)
|
||||
|
||||
proc ssClose(s: PSelector) = nil
|
||||
|
||||
proc timeValFromMilliseconds(timeout: int): TTimeVal =
|
||||
if timeout != -1:
|
||||
var seconds = timeout div 1000
|
||||
result.tv_sec = seconds.int32
|
||||
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
|
||||
|
||||
proc createFdSet(rd, wr: var TFdSet, fds: TTable[cint, TSelectorKey],
|
||||
m: var int) =
|
||||
FD_ZERO(rd); FD_ZERO(wr)
|
||||
for k, v in pairs(fds):
|
||||
if EvRead in v.events:
|
||||
m = max(m, int(k))
|
||||
FD_SET(k, rd)
|
||||
if EvWrite in v.events:
|
||||
m = max(m, int(k))
|
||||
FD_SET(k, wr)
|
||||
|
||||
proc getReadyFDs(rd, wr: var TFdSet, fds: TTable[cint, TSelectorKey]):
|
||||
seq[TReadyInfo] =
|
||||
result = @[]
|
||||
for k, v in pairs(fds):
|
||||
var events: set[TEvent] = {}
|
||||
if FD_ISSET(k, rd) != 0'i32:
|
||||
events = events + {EvRead}
|
||||
if FD_ISSET(k, wr) != 0'i32:
|
||||
events = events + {EvWrite}
|
||||
result.add((v, events))
|
||||
|
||||
proc select(fds: TTable[cint, TSelectorKey], timeout = 500):
|
||||
seq[TReadyInfo] =
|
||||
var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout)
|
||||
|
||||
var rd, wr: TFdSet
|
||||
var m = 0
|
||||
createFdSet(rd, wr, fds, m)
|
||||
|
||||
var retCode = 0
|
||||
if timeout != -1:
|
||||
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, addr(tv)))
|
||||
else:
|
||||
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, nil))
|
||||
|
||||
if retCode < 0:
|
||||
OSError(OSLastError())
|
||||
elif retCode == 0:
|
||||
return @[]
|
||||
else:
|
||||
return getReadyFDs(rd, wr, fds)
|
||||
|
||||
proc ssSelect(s: PSelector, timeout: int): seq[TReadyInfo] =
|
||||
result = select(s.fds, timeout)
|
||||
|
||||
proc newSelectSelector*(): PSelectSelector =
|
||||
initSelector(result)
|
||||
result.registerImpl = ssRegister
|
||||
result.unregisterImpl = ssUnregister
|
||||
result.selectImpl = ssSelect
|
||||
result.closeImpl = ssClose
|
||||
|
||||
# ---- Epoll -------------------------------------------------------------------
|
||||
|
||||
when defined(linux):
|
||||
import epoll
|
||||
when defined(linux) or defined(nimdoc):
|
||||
type
|
||||
PEpollSelector* = ref object of PSelector
|
||||
PSelector* = ref object
|
||||
epollFD: cint
|
||||
events: array[64, ptr epoll_event]
|
||||
fds: TTable[TSocketHandle, PSelectorKey]
|
||||
|
||||
TDataWrapper = object
|
||||
fd: cint
|
||||
boundEvents: set[TEvent] ## The events which ``fd`` listens for.
|
||||
data: PObject ## User object.
|
||||
|
||||
proc esRegister(s: PSelector, fd: cint, events: set[TEvent],
|
||||
data: PObject): TSelectorKey =
|
||||
var es = PEpollSelector(s)
|
||||
var event: epoll_event
|
||||
proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event =
|
||||
if EvRead in events:
|
||||
event.events = EPOLLIN
|
||||
result.events = EPOLLIN
|
||||
if EvWrite in events:
|
||||
event.events = event.events or EPOLLOUT
|
||||
|
||||
var dw = cast[ptr TDataWrapper](alloc0(sizeof(TDataWrapper))) # TODO: This needs to be dealloc'd
|
||||
dw.fd = fd
|
||||
dw.boundEvents = events
|
||||
dw.data = data
|
||||
event.data.thePtr = dw
|
||||
|
||||
if epoll_ctl(es.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
|
||||
OSError(OSLastError())
|
||||
|
||||
result = TSelectorKey(fd: fd, events: events, data: data)
|
||||
result.events = result.events or EPOLLOUT
|
||||
result.data.fd = fd.cint
|
||||
|
||||
proc esUnregister(s: PSelector, fd: cint): TSelectorKey =
|
||||
# We cannot find out the information about this ``fd`` from the epoll
|
||||
# context. As such I will simply return an almost empty TSelectorKey.
|
||||
var es = PEpollSelector(s)
|
||||
if epoll_ctl(es.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
|
||||
proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent],
|
||||
data: PObject): PSelectorKey {.discardable.} =
|
||||
## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent
|
||||
## ``events``.
|
||||
if s.fds.hasKey(fd):
|
||||
raise newException(EInvalidValue, "File descriptor already exists.")
|
||||
|
||||
var event = createEventStruct(events, fd)
|
||||
|
||||
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
|
||||
OSError(OSLastError())
|
||||
# We could fill in the ``fds`` TTable to get the info, but that wouldn't
|
||||
# be nice for our memory.
|
||||
result = TSelectorKey(fd: fd, events: {}, data: nil)
|
||||
|
||||
var key = PSelectorKey(fd: fd, events: events, data: data)
|
||||
|
||||
s.fds[fd] = key
|
||||
result = key
|
||||
|
||||
proc update*(s: PSelector, fd: TSocketHandle,
|
||||
events: set[TEvent]): PSelectorKey {.discardable.} =
|
||||
## Updates the events which ``fd`` wants notifications for.
|
||||
if not s.fds.hasKey(fd):
|
||||
raise newException(EInvalidValue, "File descriptor not found.")
|
||||
var event = createEventStruct(events, fd)
|
||||
|
||||
s.fds[fd].events = events
|
||||
echo("About to update")
|
||||
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
|
||||
OSError(OSLastError())
|
||||
echo("finished updating")
|
||||
result = s.fds[fd]
|
||||
|
||||
proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
|
||||
if not s.fds.hasKey(fd):
|
||||
raise newException(EInvalidValue, "File descriptor not found.")
|
||||
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
|
||||
OSError(OSLastError())
|
||||
result = s.fds[fd]
|
||||
s.fds.del(fd)
|
||||
|
||||
proc esClose(s: PSelector) =
|
||||
var es = PEpollSelector(s)
|
||||
if es.epollFD.close() != 0: OSError(OSLastError())
|
||||
dealloc(addr es.events) # TODO: Test this
|
||||
proc close*(s: PSelector) =
|
||||
if s.epollFD.close() != 0: OSError(OSLastError())
|
||||
dealloc(addr s.events) # TODO: Test this
|
||||
|
||||
proc esSelect(s: PSelector, timeout: int): seq[TReadyInfo] =
|
||||
proc select*(s: PSelector, timeout: int): seq[TReadyInfo] =
|
||||
##
|
||||
## The ``events`` field of the returned ``key`` contains the original events
|
||||
## for which the ``fd`` was bound. This is contrary to the ``events`` field
|
||||
## of the ``TReadyInfo`` tuple which determines which events are ready
|
||||
## on the ``fd``.
|
||||
result = @[]
|
||||
var es = PEpollSelector(s)
|
||||
|
||||
let evNum = epoll_wait(es.epollFD, es.events[0], 64.cint, timeout.cint)
|
||||
let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint)
|
||||
if evNum < 0: OSError(OSLastError())
|
||||
if evNum == 0: return @[]
|
||||
for i in 0 .. <evNum:
|
||||
var evSet: set[TEvent] = {}
|
||||
if (es.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
|
||||
if (es.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
|
||||
let dw = cast[ptr TDataWrapper](es.events[i].data.thePtr)
|
||||
if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
|
||||
if (s.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
|
||||
|
||||
let selectorKey = TSelectorKey(fd: dw.fd, events: dw.boundEvents,
|
||||
data: dw.data)
|
||||
let selectorKey = s.fds[s.events[i].data.fd.TSocketHandle]
|
||||
result.add((selectorKey, evSet))
|
||||
|
||||
proc newEpollSelector*(): PEpollSelector =
|
||||
proc newSelector*(): PSelector =
|
||||
new result
|
||||
result.epollFD = epoll_create(64)
|
||||
result.events = cast[array[64, ptr epoll_event]](alloc0(sizeof(epoll_event)*64))
|
||||
result.fds = initTable[TSocketHandle, PSelectorKey]()
|
||||
if result.epollFD < 0:
|
||||
OSError(OSLastError())
|
||||
result.registerImpl = esRegister
|
||||
result.unregisterImpl = esUnregister
|
||||
result.closeImpl = esClose
|
||||
result.selectImpl = esSelect
|
||||
|
||||
proc contains*(s: PSelector, fd: TSocketHandle): bool =
|
||||
## Determines whether selector contains a file descriptor.
|
||||
return s.fds.hasKey(fd)
|
||||
|
||||
proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey =
|
||||
## Retrieves the selector key for ``fd``.
|
||||
return s.fds[fd]
|
||||
|
||||
elif defined(windows):
|
||||
type
|
||||
PSelector* = ref object
|
||||
fds: TTable[TSocketHandle, PSelectorKey]
|
||||
|
||||
proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent],
|
||||
data: PObject): PSelectorKey {.discardable.} =
|
||||
if s.fds.hasKey(fd):
|
||||
raise newException(EInvalidValue, "File descriptor already exists.")
|
||||
var sk = PSelectorKey(fd: fd, events: events, data: data)
|
||||
s.fds[fd] = sk
|
||||
result = sk
|
||||
|
||||
proc update*(s: PSelector, fd: TSocketHandle,
|
||||
events: set[TEvent]): PSelectorKey {.discardable.} =
|
||||
## Updates the events which ``fd`` wants notifications for.
|
||||
if not s.fds.hasKey(fd):
|
||||
raise newException(EInvalidValue, "File descriptor not found.")
|
||||
|
||||
s.fds[fd].events = events
|
||||
result = s.fds[fd]
|
||||
|
||||
proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
|
||||
result = s.fds[fd]
|
||||
s.fds.del(fd)
|
||||
|
||||
proc close*(s: PSelector) = nil
|
||||
|
||||
proc timeValFromMilliseconds(timeout: int): TTimeVal =
|
||||
if timeout != -1:
|
||||
var seconds = timeout div 1000
|
||||
result.tv_sec = seconds.int32
|
||||
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
|
||||
|
||||
proc createFdSet(rd, wr: var TFdSet, fds: TTable[TSocketHandle, PSelectorKey],
|
||||
m: var int) =
|
||||
FD_ZERO(rd); FD_ZERO(wr)
|
||||
for k, v in pairs(fds):
|
||||
if EvRead in v.events:
|
||||
m = max(m, int(k))
|
||||
FD_SET(k, rd)
|
||||
if EvWrite in v.events:
|
||||
m = max(m, int(k))
|
||||
FD_SET(k, wr)
|
||||
|
||||
proc getReadyFDs(rd, wr: var TFdSet, fds: TTable[TSocketHandle, PSelectorKey]):
|
||||
seq[TReadyInfo] =
|
||||
result = @[]
|
||||
for k, v in pairs(fds):
|
||||
var events: set[TEvent] = {}
|
||||
if FD_ISSET(k, rd) != 0'i32:
|
||||
events = events + {EvRead}
|
||||
if FD_ISSET(k, wr) != 0'i32:
|
||||
events = events + {EvWrite}
|
||||
result.add((v, events))
|
||||
|
||||
proc select(fds: TTable[TSocketHandle, PSelectorKey], timeout = 500):
|
||||
seq[TReadyInfo] =
|
||||
var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout)
|
||||
|
||||
var rd, wr: TFdSet
|
||||
var m = 0
|
||||
createFdSet(rd, wr, fds, m)
|
||||
|
||||
var retCode = 0
|
||||
if timeout != -1:
|
||||
retCode = int(select(TSocketHandle(m+1), addr(rd), addr(wr), nil, addr(tv)))
|
||||
else:
|
||||
retCode = int(select(TSocketHandle(m+1), addr(rd), addr(wr), nil, nil))
|
||||
|
||||
if retCode < 0:
|
||||
OSError(OSLastError())
|
||||
elif retCode == 0:
|
||||
return @[]
|
||||
else:
|
||||
return getReadyFDs(rd, wr, fds)
|
||||
|
||||
proc select*(s: PSelector, timeout: int): seq[TReadyInfo] =
|
||||
result = select(s.fds, timeout)
|
||||
|
||||
proc newSelector*(): PSelector =
|
||||
new result
|
||||
result.fds = initTable[TSocketHandle, PSelectorKey]()
|
||||
|
||||
proc contains*(s: PSelector, fd: TSocketHandle): bool =
|
||||
return s.fds.hasKey(fd)
|
||||
|
||||
proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey =
|
||||
return s.fds[fd]
|
||||
|
||||
elif defined(bsd) or defined(macosx):
|
||||
# TODO: kqueue
|
||||
{.error: "Sorry your platform is not supported yet.".}
|
||||
else:
|
||||
{.error: "Sorry your platform is not supported.".}
|
||||
|
||||
when isMainModule:
|
||||
# Select()
|
||||
@@ -224,11 +223,12 @@ when isMainModule:
|
||||
sock: TSocket
|
||||
|
||||
var sock = socket()
|
||||
sock.setBlocking(false)
|
||||
sock.connect("irc.freenode.net", TPort(6667))
|
||||
|
||||
var selector = newEpollSelector()
|
||||
var selector = newSelector()
|
||||
var data = PSockWrapper(sock: sock)
|
||||
let key = selector.register(sock.getFD.cint, {EvRead}, data)
|
||||
let key = selector.register(sock.getFD, {EvWrite}, data)
|
||||
var i = 0
|
||||
while true:
|
||||
let ready = selector.select(1000)
|
||||
@@ -236,6 +236,7 @@ when isMainModule:
|
||||
if ready.len > 0: echo ready[0].events
|
||||
i.inc
|
||||
if i == 6:
|
||||
assert selector.unregister(sock.getFD).fd == sock.getFD
|
||||
selector.close()
|
||||
break
|
||||
|
||||
|
||||
@@ -17,11 +17,13 @@ when hostos == "solaris":
|
||||
|
||||
when defined(Windows):
|
||||
import winlean
|
||||
export ioctlsocket
|
||||
else:
|
||||
import posix
|
||||
export fcntl, F_GETFL, O_NONBLOCK, F_SETFL
|
||||
|
||||
export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen,
|
||||
inet_ntoa
|
||||
inet_ntoa, recv, `==`, connect, send, accept
|
||||
|
||||
type
|
||||
|
||||
@@ -63,10 +65,10 @@ type
|
||||
|
||||
when defined(windows):
|
||||
let
|
||||
OSInvalidSocket* = winlean.INVALID_SOCKET
|
||||
osInvalidSocket* = winlean.INVALID_SOCKET
|
||||
else:
|
||||
let
|
||||
OSInvalidSocket* = posix.INVALID_SOCKET
|
||||
osInvalidSocket* = posix.INVALID_SOCKET
|
||||
|
||||
proc `==`*(a, b: TPort): bool {.borrow.}
|
||||
## ``==`` for ports.
|
||||
|
||||
@@ -21,7 +21,7 @@ proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn
|
||||
proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
|
||||
for i in 0 .. <swarmSize:
|
||||
var sock = socket()
|
||||
disp.register(sock)
|
||||
#disp.register(sock)
|
||||
discard await disp.connect(sock, "localhost", port)
|
||||
when true:
|
||||
discard await sendMessages(disp, sock)
|
||||
@@ -48,7 +48,7 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn
|
||||
|
||||
proc createServer(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
|
||||
var server = socket()
|
||||
disp.register(server)
|
||||
#disp.register(server)
|
||||
server.bindAddr(port)
|
||||
server.listen()
|
||||
while true:
|
||||
|
||||
Reference in New Issue
Block a user