diff --git a/lib/posix/epoll.nim b/lib/posix/epoll.nim index 86b9775766..c5ed1a8733 100644 --- a/lib/posix/epoll.nim +++ b/lib/posix/epoll.nim @@ -35,18 +35,13 @@ const EPOLL_CTL_MOD* = 3 # Change file descriptor epoll_event structure. type - epoll_data* {.importc: "union epoll_data", + EpollData* {.importc: "union epoll_data", header: "", pure, final.} = object # TODO: This is actually a union. - #thePtr* {.importc: "ptr".}: pointer - fd* {.importc: "fd".}: cint - when defined(linux) and defined(amd64): - u32: uint32 # this field ensures that binary size is right - it cannot be - # used because its offset is wrong - #u64*: uint64 + u64* {.importc: "u64".}: uint64 - epoll_event* {.importc: "struct epoll_event", header: "", pure, final.} = object + EpollEvent* {.importc: "struct epoll_event", header: "", pure, final.} = object events*: uint32 # Epoll events - data*: epoll_data # User data variable + data*: EpollData # User data variable proc epoll_create*(size: cint): cint {.importc: "epoll_create", header: "".} @@ -60,7 +55,7 @@ proc epoll_create1*(flags: cint): cint {.importc: "epoll_create1", ## Same as epoll_create but with an FLAGS parameter. The unused SIZE ## parameter has been dropped. -proc epoll_ctl*(epfd: cint; op: cint; fd: cint | SocketHandle; event: ptr epoll_event): cint {. +proc epoll_ctl*(epfd: cint; op: cint; fd: cint | SocketHandle; event: ptr EpollEvent): cint {. importc: "epoll_ctl", header: "".} ## Manipulate an epoll instance "epfd". Returns 0 in case of success, ## -1 in case of error ( the "errno" variable will contain the @@ -69,7 +64,7 @@ proc epoll_ctl*(epfd: cint; op: cint; fd: cint | SocketHandle; event: ptr epoll_ ## operation. The "event" parameter describes which events the caller ## is interested in and any associated user data. -proc epoll_wait*(epfd: cint; events: ptr epoll_event; maxevents: cint; +proc epoll_wait*(epfd: cint; events: ptr EpollEvent; maxevents: cint; timeout: cint): cint {.importc: "epoll_wait", header: "".} ## Wait for events on an epoll instance "epfd". Returns the number of @@ -84,7 +79,7 @@ proc epoll_wait*(epfd: cint; events: ptr epoll_event; maxevents: cint; ## __THROW. -#proc epoll_pwait*(epfd: cint; events: ptr epoll_event; maxevents: cint; +#proc epoll_pwait*(epfd: cint; events: ptr EpollEvent; maxevents: cint; # timeout: cint; ss: ptr sigset_t): cint {. # importc: "epoll_pwait", header: "".} # Same as epoll_wait, but the thread's signal mask is temporarily diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 281d5b848b..4c96aa6148 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,8 +9,9 @@ include "system/inclrtl" -import os, tables, strutils, times, heapqueue, options, asyncstreams +import os, tables, strutils, times, heapqueue, lists, options, asyncstreams import asyncfutures except callSoon + import nativesockets, net, deques export Port, SocketFlag @@ -136,6 +137,7 @@ export asyncfutures, asyncstreams ## and occasionally the compilation may fail altogether. ## As such it is better to use the former style when possible. ## +## ## Discarding futures ## ------------------ ## @@ -226,6 +228,12 @@ when defined(windows) or defined(nimdoc): ovl: PCustomOverlapped PostCallbackDataPtr = ptr PostCallbackData + AsyncEventImpl = object + hEvent: Handle + hWaiter: Handle + pcd: PostCallbackDataPtr + AsyncEvent* = ptr AsyncEventImpl + Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD, TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].} @@ -332,9 +340,9 @@ when defined(windows) or defined(nimdoc): # Callback queue processing processPendingCallbacks(p) - var connectExPtr: pointer = nil - var acceptExPtr: pointer = nil - var getAcceptExSockAddrsPtr: pointer = nil + var acceptEx: WSAPROC_ACCEPTEX + var connectEx: WSAPROC_CONNECTEX + var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c @@ -346,56 +354,19 @@ when defined(windows) or defined(nimdoc): proc initAll() = let dummySock = newNativeSocket() - if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): + if dummySock == INVALID_SOCKET: raiseOSError(osLastError()) - if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): + var fun: pointer = nil + if not initPointer(dummySock, fun, WSAID_CONNECTEX): raiseOSError(osLastError()) - if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): + connectEx = cast[WSAPROC_CONNECTEX](fun) + if not initPointer(dummySock, fun, WSAID_ACCEPTEX): raiseOSError(osLastError()) - - proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: Dword, - lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool = - if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().") - let fun = - cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: Dword, - lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr) - - result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, - lpOverlapped) - - proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, - lpOverlapped: POVERLAPPED): bool = - if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().") - let fun = - cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, - lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr) - result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, - dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, - lpOverlapped) - - proc getAcceptExSockaddrs(lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword, - LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt, - RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) = - if getAcceptExSockAddrsPtr.isNil: - raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().") - - let fun = - cast[proc (lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr, - LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr, - RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) - - fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, - RemoteSockaddr, RemoteSockaddrLength) + acceptEx = cast[WSAPROC_ACCEPTEX](fun) + if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS): + raiseOSError(osLastError()) + getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun) + close(dummySock) proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = @@ -506,10 +477,7 @@ when defined(windows) or defined(nimdoc): proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete(0) - else: - retFuture.complete(bytesCount) + retFuture.complete(bytesCount) else: if flags.isDisconnectionError(errcode): retFuture.complete(0) @@ -543,10 +511,11 @@ when defined(windows) or defined(nimdoc): proc send*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all - ## data has been sent. - ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls - ## to avoid early freeing of the buffer + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future + ## will complete once all data has been sent. + ## + ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") @@ -793,7 +762,7 @@ when defined(windows) or defined(nimdoc): cast[pointer](p.ovl)) {.pop.} - template registerWaitableEvent(mask) = + proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) = let p = getGlobalDispatcher() var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword var hEvent = wsaCreateEvent() @@ -843,8 +812,8 @@ when defined(windows) or defined(nimdoc): cast[pointer](pcd), INFINITE, flags): # pcd.ovl will be unrefed in poll() let err = osLastError() - discard wsaCloseEvent(hEvent) deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) raiseOSError(err) else: # we incref `pcd.ovl` and `protect` callback one more time, @@ -883,16 +852,17 @@ when defined(windows) or defined(nimdoc): ## ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addRead` only if really - ## need it (main usecase is adaptation of `unix like` libraries to be + ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). - ## If you use this function, you dont need to use asyncdispatch.recv() + ## + ## If you use this function, you don't need to use asyncdispatch.recv() ## or asyncdispatch.accept(), because they are using IOCP, please use ## nativesockets.recv() and nativesockets.accept() instead. ## ## Be sure your callback ``cb`` returns ``true``, if you want to remove ## watch of `read` notifications, and ``false``, if you want to continue - ## receiving notifies. - registerWaitableEvent(FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) proc addWrite*(fd: AsyncFD, cb: Callback) = ## Start watching the file descriptor for write availability and then call @@ -900,43 +870,213 @@ when defined(windows) or defined(nimdoc): ## ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addWrite` only if really - ## need it (main usecase is adaptation of `unix like` libraries to be + ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). - ## If you use this function, you dont need to use asyncdispatch.send() + ## + ## If you use this function, you don't need to use asyncdispatch.send() ## or asyncdispatch.connect(), because they are using IOCP, please use ## nativesockets.send() and nativesockets.connect() instead. ## ## Be sure your callback ``cb`` returns ``true``, if you want to remove ## watch of `write` notifications, and ``false``, if you want to continue - ## receiving notifies. - registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE) + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) + + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, + handleCallback) = + let handleFD = AsyncFD(hEvent) + pcd.ioPort = p.ioPort + pcd.handleFd = handleFD + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data.fd = handleFD + ol.data.cb = handleCallback + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), timeout.Dword, flags): + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard closeHandle(hEvent) + raiseOSError(err) + p.handles.incl(handleFD) + + template closeWaitable(handle: untyped) = + let waitFd = pcd.waitFd + deallocShared(cast[pointer](pcd)) + p.handles.excl(fd) + if unregisterWait(waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + discard closeHandle(handle) + raiseOSError(err) + if closeHandle(handle) == 0: + raiseOSError(osLastError()) + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Registers callback ``cb`` to be called when timer expired. + ## + ## Parameters: + ## + ## * ``timeout`` - timeout value in milliseconds. + ## * ``oneshot`` + ## * `true` - generate only one timeout event + ## * `false` - generate timeout events periodically + + doAssert(timeout > 0) + let p = getGlobalDispatcher() + + var hEvent = createEvent(nil, 1, 0, nil) + if hEvent == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.Dword + if oneshot: flags = flags or WT_EXECUTEONLYONCE + + proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + let res = cb(fd) + if res or oneshot: + closeWaitable(hEvent) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + + registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) + + proc addProcess*(pid: int, cb: Callback) = + ## Registers callback ``cb`` to be called when process with process ID + ## ``pid`` exited. + let p = getGlobalDispatcher() + let procFlags = SYNCHRONIZE + var hProcess = openProcess(procFlags, 0, pid.Dword) + if hProcess == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.Dword + + proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + closeWaitable(hProcess) + discard cb(fd) + + registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates a new thread-safe ``AsyncEvent`` object. + ## + ## New ``AsyncEvent`` object is not automatically registered with # TODO: Why? -- DP + ## dispatcher like ``AsyncSocket``. + var sa = SECURITY_ATTRIBUTES( + nLength: sizeof(SECURITY_ATTRIBUTES).cint, + bInheritHandle: 1 + ) + var event = createEvent(addr(sa), 0'i32, 0'i32, nil) + if event == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl))) + result.hEvent = event + + proc trigger*(ev: AsyncEvent) = + ## Set event ``ev`` to signaled state. + if setEvent(ev.hEvent) == 0: + raiseOSError(osLastError()) + + proc unregister*(ev: AsyncEvent) = + ## Unregisters event ``ev``. + doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") + let p = getGlobalDispatcher() + p.handles.excl(AsyncFD(ev.hEvent)) + if unregisterWait(ev.hWaiter) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + ev.hWaiter = 0 + + proc close*(ev: AsyncEvent) = + ## Closes event ``ev``. + let res = closeHandle(ev.hEvent) + deallocShared(cast[pointer](ev)) + if res == 0: + raiseOSError(osLastError()) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Registers callback ``cb`` to be called when ``ev`` will be signaled + doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") + + let p = getGlobalDispatcher() + let hEvent = ev.hEvent + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.Dword + + proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if ev.hWaiter != 0: + if cb(fd): + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + deallocShared(cast[pointer](pcd)) + if ev.hWaiter != 0: + unregister(ev) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + else: + # if ev.hWaiter == 0, then event was unregistered before `poll()` call. + deallocShared(cast[pointer](pcd)) + + registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) + ev.hWaiter = pcd.waitFd initAll() else: import selectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL - + const + InitCallbackListSize = 4 # initial size of callbacks sequence, + # associated with file/socket descriptor. + InitDelayedCallbackListSize = 64 # initial size of delayed callbacks + # queue. type AsyncFD* = distinct cint Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} - PData* = ref object of RootRef - fd: AsyncFD - readCBs: seq[Callback] - writeCBs: seq[Callback] + AsyncData = object + readList: seq[Callback] + writeList: seq[Callback] + + AsyncEvent* = distinct SelectEvent PDispatcher* = ref object of PDispatcherBase - selector: Selector + selector: Selector[AsyncData] {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].} proc `==`*(x, y: AsyncFD): bool {.borrow.} + proc `==`*(x, y: AsyncEvent): bool {.borrow.} + + template newAsyncData(): AsyncData = + AsyncData( + readList: newSeqOfCap[Callback](InitCallbackListSize), + writeList: newSeqOfCap[Callback](InitCallbackListSize) + ) proc newDispatcher*(): PDispatcher = new result - result.selector = newSelector() + result.selector = newSelector[AsyncData]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) + result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher @@ -951,15 +1091,10 @@ else: setGlobalDispatcher(newDispatcher()) result = gDisp - proc update(fd: AsyncFD, events: set[Event]) = - let p = getGlobalDispatcher() - assert fd.SocketHandle in p.selector - p.selector.update(fd.SocketHandle, events) - proc register*(fd: AsyncFD) = let p = getGlobalDispatcher() - var data = PData(fd: fd, readCBs: @[], writeCBs: @[]) - p.selector.register(fd.SocketHandle, {}, data.RootRef) + var data = newAsyncData() + p.selector.registerHandle(fd.SocketHandle, {}, data) proc closeSocket*(sock: AsyncFD) = let disp = getGlobalDispatcher() @@ -969,75 +1104,148 @@ else: proc unregister*(fd: AsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) + proc unregister*(ev: AsyncEvent) = + getGlobalDispatcher().selector.unregister(SelectEvent(ev)) + proc addRead*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() - if fd.SocketHandle notin p.selector: + var newEvents = {Event.Read} + withData(p.selector, fd.SocketHandle, adata) do: + adata.readList.add(cb) + newEvents.incl(Event.Read) + if len(adata.writeList) != 0: newEvents.incl(Event.Write) + do: raise newException(ValueError, "File descriptor not registered.") - p.selector[fd.SocketHandle].data.PData.readCBs.add(cb) - update(fd, p.selector[fd.SocketHandle].events + {EvRead}) + p.selector.updateHandle(fd.SocketHandle, newEvents) proc addWrite*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() - if fd.SocketHandle notin p.selector: + var newEvents = {Event.Write} + withData(p.selector, fd.SocketHandle, adata) do: + adata.writeList.add(cb) + newEvents.incl(Event.Write) + if len(adata.readList) != 0: newEvents.incl(Event.Read) + do: raise newException(ValueError, "File descriptor not registered.") - p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) - update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) - - template processCallbacks(callbacks: untyped) = - # Callback may add items to ``callbacks`` which causes issues if - # we are iterating over it at the same time. We therefore - # make a copy to iterate over. - let currentCBs = callbacks - callbacks = @[] - # Using another sequence because callbacks themselves can add - # other callbacks. - var newCBs: seq[Callback] = @[] - for cb in currentCBs: - if newCBs.len > 0: - # A callback has already returned with EAGAIN, don't call any - # others until next `poll`. - newCBs.add(cb) - else: - if not cb(data.fd): - # Callback wants to be called again. - newCBs.add(cb) - callbacks = newCBs & callbacks + p.selector.updateHandle(fd.SocketHandle, newEvents) proc hasPendingOperations*(): bool = let p = getGlobalDispatcher() - p.selector.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 + not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + + template processBasicCallbacks(ident, rwlist: untyped) = + # Process pending descriptor and AsyncEvent callbacks. + # + # Invoke every callback stored in `rwlist`, until one + # returns `false` (which means callback wants to stay + # alive). In such case all remaining callbacks will be added + # to `rwlist` again, in the order they have been inserted. + # + # `rwlist` associated with file descriptor MUST BE emptied before + # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128), + # or it can be possible to fall into endless cycle. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.rwlist) + adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = max(len(curList), InitCallbackListSize) + var newList = newSeqOfCap[Callback](newLength) + + for cb in curList: + if len(newList) > 0: + # A callback has already returned with EAGAIN, don't call any others + # until next `poll`. + newList.add(cb) + else: + if not cb(fd.AsyncFD): + # Callback wants to be called again. + newList.add(cb) + + withData(p.selector, ident, adata) do: + # descriptor still present in queue. + adata.rwlist = newList & adata.rwlist + rLength = len(adata.readList) + wLength = len(adata.writeList) + do: + # descriptor was unregistered in callback via `unregister()`. + rLength = -1 + wLength = -1 + + template processCustomCallbacks(ident: untyped) = + # Process pending custom event callbacks. Custom events are + # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. + # There can be only one callback registered with one descriptor, + # so there is no need to iterate over list. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.readList) + adata.readList = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = len(curList) + var newList = newSeqOfCap[Callback](newLength) + + var cb = curList[0] + if not cb(fd.AsyncFD): + newList.add(cb) + + withData(p.selector, ident, adata) do: + # descriptor still present in queue. + adata.readList = newList & adata.readList + if len(adata.readList) == 0: + # if no callbacks registered with descriptor, unregister it. + p.selector.unregister(fd) + do: + # descriptor was unregistered in callback via `unregister()`. + discard proc poll*(timeout = 500) = let p = getGlobalDispatcher() - if p.selector.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: + when ioselSupportedPlatform: + let customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} + + if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") - if p.selector.len > 0: - for info in p.selector.select(p.adjustedTimeout(timeout)): - let data = PData(info.key.data) - assert data.fd == info.key.fd.AsyncFD - #echo("In poll ", data.fd.cint) - # There may be EvError here, but we handle them in callbacks, - # so that exceptions can be raised from `send(...)` and - # `recv(...)` routines. + if not p.selector.isEmpty(): + var keys: array[64, ReadyKey] + var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) + for i in 0.. 0: incl(newEvents, Event.Read) + if wLength > 0: incl(newEvents, Event.Write) + p.selector.updateHandle(SocketHandle(fd), newEvents) # Timer processing. processTimers(p) @@ -1075,7 +1283,7 @@ else: return retFuture proc recvInto*(socket: AsyncFD, buf: pointer, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = + flags = {SocketFlag.SafeDisconn}): Future[int] = var retFuture = newFuture[int]("recvInto") proc cb(sock: AsyncFD): bool = @@ -1216,6 +1424,55 @@ else: addRead(socket, cb) return retFuture + when ioselSupportedPlatform: + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Start watching for timeout expiration, and then call the + ## callback ``cb``. + ## ``timeout`` - time in milliseconds, + ## ``oneshot`` - if ``true`` only one event will be dispatched, + ## if ``false`` continuous events every ``timeout`` milliseconds. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerTimer(timeout, oneshot, data) + + proc addSignal*(signal: int, cb: Callback) = + ## Start watching signal ``signal``, and when signal appears, call the + ## callback ``cb``. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerSignal(signal, data) + + proc addProcess*(pid: int, cb: Callback) = + ## Start watching for process exit with pid ``pid``, and then call + ## the callback ``cb``. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerProcess(pid, data) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates new ``AsyncEvent``. + result = AsyncEvent(newSelectEvent()) + + proc trigger*(ev: AsyncEvent) = + ## Sets new ``AsyncEvent`` to signaled state. + trigger(SelectEvent(ev)) + + proc close*(ev: AsyncEvent) = + ## Closes ``AsyncEvent`` + close(SelectEvent(ev)) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Start watching for event ``ev``, and call callback ``cb``, when + ## ev will be set to signaled state. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerEvent(SelectEvent(ev), data) + # Common procedures between current and upcoming asyncdispatch include includes.asynccommon @@ -1269,7 +1526,7 @@ proc send*(socket: AsyncFD, data: string, var copiedData = data GC_ref(copiedData) # we need to protect data until send operation is completed - # or failed. + # or failed. let sendFut = socket.send(addr copiedData[0], data.len, flags) sendFut.callback = @@ -1317,7 +1574,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = ## ## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead. - template addNLIfEmpty(): untyped = + template addNLIfEmpty(): typed = if result.len == 0: result.add("\c\L") @@ -1353,3 +1610,5 @@ proc waitFor*[T](fut: Future[T]): T = poll() fut.read + +{.deprecated: [setEvent: trigger].} \ No newline at end of file diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim deleted file mode 100644 index 1722da1c65..0000000000 --- a/lib/pure/ioselectors.nim +++ /dev/null @@ -1,294 +0,0 @@ -# -# -# Nim's Runtime Library -# (c) Copyright 2016 Eugene Kabanov -# -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -## This module allows high-level and efficient I/O multiplexing. -## -## Supported OS primitives: ``epoll``, ``kqueue``, ``poll`` and -## Windows ``select``. -## -## To use threadsafe version of this module, it needs to be compiled -## with both ``-d:threadsafe`` and ``--threads:on`` options. -## -## Supported features: files, sockets, pipes, timers, processes, signals -## and user events. -## -## Fully supported OS: MacOSX, FreeBSD, OpenBSD, NetBSD, Linux (except -## for Android). -## -## Partially supported OS: Windows (only sockets and user events), -## Solaris (files, sockets, handles and user events). -## Android (files, sockets, handles and user events). -## -## TODO: ``/dev/poll``, ``event ports`` and filesystem events. - -import os - -const hasThreadSupport = compileOption("threads") and defined(threadsafe) - -const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or - defined(netbsd) or defined(openbsd) or - defined(dragonfly) or - (defined(linux) and not defined(android)) - ## This constant is used to determine whether the destination platform is - ## fully supported by ``ioselectors`` module. - -const bsdPlatform = defined(macosx) or defined(freebsd) or - defined(netbsd) or defined(openbsd) or - defined(dragonfly) - -when defined(nimdoc): - type SocketHandle = int - type - Selector*[T] = ref object - ## An object which holds descriptors to be checked for read/write status - - Event* {.pure.} = enum - ## An enum which hold event types - Read, ## Descriptor is available for read - Write, ## Descriptor is available for write - Timer, ## Timer descriptor is completed - Signal, ## Signal is raised - Process, ## Process is finished - Vnode, ## BSD specific file change happens - User, ## User event is raised - Error, ## Error happens while waiting, for descriptor - VnodeWrite, ## NOTE_WRITE (BSD specific, write to file occurred) - VnodeDelete, ## NOTE_DELETE (BSD specific, unlink of file occurred) - VnodeExtend, ## NOTE_EXTEND (BSD specific, file extended) - VnodeAttrib, ## NOTE_ATTRIB (BSD specific, file attributes changed) - VnodeLink, ## NOTE_LINK (BSD specific, file link count changed) - VnodeRename, ## NOTE_RENAME (BSD specific, file renamed) - VnodeRevoke ## NOTE_REVOKE (BSD specific, file revoke occurred) - - ReadyKey* = object - ## An object which holds result for descriptor - fd* : int ## file/socket descriptor - events*: set[Event] ## set of events - - SelectEvent* = object - ## An object which holds user defined event - - proc newSelector*[T](): Selector[T] = - ## Creates a new selector - - proc close*[T](s: Selector[T]) = - ## Closes selector - - proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], - data: T) = - ## Registers file/socket descriptor ``fd`` to selector ``s`` - ## with events set in ``events``. The ``data`` is application-defined - ## data, which to be passed when event happens. - - proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = - ## Update file/socket descriptor ``fd``, registered in selector - ## ``s`` with new events set ``event``. - - proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, - data: T): int {.discardable.} = - ## Registers timer notification with ``timeout`` in milliseconds - ## to selector ``s``. - ## If ``oneshot`` is ``true`` timer will be notified only once. - ## Set ``oneshot`` to ``false`` if your want periodic notifications. - ## The ``data`` is application-defined data, which to be passed, when - ## time limit expired. - - proc registerSignal*[T](s: Selector[T], signal: int, - data: T): int {.discardable.} = - ## Registers Unix signal notification with ``signal`` to selector - ## ``s``. The ``data`` is application-defined data, which to be - ## passed, when signal raises. - ## - ## This function is not supported for ``Windows``. - - proc registerProcess*[T](s: Selector[T], pid: int, - data: T): int {.discardable.} = - ## Registers process id (pid) notification when process has - ## exited to selector ``s``. - ## The ``data`` is application-defined data, which to be passed, when - ## process with ``pid`` has exited. - - proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = - ## Registers selector event ``ev`` to selector ``s``. - ## ``data`` application-defined data, which to be passed, when - ## ``ev`` happens. - - proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], - data: T) = - ## Registers selector BSD/MacOSX specific vnode events for file - ## descriptor ``fd`` and events ``events``. - ## ``data`` application-defined data, which to be passed, when - ## vnode event happens. - ## - ## This function is supported only by BSD and MacOSX. - - proc newSelectEvent*(): SelectEvent = - ## Creates new event ``SelectEvent``. - - proc setEvent*(ev: SelectEvent) = - ## Trigger event ``ev``. - - proc close*(ev: SelectEvent) = - ## Closes selector event ``ev``. - - proc unregister*[T](s: Selector[T], ev: SelectEvent) = - ## Unregisters event ``ev`` from selector ``s``. - - proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = - ## Unregisters file/socket descriptor ``fd`` from selector ``s``. - - proc selectInto*[T](s: Selector[T], timeout: int, - results: var openarray[ReadyKey]): int = - ## Process call waiting for events registered in selector ``s``. - ## The ``timeout`` argument specifies the minimum number of milliseconds - ## the function will be blocked, if no events are not ready. Specifying a - ## timeout of ``-1`` causes function to block indefinitely. - ## All available events will be stored in ``results`` array. - ## - ## Function returns number of triggered events. - - proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = - ## Process call waiting for events registered in selector ``s``. - ## The ``timeout`` argument specifies the minimum number of milliseconds - ## the function will be blocked, if no events are not ready. Specifying a - ## timeout of -1 causes function to block indefinitely. - ## - ## Function returns sequence of triggered events. - - proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = - ## Retrieves application-defined ``data`` associated with descriptor ``fd``. - ## If specified descriptor ``fd`` is not registered, empty/default value - ## will be returned. - - proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: var T): bool = - ## Associate application-defined ``data`` with descriptor ``fd``. - ## - ## Returns ``true``, if data was succesfully updated, ``false`` otherwise. - - template isEmpty*[T](s: Selector[T]): bool = - ## Returns ``true``, if there no registered events or descriptors - ## in selector. - - template withData*[T](s: Selector[T], fd: SocketHandle|int, value, - body: untyped) = - ## Retrieves the application-data assigned with descriptor ``fd`` - ## to ``value``. This ``value`` can be modified in the scope of - ## the ``withData`` call. - ## - ## .. code-block:: nim - ## - ## s.withData(fd, value) do: - ## # block is executed only if ``fd`` registered in selector ``s`` - ## value.uid = 1000 - ## - - template withData*[T](s: Selector[T], fd: SocketHandle|int, value, - body1, body2: untyped) = - ## Retrieves the application-data assigned with descriptor ``fd`` - ## to ``value``. This ``value`` can be modified in the scope of - ## the ``withData`` call. - ## - ## .. code-block:: nim - ## - ## s.withData(fd, value) do: - ## # block is executed only if ``fd`` registered in selector ``s``. - ## value.uid = 1000 - ## do: - ## # block is executed if ``fd`` not registered in selector ``s``. - ## raise - ## - -else: - when hasThreadSupport: - import locks - - type - SharedArray[T] = UncheckedArray[T] - - proc allocSharedArray[T](nsize: int): ptr SharedArray[T] = - result = cast[ptr SharedArray[T]](allocShared0(sizeof(T) * nsize)) - - proc deallocSharedArray[T](sa: ptr SharedArray[T]) = - deallocShared(cast[pointer](sa)) - type - Event* {.pure.} = enum - Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot, - Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink, - VnodeRename, VnodeRevoke - - type - IOSelectorsException* = object of Exception - - ReadyKey* = object - fd* : int - events*: set[Event] - - SelectorKey[T] = object - ident: int - events: set[Event] - param: int - data: T - - proc raiseIOSelectorsError[T](message: T) = - var msg = "" - when T is string: - msg.add(message) - elif T is OSErrorCode: - msg.add(osErrorMsg(message) & " (code: " & $int(message) & ")") - else: - msg.add("Internal Error\n") - var err = newException(IOSelectorsException, msg) - raise err - - when not defined(windows): - import posix - - proc setNonBlocking(fd: cint) {.inline.} = - var x = fcntl(fd, F_GETFL, 0) - if x == -1: - raiseIOSelectorsError(osLastError()) - else: - var mode = x or O_NONBLOCK - if fcntl(fd, F_SETFL, mode) == -1: - raiseIOSelectorsError(osLastError()) - - template setKey(s, pident, pevents, pparam, pdata: untyped) = - var skey = addr(s.fds[pident]) - skey.ident = pident - skey.events = pevents - skey.param = pparam - skey.data = data - - when ioselSupportedPlatform: - template blockSignals(newmask: var Sigset, oldmask: var Sigset) = - when hasThreadSupport: - if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) - else: - if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) - - template unblockSignals(newmask: var Sigset, oldmask: var Sigset) = - when hasThreadSupport: - if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) - else: - if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1: - raiseIOSelectorsError(osLastError()) - - when defined(linux): - include ioselects/ioselectors_epoll - elif bsdPlatform: - include ioselects/ioselectors_kqueue - elif defined(windows): - include ioselects/ioselectors_select - elif defined(solaris): - include ioselects/ioselectors_poll # need to replace it with event ports - else: - include ioselects/ioselectors_poll diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim index 3a5cbc87aa..35cdace09e 100644 --- a/lib/pure/ioselects/ioselectors_epoll.nim +++ b/lib/pure/ioselects/ioselectors_epoll.nim @@ -9,7 +9,7 @@ # This module implements Linux epoll(). -import posix, times +import posix, times, epoll # Maximum number of events that can be returned const MAX_EPOLL_EVENTS = 64 @@ -36,35 +36,6 @@ when not defined(android): ssi_addr*: uint64 pad* {.importc: "__pad".}: array[0..47, uint8] -type - eventFdData {.importc: "eventfd_t", - header: "", pure, final.} = uint64 - epoll_data {.importc: "union epoll_data", header: "", - pure, final.} = object - u64 {.importc: "u64".}: uint64 - epoll_event {.importc: "struct epoll_event", - header: "", pure, final.} = object - events: uint32 # Epoll events - data: epoll_data # User data variable - -const - EPOLL_CTL_ADD = 1 # Add a file descriptor to the interface. - EPOLL_CTL_DEL = 2 # Remove a file descriptor from the interface. - EPOLL_CTL_MOD = 3 # Change file descriptor epoll_event structure. - EPOLLIN = 0x00000001 - EPOLLOUT = 0x00000004 - EPOLLERR = 0x00000008 - EPOLLHUP = 0x00000010 - EPOLLRDHUP = 0x00002000 - EPOLLONESHOT = 1 shl 30 - -proc epoll_create(size: cint): cint - {.importc: "epoll_create", header: "".} -proc epoll_ctl(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint - {.importc: "epoll_ctl", header: "".} -proc epoll_wait(epfd: cint; events: ptr epoll_event; maxevents: cint; - timeout: cint): cint - {.importc: "epoll_wait", header: "".} proc timerfd_create(clock_id: ClockId, flags: cint): cint {.cdecl, importc: "timerfd_create", header: "".} proc timerfd_settime(ufd: cint, flags: cint, @@ -80,26 +51,26 @@ when not defined(android): var RLIMIT_NOFILE {.importc: "RLIMIT_NOFILE", header: "".}: cint type - rlimit {.importc: "struct rlimit", + RLimit {.importc: "struct rlimit", header: "", pure, final.} = object rlim_cur: int rlim_max: int -proc getrlimit(resource: cint, rlp: var rlimit): cint +proc getrlimit(resource: cint, rlp: var RLimit): cint {.importc: "getrlimit",header: "".} when hasThreadSupport: type SelectorImpl[T] = object - epollFD : cint - maxFD : int + epollFD: cint + maxFD: int fds: ptr SharedArray[SelectorKey[T]] count: int Selector*[T] = ptr SelectorImpl[T] else: type SelectorImpl[T] = object - epollFD : cint - maxFD : int + epollFD: cint + maxFD: int fds: seq[SelectorKey[T]] count: int Selector*[T] = ref SelectorImpl[T] @@ -109,7 +80,8 @@ type SelectEvent* = ptr SelectEventImpl proc newSelector*[T](): Selector[T] = - var a = rlimit() + # Retrieve the maximum fd count (for current OS) via getrlimit() + var a = RLimit() if getrlimit(RLIMIT_NOFILE, a) != 0: raiseOsError(osLastError()) var maxFD = int(a.rlim_max) @@ -152,8 +124,8 @@ proc newSelectEvent*(): SelectEvent = result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) result.efd = fdci -proc setEvent*(ev: SelectEvent) = - var data : uint64 = 1 +proc trigger*(ev: SelectEvent) = + var data: uint64 = 1 if posix.write(ev.efd, addr data, sizeof(uint64)) == -1: raiseIOSelectorsError(osLastError()) @@ -164,6 +136,8 @@ proc close*(ev: SelectEvent) = raiseIOSelectorsError(osLastError()) template checkFd(s, f) = + # TODO: I don't see how this can ever happen. You won't be able to create an + # FD if there is too many. -- DP if f >= s.maxFD: raiseIOSelectorsError("Maximum number of descriptors is exhausted!") @@ -171,10 +145,10 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = let fdi = int(fd) s.checkFd(fdi) - doAssert(s.fds[fdi].ident == 0) + doAssert(s.fds[fdi].ident == 0, "Descriptor $# already registered" % $fdi) s.setKey(fdi, events, 0, data) if events != {}: - var epv = epoll_event(events: EPOLLRDHUP) + var epv = EpollEvent(events: EPOLLRDHUP) epv.data.u64 = fdi.uint if Event.Read in events: epv.events = epv.events or EPOLLIN if Event.Write in events: epv.events = epv.events or EPOLLOUT @@ -189,10 +163,10 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = s.checkFd(fdi) var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != 0, - "Descriptor [" & $fdi & "] is not registered in the queue!") + "Descriptor $# is not registered in the selector!" % $fdi) doAssert(pkey.events * maskEvents == {}) if pkey.events != events: - var epv = epoll_event(events: EPOLLRDHUP) + var epv = EpollEvent(events: EPOLLRDHUP) epv.data.u64 = fdi.uint if Event.Read in events: epv.events = epv.events or EPOLLIN @@ -217,24 +191,25 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = s.checkFd(fdi) var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != 0, - "Descriptor [" & $fdi & "] is not registered in the queue!") + "Descriptor $# is not registered in the selector!" % $fdi) if pkey.events != {}: when not defined(android): if pkey.events * {Event.Read, Event.Write} != {}: - var epv = epoll_event() + var epv = EpollEvent() + # TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc. if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) dec(s.count) elif Event.Timer in pkey.events: if Event.Finished notin pkey.events: - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) dec(s.count) if posix.close(cint(fdi)) != 0: raiseIOSelectorsError(osLastError()) elif Event.Signal in pkey.events: - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) var nmask, omask: Sigset @@ -247,7 +222,7 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = raiseIOSelectorsError(osLastError()) elif Event.Process in pkey.events: if Event.Finished notin pkey.events: - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) var nmask, omask: Sigset @@ -260,13 +235,13 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = raiseIOSelectorsError(osLastError()) else: if pkey.events * {Event.Read, Event.Write} != {}: - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) dec(s.count) elif Event.Timer in pkey.events: if Event.Finished notin pkey.events: - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) dec(s.count) @@ -280,7 +255,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != 0, "Event is not registered in the queue!") doAssert(Event.User in pkey.events) - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) dec(s.count) @@ -300,7 +275,7 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, doAssert(s.fds[fdi].ident == 0) var events = {Event.Timer} - var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = fdi.uint if oneshot: new_ts.it_interval.tv_sec = 0.Time @@ -343,7 +318,7 @@ when not defined(android): s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) - var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = fdi.uint if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: raiseIOSelectorsError(osLastError()) @@ -370,7 +345,7 @@ when not defined(android): s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) - var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = fdi.uint epv.events = EPOLLIN or EPOLLRDHUP if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: @@ -383,7 +358,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = int(ev.efd) doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!") s.setKey(fdi, {Event.User}, 0, data) - var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = ev.efd.uint if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0: raiseIOSelectorsError(osLastError()) @@ -392,7 +367,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = proc selectInto*[T](s: Selector[T], timeout: int, results: var openarray[ReadyKey]): int = var - resTable: array[MAX_EPOLL_EVENTS, epoll_event] + resTable: array[MAX_EPOLL_EVENTS, EpollEvent] maxres = MAX_EPOLL_EVENTS i, k: int @@ -482,7 +457,7 @@ proc selectInto*[T](s: Selector[T], timeout: int, rkey.events.incl(Event.User) if Event.Oneshot in pkey.events: - var epv = epoll_event() + var epv = EpollEvent() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0: raiseIOSelectorsError(osLastError()) # we will not clear key until it will be unregistered, so @@ -505,16 +480,19 @@ proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) -proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + return s.fds[fd].ident != 0 + +proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T = let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: + if fdi in s: result = s.fds[fdi].data proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: + if fdi in s: s.fds[fdi].data = data result = true @@ -523,8 +501,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, mixin checkFd let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].data) + if fdi in s: + var value = addr(s.getData(fdi)) body template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, @@ -532,8 +510,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, mixin checkFd let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].data) + if fdi in s: + var value = addr(s.getData(fdi)) body1 else: body2 diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim index 7786de46a2..3e2ec64a80 100644 --- a/lib/pure/ioselects/ioselectors_kqueue.nim +++ b/lib/pure/ioselects/ioselectors_kqueue.nim @@ -144,7 +144,7 @@ proc newSelectEvent*(): SelectEvent = result.rfd = fds[0] result.wfd = fds[1] -proc setEvent*(ev: SelectEvent) = +proc trigger*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): raiseIOSelectorsError(osLastError()) @@ -243,7 +243,7 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, s.checkFd(fdi) var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != 0, - "Descriptor [" & $fdi & "] is not registered in the queue!") + "Descriptor $# is not registered in the queue!" % $fdi) doAssert(pkey.events * maskEvents == {}) if pkey.events != events: @@ -584,16 +584,19 @@ proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) -proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + return s.fds[fd].ident != 0 + +proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T = let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: + if fdi in s: result = s.fds[fdi].data proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: + if fdi in s: s.fds[fdi].data = data result = true @@ -602,8 +605,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, mixin checkFd let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].data) + if fdi in s: + var value = addr(s.getData(fdi)) body template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, @@ -611,8 +614,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, mixin checkFd let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].data) + if fdi in s: + var value = addr(s.getData(fdi)) body1 else: body2 diff --git a/lib/pure/ioselects/ioselectors_poll.nim b/lib/pure/ioselects/ioselectors_poll.nim index 1b90e08066..cc06aa592f 100644 --- a/lib/pure/ioselects/ioselectors_poll.nim +++ b/lib/pure/ioselects/ioselectors_poll.nim @@ -208,7 +208,7 @@ proc newSelectEvent*(): SelectEvent = result.rfd = fds[0] result.wfd = fds[1] -proc setEvent*(ev: SelectEvent) = +proc trigger*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): raiseIOSelectorsError(osLastError()) @@ -279,16 +279,19 @@ proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) -proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + return s.fds[fd].ident != 0 + +proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T = let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: + if fdi in s: result = s.fds[fdi].data proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: + if fdi in s: s.fds[fdi].data = data result = true @@ -297,8 +300,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, mixin checkFd let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].data) + if fdi in s: + var value = addr(s.getData(fdi)) body template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, @@ -306,8 +309,8 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, mixin checkFd let fdi = int(fd) s.checkFd(fdi) - if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].data) + if fdi in s: + var value = addr(s.getData(fdi)) body1 else: body2 diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim index dc3451d52e..017d08117d 100644 --- a/lib/pure/ioselects/ioselectors_select.nim +++ b/lib/pure/ioselects/ioselectors_select.nim @@ -154,7 +154,7 @@ when defined(windows): result.rsock = rsock result.wsock = wsock - proc setEvent*(ev: SelectEvent) = + proc trigger*(ev: SelectEvent) = var data: uint64 = 1 if winlean.send(ev.wsock, cast[pointer](addr data), cint(sizeof(uint64)), 0) != sizeof(uint64): @@ -178,7 +178,7 @@ else: result.rsock = SocketHandle(fds[0]) result.wsock = SocketHandle(fds[1]) - proc setEvent*(ev: SelectEvent) = + proc trigger*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64): raiseIOSelectorsError(osLastError()) @@ -379,6 +379,16 @@ proc flush*[T](s: Selector[T]) = discard template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + s.withSelectLock(): + result = false + + let fdi = int(fd) + for i in 0..= 1000: Timespec(tv_sec: (timeout div 1000).Time, tv_nsec: 0) - else: Timespec(tv_sec: 0.Time, tv_nsec: timeout * 1000000) - let evNum = kevent(s.kqFD, nil, 0, addr s.events[0], 64.cint, addr tv) - if evNum < 0: - let err = osLastError() - if err.cint == EINTR: - return @[] - raiseOSError(err) - if evNum == 0: return @[] - for i in 0 ..< evNum: - let fd = s.events[i].ident.SocketHandle - - var evSet: set[Event] = {} - if (s.events[i].flags and EV_EOF) != 0: evSet = evSet + {EvError} - if s.events[i].filter == EVFILT_READ: evSet = evSet + {EvRead} - elif s.events[i].filter == EVFILT_WRITE: evSet = evSet + {EvWrite} - let selectorKey = s.fds[fd] - assert selectorKey.fd != 0.SocketHandle - result.add((selectorKey, evSet)) - - proc newSelector*(): Selector = - result.kqFD = kqueue() - if result.kqFD < 0: - raiseOSError(osLastError()) - when MultiThreaded: - result.fds = initSharedTable[SocketHandle, SelectorKey]() - else: - result.fds = initTable[SocketHandle, SelectorKey]() - - proc contains*(s: Selector, fd: SocketHandle): bool = - ## Determines whether selector contains a file descriptor. - s.fds.hasKey(fd) # and s.fds[fd].events != {} - - proc `[]`*(s: Selector, fd: SocketHandle): SelectorKey = - ## Retrieves the selector key for ``fd``. - return s.fds[fd] - -elif not defined(nimdoc): - # TODO: kqueue for bsd/mac os x. - type - Selector* = object - when MultiThreaded: - fds: SharedTable[SocketHandle, SelectorKey] - else: - fds: Table[SocketHandle, SelectorKey] - - proc register*(s: var Selector, fd: SocketHandle, events: set[Event], - data: SelectorData) = - let result = SelectorKey(fd: fd, events: events, data: data) - if s.fds.hasKeyOrPut(fd, result): - raise newException(ValueError, "File descriptor already exists.") - - proc update*(s: var Selector, fd: SocketHandle, events: set[Event]) = - #if not s.fds.hasKey(fd): - # raise newException(ValueError, "File descriptor not found.") - s.fds[fd].events = events - - proc unregister*(s: var Selector, fd: SocketHandle) = - s.fds.del(fd) - - proc close*(s: var Selector) = - when MultiThreaded: deinitSharedTable(s.fds) - - proc timeValFromMilliseconds(timeout: int): TimeVal = - if timeout != -1: - var seconds = timeout div 1000 - result.tv_sec = seconds.int32 - result.tv_usec = ((timeout - seconds * 1000) * 1000).int32 - - proc createFdSet(rd, wr: var TFdSet, s: Selector, m: var int) = - FD_ZERO(rd); FD_ZERO(wr) - for k, v in pairs(s.fds): - if EvRead in v.events: - m = max(m, int(k)) - FD_SET(k, rd) - if EvWrite in v.events: - m = max(m, int(k)) - FD_SET(k, wr) - - proc getReadyFDs(rd, wr: var TFdSet, - s: var Selector): seq[ReadyInfo] = - result = @[] - for k, v in pairs(s.fds): - var events: set[Event] = {} - if FD_ISSET(k, rd) != 0'i32: - events = events + {EvRead} - if FD_ISSET(k, wr) != 0'i32: - events = events + {EvWrite} - result.add((v, events)) - - proc select*(s: var Selector, timeout: int): seq[ReadyInfo] = - var tv {.noInit.}: TimeVal = timeValFromMilliseconds(timeout) - - var rd, wr: TFdSet - var m = 0 - createFdSet(rd, wr, s, m) - - var retCode = 0 - if timeout != -1: - retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, addr(tv))) - else: - retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, nil)) - - if retCode < 0: - raiseOSError(osLastError()) - elif retCode == 0: - return @[] - else: - return getReadyFDs(rd, wr, s) - - proc newSelector*(): Selector = - when MultiThreaded: - result.fds = initSharedTable[SocketHandle, SelectorKey]() - else: - result.fds = initTable[SocketHandle, SelectorKey]() - - proc contains*(s: Selector, fd: SocketHandle): bool = - return s.fds.hasKey(fd) - - proc `[]`*(s: Selector, fd: SocketHandle): SelectorKey = - return s.fds[fd] - -proc contains*(s: Selector, key: SelectorKey): bool = - ## Determines whether selector contains this selector key. More accurate - ## than checking if the file descriptor is in the selector because it - ## ensures that the keys are equal. File descriptors may not always be - ## unique especially when an fd is closed and then a new one is opened, - ## the new one may have the same value. - when not defined(nimdoc): - return key.fd in s and s.fds[key.fd] == key - -proc len*(s: Selector): int = - ## Retrieves the number of registered file descriptors in this Selector. - when not defined(nimdoc): - return s.fds.len - -{.deprecated: [TEvent: Event, PSelectorKey: SelectorKey, - TReadyInfo: ReadyInfo, PSelector: Selector].} +else: + when hasThreadSupport: + import locks -when not defined(testing) and isMainModule and not defined(nimdoc): - # Select() - import sockets - - when MultiThreaded: type - SockWrapper = object - sock: Socket - else: - type - SockWrapper = ref object of RootObj - sock: Socket + SharedArray[T] = UncheckedArray[T] - var sock = socket() - if sock == sockets.invalidSocket: raiseOSError(osLastError()) - #sock.setBlocking(false) - sock.connect("irc.freenode.net", Port(6667)) + proc allocSharedArray[T](nsize: int): ptr SharedArray[T] = + result = cast[ptr SharedArray[T]](allocShared0(sizeof(T) * nsize)) - var selector = newSelector() - var data = SockWrapper(sock: sock) - when MultiThreaded: - selector.register(sock.getFD, {EvWrite}, addr data) + proc deallocSharedArray[T](sa: ptr SharedArray[T]) = + deallocShared(cast[pointer](sa)) + + type + Event* {.pure.} = enum + Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot, + Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink, + VnodeRename, VnodeRevoke + + type + IOSelectorsException* = object of Exception + + ReadyKey* = object + fd* : int + events*: set[Event] + + SelectorKey[T] = object + ident: int + events: set[Event] + param: int + data: T + + proc raiseIOSelectorsError[T](message: T) = + var msg = "" + when T is string: + msg.add(message) + elif T is OSErrorCode: + msg.add(osErrorMsg(message) & " (code: " & $int(message) & ")") + else: + msg.add("Internal Error\n") + var err = newException(IOSelectorsException, msg) + raise err + + proc setNonBlocking(fd: cint) {.inline.} = + setBlocking(fd.SocketHandle, false) + + when not defined(windows): + import posix + + template setKey(s, pident, pevents, pparam, pdata: untyped) = + var skey = addr(s.fds[pident]) + skey.ident = pident + skey.events = pevents + skey.param = pparam + skey.data = data + + when ioselSupportedPlatform: + template blockSignals(newmask: var Sigset, oldmask: var Sigset) = + when hasThreadSupport: + if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1: + raiseIOSelectorsError(osLastError()) + else: + if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1: + raiseIOSelectorsError(osLastError()) + + template unblockSignals(newmask: var Sigset, oldmask: var Sigset) = + when hasThreadSupport: + if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1: + raiseIOSelectorsError(osLastError()) + else: + if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1: + raiseIOSelectorsError(osLastError()) + + when defined(linux): + include ioselects/ioselectors_epoll + elif bsdPlatform: + include ioselects/ioselectors_kqueue + elif defined(windows): + include ioselects/ioselectors_select + elif defined(solaris): + include ioselects/ioselectors_poll # need to replace it with event ports else: - selector.register(sock.getFD, {EvWrite}, data) - var i = 0 - while true: - let ready = selector.select(1000) - echo ready.len - if ready.len > 0: echo ready[0].events - i.inc - if i == 6: - selector.unregister(sock.getFD) - selector.close() - break + include ioselects/ioselectors_poll + +{.deprecated: [setEvent: trigger].} +{.deprecated: [register: registerHandle].} +{.deprecated: [update: updateHandle].} diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim index 034c2185c7..48043b4b56 100644 --- a/tests/async/tioselectors.nim +++ b/tests/async/tioselectors.nim @@ -2,7 +2,7 @@ discard """ file: "tioselectors.nim" output: "All tests passed!" """ -import ioselectors +import selectors const hasThreadSupport = compileOption("threads") diff --git a/tests/testament/categories.nim b/tests/testament/categories.nim index ca621969f3..5468fc309d 100644 --- a/tests/testament/categories.nim +++ b/tests/testament/categories.nim @@ -205,7 +205,6 @@ proc ioTests(r: var TResults, cat: Category, options: string) = proc asyncTests(r: var TResults, cat: Category, options: string) = template test(filename: untyped) = testSpec r, makeTest(filename, options, cat) - testSpec r, makeTest(filename, options & " -d:upcoming", cat) for t in os.walkFiles("tests/async/t*.nim"): test(t)