Merge pull request #6796 from nim-lang/async-improvements

Async improvements
This commit is contained in:
Dominik Picheta
2018-01-28 19:11:17 +00:00
committed by GitHub
13 changed files with 118 additions and 1668 deletions

View File

@@ -12,6 +12,15 @@
`getBool`, `getFloat`, `getBiggestInt`. Also `getInt` procedure was added.
- `reExtended` is no longer default for the `re` constructor in the `re`
module.
- `newAsyncSocket` taking an `AsyncFD` now runs `setBlocking(false)` on the
fd.
- The `ReadyKey` type in the selectors module now contains an ``errorCode``
field to help distinguish between ``Event.Error`` events.
- Implemented an `accept` proc that works on a `SocketHandle` in
``nativesockets``.
- Implemented ``getIoHandler`` proc in the ``asyncdispatch`` module that allows
you to retrieve the underlying IO Completion Port or ``Selector[AsyncData]``
object in the specified dispatcher.
- The overloading rules changed slightly so that constrained generics are
preferred over unconstrained generics. (Bug #6526)
- It is now possible to forward declare object types so that mutually

View File

@@ -265,9 +265,15 @@ when defined(windows) or defined(nimdoc):
setGlobalDispatcher(newDispatcher())
result = gDisp
proc getIoHandler*(disp: PDispatcher): Handle =
## Returns the underlying IO Completion Port handle (Windows) or selector
## (Unix) for the specified dispatcher.
return disp.ioPort
proc register*(fd: AsyncFD) =
## Registers ``fd`` with the dispatcher.
let p = getGlobalDispatcher()
if createIoCompletionPort(fd.Handle, p.ioPort,
cast[CompletionKey](fd), 1) == 0:
raiseOSError(osLastError())
@@ -757,6 +763,9 @@ when defined(windows) or defined(nimdoc):
## Unregisters ``fd``.
getGlobalDispatcher().handles.excl(fd)
proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
return fd in disp.handles
{.push stackTrace:off.}
proc waitableCallback(param: pointer,
timerOrWaitFired: WINBOOL): void {.stdcall.} =
@@ -977,7 +986,7 @@ when defined(windows) or defined(nimdoc):
proc newAsyncEvent*(): AsyncEvent =
## Creates a new thread-safe ``AsyncEvent`` object.
##
## New ``AsyncEvent`` object is not automatically registered with # TODO: Why? -- DP
## New ``AsyncEvent`` object is not automatically registered with
## dispatcher like ``AsyncSocket``.
var sa = SECURITY_ATTRIBUTES(
nLength: sizeof(SECURITY_ATTRIBUTES).cint,
@@ -1095,6 +1104,9 @@ else:
setGlobalDispatcher(newDispatcher())
result = gDisp
proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
return disp.selector
proc register*(fd: AsyncFD) =
let p = getGlobalDispatcher()
var data = newAsyncData()
@@ -1110,6 +1122,9 @@ else:
proc unregister*(ev: AsyncEvent) =
getGlobalDispatcher().selector.unregister(SelectEvent(ev))
proc contains*(disp: PDispatcher, fd: AsyncFd): bool =
return fd.SocketHandle in disp.selector
proc addRead*(fd: AsyncFD, cb: Callback) =
let p = getGlobalDispatcher()

View File

@@ -85,7 +85,7 @@ proc newAsyncFile*(fd: AsyncFd): AsyncFile =
## Creates `AsyncFile` with a previously opened file descriptor `fd`.
new result
result.fd = fd
register(result.fd)
register(fd)
proc openAsync*(filename: string, mode = fmRead): AsyncFile =
## Opens a file specified by the path in ``filename`` using
@@ -97,16 +97,16 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile =
when useWinUnicode:
let fd = createFileW(newWideCString(filename), desiredAccess,
FILE_SHARE_READ,
nil, creationDisposition, flags, 0).AsyncFd
nil, creationDisposition, flags, 0)
else:
let fd = createFileA(filename, desiredAccess,
FILE_SHARE_READ,
nil, creationDisposition, flags, 0).AsyncFd
nil, creationDisposition, flags, 0)
if fd.Handle == INVALID_HANDLE_VALUE:
if fd == INVALID_HANDLE_VALUE:
raiseOSError(osLastError())
result = newAsyncFile(fd)
result = newAsyncFile(fd.AsyncFd)
if mode == fmAppend:
result.offset = getFileSize(result)
@@ -115,11 +115,11 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile =
let flags = getPosixFlags(mode)
# RW (Owner), RW (Group), R (Other)
let perm = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH
let fd = open(filename, flags, perm).AsyncFD
if fd.cint == -1:
let fd = open(filename, flags, perm)
if fd == -1:
raiseOSError(osLastError())
result = newAsyncFile(fd)
result = newAsyncFile(fd.AsyncFd)
proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
## Read ``size`` bytes from the specified file asynchronously starting at

View File

@@ -342,6 +342,7 @@ proc asyncCheck*[T](future: Future[T]) =
## finished with an error.
##
## This should be used instead of ``discard`` to discard void futures.
assert(not future.isNil, "Future is nil")
future.callback =
proc () =
if future.failed:

View File

@@ -140,9 +140,16 @@ proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP, buffered = true): AsyncSocket =
## Creates a new ``AsyncSocket`` based on the supplied params.
##
## The supplied ``fd``'s non-blocking state will be enabled implicitly.
##
## **Note**: This procedure will **NOT** register ``fd`` with the global
## async dispatcher. You need to do this manually. If you have used
## ``newAsyncNativeSocket`` to create ``fd`` then it's already registered.
assert fd != osInvalidSocket.AsyncFD
new(result)
result.fd = fd.SocketHandle
fd.SocketHandle.setBlocking(false)
result.isBuffered = buffered
result.domain = domain
result.sockType = sockType

View File

@@ -141,7 +141,7 @@ template checkFd(s, f) =
if f >= s.maxFD:
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
@@ -156,7 +156,7 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
raiseIOSelectorsError(osLastError())
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
let fdi = int(fd)
@@ -392,9 +392,19 @@ proc selectInto*[T](s: Selector[T], timeout: int,
let pevents = resTable[i].events
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != 0)
var rkey = ReadyKey(fd: int(fdi), events: {})
var rkey = ReadyKey(fd: fdi, events: {})
if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
if (pevents and EPOLLHUP) != 0:
rkey.errorCode = ECONNRESET.OSErrorCode
else:
# Try reading SO_ERROR from fd.
var error: cint
var size = sizeof(error).SockLen
if getsockopt(fdi.SocketHandle, SOL_SOCKET, SO_ERROR, addr(error),
addr(size)) == 0'i32:
rkey.errorCode = error.OSErrorCode
rkey.events.incl(Event.Error)
if (pevents and EPOLLOUT) != 0:
rkey.events.incl(Event.Write)
@@ -482,7 +492,7 @@ template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd].ident != 0
return s.fds[fd.int].ident != 0
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
@@ -516,3 +526,6 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
body1
else:
body2
proc getFd*[T](s: Selector[T]): int =
return s.epollFd.int

View File

@@ -217,7 +217,7 @@ else:
raiseIOSelectorsError(osLastError())
s.changes.setLen(0)
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
let fdi = int(fd)
s.checkFd(fdi)
@@ -235,7 +235,7 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
when not declared(CACHE_EVENTS):
flushKQueue(s)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
@@ -503,6 +503,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
if (kevent.flags and EV_ERROR) != 0:
rkey.events = {Event.Error}
rkey.errorCode = kevent.data.OSErrorCode
case kevent.filter:
of EVFILT_READ:
@@ -569,6 +570,13 @@ proc selectInto*[T](s: Selector[T], timeout: int,
doAssert(true, "Unsupported kqueue filter in the queue!")
if (kevent.flags and EV_EOF) != 0:
if kevent.fflags != 0:
rkey.errorCode = kevent.fflags.OSErrorCode
else:
# This assumes we are dealing with sockets.
# TODO: For future-proofing it might be a good idea to give the
# user access to the raw `kevent`.
rkey.errorCode = ECONNRESET.OSErrorCode
rkey.events.incl(Event.Error)
results[k] = rkey
@@ -585,7 +593,7 @@ template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd].ident != 0
return s.fds[fd.int].ident != 0
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
@@ -619,3 +627,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
body1
else:
body2
proc getFd*[T](s: Selector[T]): int =
return s.kqFD.int

View File

@@ -141,7 +141,7 @@ template checkFd(s, f) =
if f >= s.maxFD:
raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
var fdi = int(fd)
s.checkFd(fdi)
@@ -149,7 +149,7 @@ proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
setKey(s, fdi, events, 0, data)
if events != {}: s.pollAdd(fdi.cint, events)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
@@ -280,7 +280,7 @@ template isEmpty*[T](s: Selector[T]): bool =
(s.count == 0)
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
return s.fds[fd].ident != 0
return s.fds[fd.int].ident != 0
proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
let fdi = int(fd)
@@ -314,3 +314,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
body1
else:
body2
proc getFd*[T](s: Selector[T]): int =
return -1

View File

@@ -229,7 +229,7 @@ proc delKey[T](s: Selector[T], fd: SocketHandle) =
doAssert(i < FD_SETSIZE,
"Descriptor [" & $int(fd) & "] is not registered in the queue!")
proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
when not defined(windows):
let fdi = int(fd)
@@ -255,7 +255,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
IOFD_SET(ev.rsock, addr s.rSet)
inc(s.count)
proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) =
let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
Event.User, Event.Oneshot, Event.Error}
@@ -453,3 +453,6 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
else:
body2
proc getFd*[T](s: Selector[T]): int =
return -1

View File

@@ -187,12 +187,12 @@ proc toSockType*(protocol: Protocol): SockType =
proc newNativeSocket*(domain: Domain = AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP): SocketHandle =
## Creates a new socket; returns `InvalidSocket` if an error occurs.
## Creates a new socket; returns `osInvalidSocket` if an error occurs.
socket(toInt(domain), toInt(sockType), toInt(protocol))
proc newNativeSocket*(domain: cint, sockType: cint,
protocol: cint): SocketHandle =
## Creates a new socket; returns `InvalidSocket` if an error occurs.
## Creates a new socket; returns `osInvalidSocket` if an error occurs.
##
## Use this overload if one of the enums specified above does
## not contain what you need.
@@ -666,6 +666,19 @@ proc selectWrite*(writefds: var seq[SocketHandle],
pruneSocketSet(writefds, (wr))
proc accept*(fd: SocketHandle): (SocketHandle, string) =
## Accepts a new client connection.
##
## Returns (osInvalidSocket, "") if an error occurred.
var sockAddress: Sockaddr_in
var addrLen = sizeof(sockAddress).SockLen
var sock = accept(fd, cast[ptr SockAddr](addr(sockAddress)),
addr(addrLen))
if sock == osInvalidSocket:
return (osInvalidSocket, "")
else:
return (sock, $inet_ntoa(sockAddress.sin_addr))
when defined(Windows):
var wsa: WSAData
if wsaStartup(0x0101'i16, addr wsa) != 0: raiseOSError(osLastError())

View File

@@ -753,10 +753,8 @@ proc acceptAddr*(server: Socket, client: var Socket, address: var string,
## flag is specified then this error will not be raised and instead
## accept will be called again.
assert(client != nil)
var sockAddress: Sockaddr_in
var addrLen = sizeof(sockAddress).SockLen
var sock = accept(server.fd, cast[ptr SockAddr](addr(sockAddress)),
addr(addrLen))
let ret = accept(server.fd)
let sock = ret[0]
if sock == osInvalidSocket:
let err = osLastError()
@@ -764,6 +762,7 @@ proc acceptAddr*(server: Socket, client: var Socket, address: var string,
acceptAddr(server, client, address, flags)
raiseOSError(err)
else:
address = ret[1]
client.fd = sock
client.isBuffered = server.isBuffered
@@ -776,9 +775,6 @@ proc acceptAddr*(server: Socket, client: var Socket, address: var string,
let ret = SSLAccept(client.sslHandle)
socketError(client, ret, false)
# Client socket is set above.
address = $inet_ntoa(sockAddress.sin_addr)
when false: #defineSsl:
proc acceptAddrSSL*(server: Socket, client: var Socket,
address: var string): SSLAcceptResult {.

View File

@@ -54,9 +54,9 @@ when defined(nimdoc):
Timer, ## Timer descriptor is completed
Signal, ## Signal is raised
Process, ## Process is finished
Vnode, ## BSD specific file change happens
Vnode, ## BSD specific file change
User, ## User event is raised
Error, ## Error happens while waiting, for descriptor
Error, ## Error occurred while waiting for descriptor
VnodeWrite, ## NOTE_WRITE (BSD specific, write to file occurred)
VnodeDelete, ## NOTE_DELETE (BSD specific, unlink of file occurred)
VnodeExtend, ## NOTE_EXTEND (BSD specific, file extended)
@@ -69,6 +69,8 @@ when defined(nimdoc):
## An object which holds result for descriptor
fd* : int ## file/socket descriptor
events*: set[Event] ## set of events
errorCode*: OSErrorCode ## additional error code information for
## Error events
SelectEvent* = object
## An object which holds user defined event
@@ -79,13 +81,14 @@ when defined(nimdoc):
proc close*[T](s: Selector[T]) =
## Closes the selector.
proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event],
data: T) =
proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event], data: T) =
## Registers file/socket descriptor ``fd`` to selector ``s``
## with events set in ``events``. The ``data`` is application-defined
## data, which will be passed when an event is triggered.
proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle,
events: set[Event]) =
## Update file/socket descriptor ``fd``, registered in selector
## ``s`` with new events set ``event``.
@@ -221,11 +224,15 @@ when defined(nimdoc):
proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
## Determines whether selector contains a file descriptor.
proc getFd*[T](s: Selector[T]): int =
## Retrieves the underlying selector's file descriptor.
##
## For *poll* and *select* selectors ``-1`` is returned.
else:
when hasThreadSupport:
import locks
type
SharedArray[T] = UncheckedArray[T]
@@ -234,7 +241,6 @@ else:
proc deallocSharedArray[T](sa: ptr SharedArray[T]) =
deallocShared(cast[pointer](sa))
type
Event* {.pure.} = enum
Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot,
@@ -247,6 +253,7 @@ else:
ReadyKey* = object
fd* : int
events*: set[Event]
errorCode*: OSErrorCode
SelectorKey[T] = object
ident: int
@@ -264,7 +271,7 @@ else:
msg.add("Internal Error\n")
var err = newException(IOSelectorsException, msg)
raise err
proc setNonBlocking(fd: cint) {.inline.} =
setBlocking(fd.SocketHandle, false)

File diff suppressed because it is too large Load Diff