mirror of
https://github.com/nim-lang/Nim.git
synced 2026-01-06 13:07:48 +00:00
Implement dial, support IPv6 in httpclient (#5763)
* Implement dial, support IPv6 in httpclient Added ``dial`` procedure to networking modules: ``net``, ``asyncdispatch``, ``asyncnet``. It merges socket creation, address resolution, and connection into single step. When using ``dial``, you don't have to worry about IPv4 vs IPv6 problem. Fixed addrInfo loop in connect to behave properly. Previously it would stop on first non-immediate failure, instead of continuing and trying the remaining addresses. Fixed newAsyncNativeSocket to raise proper error if socket creation fails. Fixes: #3811 * Check domain during connect() only on non-Windows This is how it was in the previous implementation of connect(). * Call 'osLastError' before 'close' in net.dial * Record osLastError before freeAddrInfo in net.dial * Add missing docs for 'dial' proc * Optimize dial to create one FD per domain, add tests And make async IPv6 servers work on Windows. * Add IPv6 test to uri module * Fix getAddrString error handling
This commit is contained in:
committed by
Andreas Rumpf
parent
6377b52d8e
commit
ecf278c467
@@ -9,7 +9,7 @@
|
||||
|
||||
include "system/inclrtl"
|
||||
|
||||
import os, oids, tables, strutils, times, heapqueue
|
||||
import os, oids, tables, strutils, times, heapqueue, options
|
||||
|
||||
import nativesockets, net, deques
|
||||
|
||||
@@ -385,68 +385,6 @@ when defined(windows) or defined(nimdoc):
|
||||
dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
|
||||
RemoteSockaddr, RemoteSockaddrLength)
|
||||
|
||||
proc connect*(socket: AsyncFD, address: string, port: Port,
|
||||
domain = nativesockets.AF_INET): Future[void] =
|
||||
## Connects ``socket`` to server at ``address:port``.
|
||||
##
|
||||
## Returns a ``Future`` which will complete when the connection succeeds
|
||||
## or an error occurs.
|
||||
verifyPresence(socket)
|
||||
var retFuture = newFuture[void]("connect")
|
||||
# Apparently ``ConnectEx`` expects the socket to be initially bound:
|
||||
var saddr: Sockaddr_in
|
||||
saddr.sin_family = int16(toInt(domain))
|
||||
saddr.sin_port = 0
|
||||
saddr.sin_addr.s_addr = INADDR_ANY
|
||||
if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
|
||||
sizeof(saddr).SockLen) < 0'i32:
|
||||
raiseOSError(osLastError())
|
||||
|
||||
var aiList = getAddrInfo(address, port, domain)
|
||||
var success = false
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
# "the OVERLAPPED structure must remain valid until the I/O completes"
|
||||
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = CompletionData(fd: socket, cb:
|
||||
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
retFuture.complete()
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
||||
)
|
||||
|
||||
var ret = connectEx(socket.SocketHandle, it.ai_addr,
|
||||
sizeof(Sockaddr_in).cint, nil, 0, nil,
|
||||
cast[POVERLAPPED](ol))
|
||||
if ret:
|
||||
# Request to connect completed immediately.
|
||||
success = true
|
||||
retFuture.complete()
|
||||
# We don't deallocate ``ol`` here because even though this completed
|
||||
# immediately poll will still be notified about its completion and it will
|
||||
# free ``ol``.
|
||||
break
|
||||
else:
|
||||
lastError = osLastError()
|
||||
if lastError.int32 == ERROR_IO_PENDING:
|
||||
# In this case ``ol`` will be deallocated in ``poll``.
|
||||
success = true
|
||||
break
|
||||
else:
|
||||
GC_unref(ol)
|
||||
success = false
|
||||
it = it.ai_next
|
||||
|
||||
freeAddrInfo(aiList)
|
||||
if not success:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
return retFuture
|
||||
|
||||
proc recv*(socket: AsyncFD, size: int,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] =
|
||||
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
|
||||
@@ -754,8 +692,8 @@ when defined(windows) or defined(nimdoc):
|
||||
var lpOutputBuf = newString(lpOutputLen)
|
||||
var dwBytesReceived: Dword
|
||||
let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
|
||||
let dwLocalAddressLength = Dword(sizeof(Sockaddr_in) + 16)
|
||||
let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16)
|
||||
let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
|
||||
let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
|
||||
|
||||
template failAccept(errcode) =
|
||||
if flags.isDisconnectionError(errcode):
|
||||
@@ -785,12 +723,14 @@ when defined(windows) or defined(nimdoc):
|
||||
dwLocalAddressLength, dwRemoteAddressLength,
|
||||
addr localSockaddr, addr localLen,
|
||||
addr remoteSockaddr, addr remoteLen)
|
||||
register(clientSock.AsyncFD)
|
||||
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
|
||||
retFuture.complete(
|
||||
(address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
|
||||
client: clientSock.AsyncFD)
|
||||
)
|
||||
try:
|
||||
let address = getAddrString(remoteSockAddr)
|
||||
register(clientSock.AsyncFD)
|
||||
retFuture.complete((address: address, client: clientSock.AsyncFD))
|
||||
except:
|
||||
# getAddrString may raise
|
||||
clientSock.close()
|
||||
retFuture.fail(getCurrentException())
|
||||
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
@@ -823,20 +763,6 @@ when defined(windows) or defined(nimdoc):
|
||||
|
||||
return retFuture
|
||||
|
||||
proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD =
|
||||
## Creates a new socket and registers it with the dispatcher implicitly.
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
register(result)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET,
|
||||
sockType: SockType = SOCK_STREAM,
|
||||
protocol: Protocol = IPPROTO_TCP): AsyncFD =
|
||||
## Creates a new socket and registers it with the dispatcher implicitly.
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
register(result)
|
||||
|
||||
proc closeSocket*(socket: AsyncFD) =
|
||||
## Closes a socket and ensures that it is unregistered.
|
||||
socket.SocketHandle.close()
|
||||
@@ -1015,23 +941,6 @@ else:
|
||||
var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
|
||||
p.selector.register(fd.SocketHandle, {}, data.RootRef)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: cint, sockType: cint,
|
||||
protocol: cint): AsyncFD =
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
when defined(macosx):
|
||||
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
|
||||
register(result)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: Domain = AF_INET,
|
||||
sockType: SockType = SOCK_STREAM,
|
||||
protocol: Protocol = IPPROTO_TCP): AsyncFD =
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
when defined(macosx):
|
||||
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
|
||||
register(result)
|
||||
|
||||
proc closeSocket*(sock: AsyncFD) =
|
||||
let disp = getGlobalDispatcher()
|
||||
disp.selector.unregister(sock.SocketHandle)
|
||||
@@ -1115,50 +1024,6 @@ else:
|
||||
# Callback queue processing
|
||||
processPendingCallbacks(p)
|
||||
|
||||
proc connect*(socket: AsyncFD, address: string, port: Port,
|
||||
domain = AF_INET): Future[void] =
|
||||
var retFuture = newFuture[void]("connect")
|
||||
|
||||
proc cb(fd: AsyncFD): bool =
|
||||
var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
|
||||
if ret == 0:
|
||||
# We have connected.
|
||||
retFuture.complete()
|
||||
return true
|
||||
elif ret == EINTR:
|
||||
# interrupted, keep waiting
|
||||
return false
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
|
||||
return true
|
||||
|
||||
assert getSockDomain(socket.SocketHandle) == domain
|
||||
var aiList = getAddrInfo(address, port, domain)
|
||||
var success = false
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
|
||||
if ret == 0:
|
||||
# Request to connect completed immediately.
|
||||
success = true
|
||||
retFuture.complete()
|
||||
break
|
||||
else:
|
||||
lastError = osLastError()
|
||||
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
|
||||
success = true
|
||||
addWrite(socket, cb)
|
||||
break
|
||||
else:
|
||||
success = false
|
||||
it = it.ai_next
|
||||
|
||||
freeAddrInfo(aiList)
|
||||
if not success:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
return retFuture
|
||||
|
||||
proc recv*(socket: AsyncFD, size: int,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] =
|
||||
var retFuture = newFuture[string]("recv")
|
||||
@@ -1320,11 +1185,20 @@ else:
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
else:
|
||||
register(client.AsyncFD)
|
||||
retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
|
||||
try:
|
||||
let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
|
||||
register(client.AsyncFD)
|
||||
retFuture.complete((address, client.AsyncFD))
|
||||
except:
|
||||
# getAddrString may raise
|
||||
client.close()
|
||||
retFuture.fail(getCurrentException())
|
||||
addRead(socket, cb)
|
||||
return retFuture
|
||||
|
||||
# Common procedures between current and upcoming asyncdispatch
|
||||
include includes.asynccommon
|
||||
|
||||
proc sleepAsync*(ms: int): Future[void] =
|
||||
## Suspends the execution of the current async procedure for the next
|
||||
## ``ms`` milliseconds.
|
||||
|
||||
@@ -244,6 +244,17 @@ when defineSsl:
|
||||
else:
|
||||
raiseSSLError("Socket has been disconnected")
|
||||
|
||||
proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
|
||||
buffered = true): Future[AsyncSocket] {.async.} =
|
||||
## Establishes connection to the specified ``address``:``port`` pair via the
|
||||
## specified protocol. The procedure iterates through possible
|
||||
## resolutions of the ``address`` until it succeeds, meaning that it
|
||||
## seamlessly works with both IPv4 and IPv6.
|
||||
## Returns AsyncSocket ready to send or receive data.
|
||||
let asyncFd = await asyncdispatch.dial(address, port, protocol)
|
||||
let sockType = protocol.toSockType()
|
||||
let domain = getSockDomain(asyncFd.SocketHandle)
|
||||
result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered)
|
||||
|
||||
proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
|
||||
## Connects ``socket`` to server at ``address:port``.
|
||||
|
||||
@@ -1033,32 +1033,38 @@ proc newConnection(client: HttpClient | AsyncHttpClient,
|
||||
if client.currentURL.hostname != url.hostname or
|
||||
client.currentURL.scheme != url.scheme or
|
||||
client.currentURL.port != url.port:
|
||||
let isSsl = url.scheme.toLowerAscii() == "https"
|
||||
|
||||
if isSsl and not defined(ssl):
|
||||
raise newException(HttpRequestError,
|
||||
"SSL support is not available. Cannot connect over SSL.")
|
||||
|
||||
if client.connected:
|
||||
client.close()
|
||||
|
||||
when client is HttpClient:
|
||||
client.socket = newSocket()
|
||||
elif client is AsyncHttpClient:
|
||||
client.socket = newAsyncSocket()
|
||||
else: {.fatal: "Unsupported client type".}
|
||||
|
||||
# TODO: I should be able to write 'net.Port' here...
|
||||
let port =
|
||||
if url.port == "":
|
||||
if url.scheme.toLower() == "https":
|
||||
if isSsl:
|
||||
nativesockets.Port(443)
|
||||
else:
|
||||
nativesockets.Port(80)
|
||||
else: nativesockets.Port(url.port.parseInt)
|
||||
|
||||
if url.scheme.toLower() == "https":
|
||||
when defined(ssl):
|
||||
client.sslContext.wrapSocket(client.socket)
|
||||
else:
|
||||
raise newException(HttpRequestError,
|
||||
"SSL support is not available. Cannot connect over SSL.")
|
||||
when client is HttpClient:
|
||||
client.socket = await net.dial(url.hostname, port)
|
||||
elif client is AsyncHttpClient:
|
||||
client.socket = await asyncnet.dial(url.hostname, port)
|
||||
else: {.fatal: "Unsupported client type".}
|
||||
|
||||
when defined(ssl):
|
||||
if isSsl:
|
||||
try:
|
||||
client.sslContext.wrapConnectedSocket(client.socket, handshakeAsClient)
|
||||
except:
|
||||
client.socket.close()
|
||||
raise getCurrentException()
|
||||
|
||||
await client.socket.connect(url.hostname, port)
|
||||
client.currentURL = url
|
||||
client.connected = true
|
||||
|
||||
|
||||
201
lib/pure/includes/asynccommon.nim
Normal file
201
lib/pure/includes/asynccommon.nim
Normal file
@@ -0,0 +1,201 @@
|
||||
template newAsyncNativeSocketImpl(domain, sockType, protocol) =
|
||||
let handle = newNativeSocket(domain, sockType, protocol)
|
||||
if handle == osInvalidSocket:
|
||||
raiseOSError(osLastError())
|
||||
handle.setBlocking(false)
|
||||
when defined(macosx):
|
||||
handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
|
||||
result = handle.AsyncFD
|
||||
register(result)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: cint, sockType: cint,
|
||||
protocol: cint): AsyncFD =
|
||||
newAsyncNativeSocketImpl(domain, sockType, protocol)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
|
||||
sockType: SockType = SOCK_STREAM,
|
||||
protocol: Protocol = IPPROTO_TCP): AsyncFD =
|
||||
newAsyncNativeSocketImpl(domain, sockType, protocol)
|
||||
|
||||
when defined(windows) or defined(nimdoc):
|
||||
proc bindToDomain(handle: SocketHandle, domain: Domain) =
|
||||
# Extracted into a separate proc, because connect() on Windows requires
|
||||
# the socket to be initially bound.
|
||||
template doBind(saddr) =
|
||||
if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
|
||||
sizeof(saddr).SockLen) < 0'i32:
|
||||
raiseOSError(osLastError())
|
||||
|
||||
if domain == Domain.AF_INET6:
|
||||
var saddr: Sockaddr_in6
|
||||
saddr.sin6_family = int16(toInt(domain))
|
||||
doBind(saddr)
|
||||
else:
|
||||
var saddr: Sockaddr_in
|
||||
saddr.sin_family = int16(toInt(domain))
|
||||
doBind(saddr)
|
||||
|
||||
proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): Future[void] =
|
||||
let retFuture = newFuture[void]("doConnect")
|
||||
result = retFuture
|
||||
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = CompletionData(fd: socket, cb:
|
||||
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
retFuture.complete()
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
||||
)
|
||||
|
||||
let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
|
||||
cint(addrInfo.ai_addrlen), nil, 0, nil,
|
||||
cast[POVERLAPPED](ol))
|
||||
if ret:
|
||||
# Request to connect completed immediately.
|
||||
retFuture.complete()
|
||||
# We don't deallocate ``ol`` here because even though this completed
|
||||
# immediately poll will still be notified about its completion and it
|
||||
# will free ``ol``.
|
||||
else:
|
||||
let lastError = osLastError()
|
||||
if lastError.int32 != ERROR_IO_PENDING:
|
||||
# With ERROR_IO_PENDING ``ol`` will be deallocated in ``poll``,
|
||||
# and the future will be completed/failed there, too.
|
||||
GC_unref(ol)
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
else:
|
||||
proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): Future[void] =
|
||||
let retFuture = newFuture[void]("doConnect")
|
||||
result = retFuture
|
||||
|
||||
proc cb(fd: AsyncFD): bool =
|
||||
let ret = SocketHandle(fd).getSockOptInt(
|
||||
cint(SOL_SOCKET), cint(SO_ERROR))
|
||||
if ret == 0:
|
||||
# We have connected.
|
||||
retFuture.complete()
|
||||
return true
|
||||
elif ret == EINTR:
|
||||
# interrupted, keep waiting
|
||||
return false
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
|
||||
return true
|
||||
|
||||
let ret = connect(socket.SocketHandle,
|
||||
addrInfo.ai_addr,
|
||||
addrInfo.ai_addrlen.Socklen)
|
||||
if ret == 0:
|
||||
# Request to connect completed immediately.
|
||||
retFuture.complete()
|
||||
else:
|
||||
let lastError = osLastError()
|
||||
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
|
||||
addWrite(socket, cb)
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
|
||||
template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
|
||||
protocol: Protocol = IPPROTO_RAW) =
|
||||
## Iterates through the AddrInfo linked list asynchronously
|
||||
## until the connection can be established.
|
||||
const shouldCreateFd = not declared(fd)
|
||||
|
||||
when shouldCreateFd:
|
||||
let sockType = protocol.toSockType()
|
||||
|
||||
var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
|
||||
for i in low(fdPerDomain)..high(fdPerDomain):
|
||||
fdPerDomain[i] = osInvalidSocket.AsyncFD
|
||||
template closeUnusedFds(domainToKeep = -1) {.dirty.} =
|
||||
for i, fd in fdPerDomain:
|
||||
if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
|
||||
fd.closeSocket()
|
||||
|
||||
var lastException: ref Exception
|
||||
var curAddrInfo = addrInfo
|
||||
var domain: Domain
|
||||
when shouldCreateFd:
|
||||
var curFd: AsyncFD
|
||||
else:
|
||||
var curFd = fd
|
||||
proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
|
||||
if fut == nil or fut.failed:
|
||||
if fut != nil:
|
||||
lastException = fut.readError()
|
||||
|
||||
while curAddrInfo != nil:
|
||||
let domainOpt = curAddrInfo.ai_family.toKnownDomain()
|
||||
if domainOpt.isSome:
|
||||
domain = domainOpt.unsafeGet()
|
||||
break
|
||||
curAddrInfo = curAddrInfo.ai_next
|
||||
|
||||
if curAddrInfo == nil:
|
||||
freeAddrInfo(addrInfo)
|
||||
when shouldCreateFd:
|
||||
closeUnusedFds()
|
||||
if lastException != nil:
|
||||
retFuture.fail(lastException)
|
||||
else:
|
||||
retFuture.fail(newException(
|
||||
IOError, "Couldn't resolve address: " & address))
|
||||
return
|
||||
|
||||
when shouldCreateFd:
|
||||
curFd = fdPerDomain[ord(domain)]
|
||||
if curFd == osInvalidSocket.AsyncFD:
|
||||
try:
|
||||
curFd = newAsyncNativeSocket(domain, sockType, protocol)
|
||||
except:
|
||||
freeAddrInfo(addrInfo)
|
||||
closeUnusedFds()
|
||||
raise getCurrentException()
|
||||
when defined(windows):
|
||||
curFd.SocketHandle.bindToDomain(domain)
|
||||
fdPerDomain[ord(domain)] = curFd
|
||||
|
||||
doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
|
||||
curAddrInfo = curAddrInfo.ai_next
|
||||
else:
|
||||
freeAddrInfo(addrInfo)
|
||||
when shouldCreateFd:
|
||||
closeUnusedFds(ord(domain))
|
||||
retFuture.complete(curFd)
|
||||
else:
|
||||
retFuture.complete()
|
||||
|
||||
tryNextAddrInfo(nil)
|
||||
|
||||
proc dial*(address: string, port: Port,
|
||||
protocol: Protocol = IPPROTO_TCP): Future[AsyncFD] =
|
||||
## Establishes connection to the specified ``address``:``port`` pair via the
|
||||
## specified protocol. The procedure iterates through possible
|
||||
## resolutions of the ``address`` until it succeeds, meaning that it
|
||||
## seamlessly works with both IPv4 and IPv6.
|
||||
## Returns the async file descriptor, registered in the dispatcher of
|
||||
## the current thread, ready to send or receive data.
|
||||
let retFuture = newFuture[AsyncFD]("dial")
|
||||
result = retFuture
|
||||
let sockType = protocol.toSockType()
|
||||
|
||||
let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
|
||||
asyncAddrInfoLoop(aiList, noFD, protocol)
|
||||
|
||||
proc connect*(socket: AsyncFD, address: string, port: Port,
|
||||
domain = Domain.AF_INET): Future[void] =
|
||||
let retFuture = newFuture[void]("connect")
|
||||
result = retFuture
|
||||
|
||||
when defined(windows):
|
||||
verifyPresence(socket)
|
||||
else:
|
||||
assert getSockDomain(socket.SocketHandle) == domain
|
||||
|
||||
let aiList = getAddrInfo(address, port, domain)
|
||||
when defined(windows):
|
||||
socket.SocketHandle.bindToDomain(domain)
|
||||
asyncAddrInfoLoop(aiList, socket)
|
||||
@@ -12,7 +12,7 @@
|
||||
|
||||
# TODO: Clean up the exports a bit and everything else in general.
|
||||
|
||||
import os
|
||||
import os, options
|
||||
|
||||
when hostOS == "solaris":
|
||||
{.passl: "-lsocket -lnsl".}
|
||||
@@ -52,9 +52,11 @@ type
|
||||
Domain* = enum ## domain, which specifies the protocol family of the
|
||||
## created socket. Other domains than those that are listed
|
||||
## here are unsupported.
|
||||
AF_UNIX, ## for local socket (using a file). Unsupported on Windows.
|
||||
AF_UNSPEC = 0, ## unspecified domain (can be detected automatically by
|
||||
## some procedures, such as getaddrinfo)
|
||||
AF_UNIX = 1, ## for local socket (using a file). Unsupported on Windows.
|
||||
AF_INET = 2, ## for network protocol IPv4 or
|
||||
AF_INET6 = 23 ## for network protocol IPv6.
|
||||
AF_INET6 = when defined(macosx): 30 else: 23 ## for network protocol IPv6.
|
||||
|
||||
SockType* = enum ## second argument to `socket` proc
|
||||
SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets
|
||||
@@ -125,10 +127,19 @@ proc toInt*(p: Protocol): cint
|
||||
when not useWinVersion:
|
||||
proc toInt(domain: Domain): cshort =
|
||||
case domain
|
||||
of AF_UNSPEC: result = posix.AF_UNSPEC.cshort
|
||||
of AF_UNIX: result = posix.AF_UNIX.cshort
|
||||
of AF_INET: result = posix.AF_INET.cshort
|
||||
of AF_INET6: result = posix.AF_INET6.cshort
|
||||
else: discard
|
||||
|
||||
proc toKnownDomain*(family: cint): Option[Domain] =
|
||||
## Converts the platform-dependent ``cint`` to the Domain or none(),
|
||||
## if the ``cint`` is not known.
|
||||
result = if family == posix.AF_UNSPEC: some(Domain.AF_UNSPEC)
|
||||
elif family == posix.AF_UNIX: some(Domain.AF_UNIX)
|
||||
elif family == posix.AF_INET: some(Domain.AF_INET)
|
||||
elif family == posix.AF_INET6: some(Domain.AF_INET6)
|
||||
else: none(Domain)
|
||||
|
||||
proc toInt(typ: SockType): cint =
|
||||
case typ
|
||||
@@ -136,7 +147,6 @@ when not useWinVersion:
|
||||
of SOCK_DGRAM: result = posix.SOCK_DGRAM
|
||||
of SOCK_SEQPACKET: result = posix.SOCK_SEQPACKET
|
||||
of SOCK_RAW: result = posix.SOCK_RAW
|
||||
else: discard
|
||||
|
||||
proc toInt(p: Protocol): cint =
|
||||
case p
|
||||
@@ -146,18 +156,33 @@ when not useWinVersion:
|
||||
of IPPROTO_IPV6: result = posix.IPPROTO_IPV6
|
||||
of IPPROTO_RAW: result = posix.IPPROTO_RAW
|
||||
of IPPROTO_ICMP: result = posix.IPPROTO_ICMP
|
||||
else: discard
|
||||
|
||||
else:
|
||||
proc toInt(domain: Domain): cshort =
|
||||
result = toU16(ord(domain))
|
||||
|
||||
proc toKnownDomain*(family: cint): Option[Domain] =
|
||||
## Converts the platform-dependent ``cint`` to the Domain or none(),
|
||||
## if the ``cint`` is not known.
|
||||
result = if family == winlean.AF_UNSPEC: some(Domain.AF_UNSPEC)
|
||||
elif family == winlean.AF_INET: some(Domain.AF_INET)
|
||||
elif family == winlean.AF_INET6: some(Domain.AF_INET6)
|
||||
else: none(Domain)
|
||||
|
||||
proc toInt(typ: SockType): cint =
|
||||
result = cint(ord(typ))
|
||||
|
||||
proc toInt(p: Protocol): cint =
|
||||
result = cint(ord(p))
|
||||
|
||||
proc toSockType*(protocol: Protocol): SockType =
|
||||
result = case protocol
|
||||
of IPPROTO_TCP:
|
||||
SOCK_STREAM
|
||||
of IPPROTO_UDP:
|
||||
SOCK_DGRAM
|
||||
of IPPROTO_IP, IPPROTO_IPV6, IPPROTO_RAW, IPPROTO_ICMP:
|
||||
SOCK_RAW
|
||||
|
||||
proc newNativeSocket*(domain: Domain = AF_INET,
|
||||
sockType: SockType = SOCK_STREAM,
|
||||
@@ -392,14 +417,14 @@ proc getHostname*(): string {.tags: [ReadIOEffect].} =
|
||||
|
||||
proc getSockDomain*(socket: SocketHandle): Domain =
|
||||
## returns the socket's domain (AF_INET or AF_INET6).
|
||||
var name: SockAddr
|
||||
var name: Sockaddr_in6
|
||||
var namelen = sizeof(name).SockLen
|
||||
if getsockname(socket, cast[ptr SockAddr](addr(name)),
|
||||
addr(namelen)) == -1'i32:
|
||||
raiseOSError(osLastError())
|
||||
if name.sa_family == nativeAfInet:
|
||||
if name.sin6_family == nativeAfInet:
|
||||
result = AF_INET
|
||||
elif name.sa_family == nativeAfInet6:
|
||||
elif name.sin6_family == nativeAfInet6:
|
||||
result = AF_INET6
|
||||
else:
|
||||
raiseOSError(osLastError(), "unknown socket family in getSockFamily")
|
||||
@@ -410,17 +435,23 @@ proc getAddrString*(sockAddr: ptr SockAddr): string =
|
||||
if sockAddr.sa_family == nativeAfInet:
|
||||
result = $inet_ntoa(cast[ptr Sockaddr_in](sockAddr).sin_addr)
|
||||
elif sockAddr.sa_family == nativeAfInet6:
|
||||
let addrLen = when not useWinVersion: posix.INET6_ADDRSTRLEN
|
||||
else: 46 # it's actually 46 in both cases
|
||||
result = newString(addrLen)
|
||||
let addr6 = addr cast[ptr Sockaddr_in6](sockAddr).sin6_addr
|
||||
when not useWinVersion:
|
||||
# TODO: Windows
|
||||
result = newString(posix.INET6_ADDRSTRLEN)
|
||||
let addr6 = addr cast[ptr Sockaddr_in6](sockAddr).sin6_addr
|
||||
discard posix.inet_ntop(posix.AF_INET6, addr6, result.cstring,
|
||||
result.len.int32)
|
||||
if posix.inet_ntop(posix.AF_INET6, addr6, addr result[0],
|
||||
result.len.int32) == nil:
|
||||
raiseOSError(osLastError())
|
||||
if posix.IN6_IS_ADDR_V4MAPPED(addr6) != 0:
|
||||
result = result.substr("::ffff:".len)
|
||||
else:
|
||||
if winlean.inet_ntop(winlean.AF_INET6, addr6, addr result[0],
|
||||
result.len.int32) == nil:
|
||||
raiseOSError(osLastError())
|
||||
setLen(result, len(cstring(result)))
|
||||
else:
|
||||
raiseOSError(osLastError(), "unknown socket family in getAddrString")
|
||||
|
||||
raise newException(IOError, "Unknown socket family in getAddrString")
|
||||
|
||||
proc getSockName*(socket: SocketHandle): Port =
|
||||
## returns the socket's associated port number.
|
||||
|
||||
@@ -66,7 +66,7 @@
|
||||
##
|
||||
|
||||
{.deadCodeElim: on.}
|
||||
import nativesockets, os, strutils, parseutils, times, sets
|
||||
import nativesockets, os, strutils, parseutils, times, sets, options
|
||||
export Port, `$`, `==`
|
||||
export Domain, SockType, Protocol
|
||||
|
||||
@@ -669,7 +669,7 @@ proc close*(socket: Socket) =
|
||||
## Closes a socket.
|
||||
try:
|
||||
when defineSsl:
|
||||
if socket.isSSL:
|
||||
if socket.isSSL and socket.sslHandle != nil:
|
||||
ErrClearError()
|
||||
# As we are closing the underlying socket immediately afterwards,
|
||||
# it is valid, under the TLS standard, to perform a unidirectional
|
||||
@@ -1477,6 +1477,63 @@ proc isIpAddress*(address_str: string): bool {.tags: [].} =
|
||||
return false
|
||||
return true
|
||||
|
||||
proc dial*(address: string, port: Port,
|
||||
protocol = IPPROTO_TCP, buffered = true): Socket
|
||||
{.tags: [ReadIOEffect, WriteIOEffect].} =
|
||||
## Establishes connection to the specified ``address``:``port`` pair via the
|
||||
## specified protocol. The procedure iterates through possible
|
||||
## resolutions of the ``address`` until it succeeds, meaning that it
|
||||
## seamlessly works with both IPv4 and IPv6.
|
||||
## Returns Socket ready to send or receive data.
|
||||
let sockType = protocol.toSockType()
|
||||
|
||||
let aiList = getAddrInfo(address, port, AF_UNSPEC, sockType, protocol)
|
||||
|
||||
var fdPerDomain: array[low(Domain).ord..high(Domain).ord, SocketHandle]
|
||||
for i in low(fdPerDomain)..high(fdPerDomain):
|
||||
fdPerDomain[i] = osInvalidSocket
|
||||
template closeUnusedFds(domainToKeep = -1) {.dirty.} =
|
||||
for i, fd in fdPerDomain:
|
||||
if fd != osInvalidSocket and i != domainToKeep:
|
||||
fd.close()
|
||||
|
||||
var success = false
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
var domain: Domain
|
||||
var lastFd: SocketHandle
|
||||
while it != nil:
|
||||
let domainOpt = it.ai_family.toKnownDomain()
|
||||
if domainOpt.isNone:
|
||||
it = it.ai_next
|
||||
continue
|
||||
domain = domainOpt.unsafeGet()
|
||||
lastFd = fdPerDomain[ord(domain)]
|
||||
if lastFd == osInvalidSocket:
|
||||
lastFd = newNativeSocket(domain, sockType, protocol)
|
||||
if lastFd == osInvalidSocket:
|
||||
# we always raise if socket creation failed, because it means a
|
||||
# network system problem (e.g. not enough FDs), and not an unreachable
|
||||
# address.
|
||||
let err = osLastError()
|
||||
freeAddrInfo(aiList)
|
||||
closeUnusedFds()
|
||||
raiseOSError(err)
|
||||
fdPerDomain[ord(domain)] = lastFd
|
||||
if connect(lastFd, it.ai_addr, it.ai_addrlen.SockLen) == 0'i32:
|
||||
success = true
|
||||
break
|
||||
lastError = osLastError()
|
||||
it = it.ai_next
|
||||
freeAddrInfo(aiList)
|
||||
closeUnusedFds(ord(domain))
|
||||
|
||||
if success:
|
||||
result = newSocket(lastFd, domain, sockType, protocol)
|
||||
elif lastError != 0.OSErrorCode:
|
||||
raiseOSError(lastError)
|
||||
else:
|
||||
raise newException(IOError, "Couldn't resolve address: " & address)
|
||||
|
||||
proc connect*(socket: Socket, address: string,
|
||||
port = Port(0)) {.tags: [ReadIOEffect].} =
|
||||
|
||||
@@ -50,6 +50,7 @@ proc add*(url: var Url, a: Url) {.deprecated.} =
|
||||
proc parseAuthority(authority: string, result: var Uri) =
|
||||
var i = 0
|
||||
var inPort = false
|
||||
var inIPv6 = false
|
||||
while true:
|
||||
case authority[i]
|
||||
of '@':
|
||||
@@ -59,7 +60,14 @@ proc parseAuthority(authority: string, result: var Uri) =
|
||||
result.hostname.setLen(0)
|
||||
inPort = false
|
||||
of ':':
|
||||
inPort = true
|
||||
if inIPv6:
|
||||
result.hostname.add(authority[i])
|
||||
else:
|
||||
inPort = true
|
||||
of '[':
|
||||
inIPv6 = true
|
||||
of ']':
|
||||
inIPv6 = false
|
||||
of '\0': break
|
||||
else:
|
||||
if inPort:
|
||||
@@ -345,6 +353,17 @@ when isMainModule:
|
||||
doAssert test.anchor == "nose"
|
||||
doAssert($test == str)
|
||||
|
||||
block:
|
||||
# IPv6 address
|
||||
let str = "foo://[::1]:1234/bar?baz=true&qux#quux"
|
||||
let uri = parseUri(str)
|
||||
doAssert uri.scheme == "foo"
|
||||
doAssert uri.hostname == "::1"
|
||||
doAssert uri.port == "1234"
|
||||
doAssert uri.path == "/bar"
|
||||
doAssert uri.query == "baz=true&qux"
|
||||
doAssert uri.anchor == "quux"
|
||||
|
||||
block:
|
||||
let str = "urn:example:animal:ferret:nose"
|
||||
let test = parseUri(str)
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
include "system/inclrtl"
|
||||
|
||||
import os, oids, tables, strutils, times, heapqueue, lists
|
||||
import os, oids, tables, strutils, times, heapqueue, lists, options
|
||||
|
||||
import nativesockets, net, deques
|
||||
|
||||
@@ -325,68 +325,6 @@ when defined(windows) or defined(nimdoc):
|
||||
getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
|
||||
close(dummySock)
|
||||
|
||||
proc connect*(socket: AsyncFD, address: string, port: Port,
|
||||
domain = nativesockets.AF_INET): Future[void] =
|
||||
## Connects ``socket`` to server at ``address:port``.
|
||||
##
|
||||
## Returns a ``Future`` which will complete when the connection succeeds
|
||||
## or an error occurs.
|
||||
verifyPresence(socket)
|
||||
var retFuture = newFuture[void]("connect")
|
||||
# Apparently ``ConnectEx`` expects the socket to be initially bound:
|
||||
var saddr: Sockaddr_in
|
||||
saddr.sin_family = int16(toInt(domain))
|
||||
saddr.sin_port = 0
|
||||
saddr.sin_addr.s_addr = INADDR_ANY
|
||||
if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
|
||||
sizeof(saddr).SockLen) < 0'i32:
|
||||
raiseOSError(osLastError())
|
||||
|
||||
var aiList = getAddrInfo(address, port, domain)
|
||||
var success = false
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
# "the OVERLAPPED structure must remain valid until the I/O completes"
|
||||
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = CompletionData(fd: socket, cb:
|
||||
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
retFuture.complete()
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
|
||||
)
|
||||
|
||||
var ret = connectEx(socket.SocketHandle, it.ai_addr,
|
||||
sizeof(Sockaddr_in).cint, nil, 0, nil,
|
||||
cast[POVERLAPPED](ol))
|
||||
if ret:
|
||||
# Request to connect completed immediately.
|
||||
success = true
|
||||
retFuture.complete()
|
||||
# We don't deallocate ``ol`` here because even though this completed
|
||||
# immediately poll will still be notified about its completion and it will
|
||||
# free ``ol``.
|
||||
break
|
||||
else:
|
||||
lastError = osLastError()
|
||||
if lastError.int32 == ERROR_IO_PENDING:
|
||||
# In this case ``ol`` will be deallocated in ``poll``.
|
||||
success = true
|
||||
break
|
||||
else:
|
||||
GC_unref(ol)
|
||||
success = false
|
||||
it = it.ai_next
|
||||
|
||||
freeAddrInfo(aiList)
|
||||
if not success:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
return retFuture
|
||||
|
||||
proc recv*(socket: AsyncFD, size: int,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] =
|
||||
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
|
||||
@@ -739,8 +677,8 @@ when defined(windows) or defined(nimdoc):
|
||||
var lpOutputBuf = newString(lpOutputLen)
|
||||
var dwBytesReceived: Dword
|
||||
let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
|
||||
let dwLocalAddressLength = Dword(sizeof(Sockaddr_in) + 16)
|
||||
let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16)
|
||||
let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
|
||||
let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
|
||||
|
||||
template failAccept(errcode) =
|
||||
if flags.isDisconnectionError(errcode):
|
||||
@@ -770,12 +708,14 @@ when defined(windows) or defined(nimdoc):
|
||||
dwLocalAddressLength, dwRemoteAddressLength,
|
||||
addr localSockaddr, addr localLen,
|
||||
addr remoteSockaddr, addr remoteLen)
|
||||
register(clientSock.AsyncFD)
|
||||
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
|
||||
retFuture.complete(
|
||||
(address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
|
||||
client: clientSock.AsyncFD)
|
||||
)
|
||||
try:
|
||||
let address = getAddrString(remoteSockAddr)
|
||||
register(clientSock.AsyncFD)
|
||||
retFuture.complete((address: address, client: clientSock.AsyncFD))
|
||||
except:
|
||||
# getAddrString may raise
|
||||
clientSock.close()
|
||||
retFuture.fail(getCurrentException())
|
||||
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
@@ -808,20 +748,6 @@ when defined(windows) or defined(nimdoc):
|
||||
|
||||
return retFuture
|
||||
|
||||
proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD =
|
||||
## Creates a new socket and registers it with the dispatcher implicitly.
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
register(result)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET,
|
||||
sockType: SockType = SOCK_STREAM,
|
||||
protocol: Protocol = IPPROTO_TCP): AsyncFD =
|
||||
## Creates a new socket and registers it with the dispatcher implicitly.
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
register(result)
|
||||
|
||||
proc closeSocket*(socket: AsyncFD) =
|
||||
## Closes a socket and ensures that it is unregistered.
|
||||
socket.SocketHandle.close()
|
||||
@@ -1159,23 +1085,6 @@ else:
|
||||
var data = newAsyncData()
|
||||
p.selector.registerHandle(fd.SocketHandle, {}, data)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: cint, sockType: cint,
|
||||
protocol: cint): AsyncFD =
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
when defined(macosx):
|
||||
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
|
||||
register(result)
|
||||
|
||||
proc newAsyncNativeSocket*(domain: Domain = AF_INET,
|
||||
sockType: SockType = SOCK_STREAM,
|
||||
protocol: Protocol = IPPROTO_TCP): AsyncFD =
|
||||
result = newNativeSocket(domain, sockType, protocol).AsyncFD
|
||||
result.SocketHandle.setBlocking(false)
|
||||
when defined(macosx):
|
||||
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
|
||||
register(result)
|
||||
|
||||
proc closeSocket*(sock: AsyncFD) =
|
||||
let disp = getGlobalDispatcher()
|
||||
disp.selector.unregister(sock.SocketHandle)
|
||||
@@ -1331,50 +1240,6 @@ else:
|
||||
# Callback queue processing
|
||||
processPendingCallbacks(p)
|
||||
|
||||
proc connect*(socket: AsyncFD, address: string, port: Port,
|
||||
domain = AF_INET): Future[void] =
|
||||
var retFuture = newFuture[void]("connect")
|
||||
|
||||
proc cb(fd: AsyncFD): bool =
|
||||
var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
|
||||
if ret == 0:
|
||||
# We have connected.
|
||||
retFuture.complete()
|
||||
return true
|
||||
elif ret == EINTR:
|
||||
# interrupted, keep waiting
|
||||
return false
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
|
||||
return true
|
||||
|
||||
assert getSockDomain(socket.SocketHandle) == domain
|
||||
var aiList = getAddrInfo(address, port, domain)
|
||||
var success = false
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
|
||||
if ret == 0:
|
||||
# Request to connect completed immediately.
|
||||
success = true
|
||||
retFuture.complete()
|
||||
break
|
||||
else:
|
||||
lastError = osLastError()
|
||||
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
|
||||
success = true
|
||||
addWrite(socket, cb)
|
||||
break
|
||||
else:
|
||||
success = false
|
||||
it = it.ai_next
|
||||
|
||||
freeAddrInfo(aiList)
|
||||
if not success:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
return retFuture
|
||||
|
||||
proc recv*(socket: AsyncFD, size: int,
|
||||
flags = {SocketFlag.SafeDisconn}): Future[string] =
|
||||
var retFuture = newFuture[string]("recv")
|
||||
@@ -1568,9 +1433,14 @@ else:
|
||||
else:
|
||||
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
|
||||
else:
|
||||
register(client.AsyncFD)
|
||||
retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)),
|
||||
client.AsyncFD))
|
||||
try:
|
||||
let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
|
||||
register(client.AsyncFD)
|
||||
retFuture.complete((address, client.AsyncFD))
|
||||
except:
|
||||
# getAddrString may raise
|
||||
client.close()
|
||||
retFuture.fail(getCurrentException())
|
||||
addRead(socket, cb)
|
||||
return retFuture
|
||||
|
||||
@@ -1623,6 +1493,9 @@ else:
|
||||
data.readList.add(cb)
|
||||
p.selector.registerEvent(SelectEvent(ev), data)
|
||||
|
||||
# Common procedures between current and upcoming asyncdispatch
|
||||
include includes.asynccommon
|
||||
|
||||
proc sleepAsync*(ms: int): Future[void] =
|
||||
## Suspends the execution of the current async procedure for the next
|
||||
## ``ms`` milliseconds.
|
||||
|
||||
@@ -495,7 +495,7 @@ type
|
||||
ai_family*: cint ## Address family of socket.
|
||||
ai_socktype*: cint ## Socket type.
|
||||
ai_protocol*: cint ## Protocol of socket.
|
||||
ai_addrlen*: int ## Length of socket address.
|
||||
ai_addrlen*: csize ## Length of socket address.
|
||||
ai_canonname*: cstring ## Canonical name of service location.
|
||||
ai_addr*: ptr SockAddr ## Socket address of socket.
|
||||
ai_next*: ptr AddrInfo ## Pointer to next in list.
|
||||
@@ -803,6 +803,7 @@ const
|
||||
SIO_GET_EXTENSION_FUNCTION_POINTER* = WSAIORW(IOC_WS2,6).DWORD
|
||||
SO_UPDATE_ACCEPT_CONTEXT* = 0x700B
|
||||
AI_V4MAPPED* = 0x0008
|
||||
AF_UNSPEC* = 0
|
||||
AF_INET* = 2
|
||||
AF_INET6* = 23
|
||||
|
||||
|
||||
53
tests/async/tasyncdial.nim
Normal file
53
tests/async/tasyncdial.nim
Normal file
@@ -0,0 +1,53 @@
|
||||
discard """
|
||||
file: "tasyncdial.nim"
|
||||
output: '''
|
||||
OK AF_INET
|
||||
OK AF_INET6
|
||||
'''
|
||||
"""
|
||||
|
||||
import
|
||||
nativesockets, os, asyncdispatch
|
||||
|
||||
proc setupServerSocket(hostname: string, port: Port, domain: Domain): AsyncFD =
|
||||
## Creates a socket, binds it to the specified address, and starts listening for connecitons.
|
||||
## Registers the descriptor with the dispatcher of the current thread
|
||||
## Raises OSError in case of an error.
|
||||
let fd = newNativeSocket(domain)
|
||||
setSockOptInt(fd, SOL_SOCKET, SO_REUSEADDR, 1)
|
||||
var aiList = getAddrInfo(hostname, port, domain)
|
||||
if bindAddr(fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
|
||||
freeAddrInfo(aiList)
|
||||
raiseOSError(osLastError())
|
||||
freeAddrInfo(aiList)
|
||||
if listen(fd) != 0:
|
||||
raiseOSError(osLastError())
|
||||
setBlocking(fd, false)
|
||||
result = fd.AsyncFD
|
||||
register(result)
|
||||
|
||||
proc doTest(domain: static[Domain]) {.async.} =
|
||||
const
|
||||
testHost = when domain == Domain.AF_INET6: "::1" else: "127.0.0.1"
|
||||
testPort = Port(17384)
|
||||
let serverFd = setupServerSocket(testHost, testPort, domain)
|
||||
let acceptFut = serverFd.accept()
|
||||
let clientFdFut = dial(testHost, testPort)
|
||||
|
||||
let serverClientFd = await acceptFut
|
||||
serverFd.closeSocket()
|
||||
|
||||
let clientFd = await clientFdFut
|
||||
|
||||
let recvFut = serverClientFd.recv(2)
|
||||
await clientFd.send("Hi")
|
||||
let msg = await recvFut
|
||||
|
||||
serverClientFd.closeSocket()
|
||||
clientFd.closeSocket()
|
||||
|
||||
if msg == "Hi":
|
||||
echo "OK ", domain
|
||||
|
||||
waitFor(doTest(Domain.AF_INET))
|
||||
waitFor(doTest(Domain.AF_INET6))
|
||||
@@ -1,11 +1,13 @@
|
||||
discard """
|
||||
cmd: "nim c --threads:on -d:ssl $file"
|
||||
exitcode: 0
|
||||
output: "OK"
|
||||
"""
|
||||
|
||||
import strutils
|
||||
from net import TimeoutError
|
||||
|
||||
import httpclient, asyncdispatch
|
||||
import nativesockets, os, httpclient, asyncdispatch
|
||||
|
||||
const manualTests = false
|
||||
|
||||
@@ -112,6 +114,40 @@ proc syncTest() =
|
||||
except:
|
||||
doAssert false, "TimeoutError should have been raised."
|
||||
|
||||
syncTest()
|
||||
proc makeIPv6HttpServer(hostname: string, port: Port): AsyncFD =
|
||||
let fd = newNativeSocket(AF_INET6)
|
||||
setSockOptInt(fd, SOL_SOCKET, SO_REUSEADDR, 1)
|
||||
var aiList = getAddrInfo(hostname, port, AF_INET6)
|
||||
if bindAddr(fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
|
||||
freeAddrInfo(aiList)
|
||||
raiseOSError(osLastError())
|
||||
freeAddrInfo(aiList)
|
||||
if listen(fd) != 0:
|
||||
raiseOSError(osLastError())
|
||||
setBlocking(fd, false)
|
||||
|
||||
var serverFd = fd.AsyncFD
|
||||
register(serverFd)
|
||||
result = serverFd
|
||||
|
||||
proc onAccept(fut: Future[AsyncFD]) {.gcsafe.} =
|
||||
if not fut.failed:
|
||||
let clientFd = fut.read()
|
||||
clientFd.send("HTTP/1.1 200 OK\r\LContent-Length: 0\r\LConnection: Closed\r\L\r\L").callback = proc() =
|
||||
clientFd.closeSocket()
|
||||
serverFd.accept().callback = onAccept
|
||||
serverFd.accept().callback = onAccept
|
||||
|
||||
proc ipv6Test() =
|
||||
var client = newAsyncHttpClient()
|
||||
let serverFd = makeIPv6HttpServer("::1", Port(18473))
|
||||
var resp = waitFor client.request("http://[::1]:18473/")
|
||||
doAssert(resp.status == "200 OK")
|
||||
serverFd.closeSocket()
|
||||
client.close()
|
||||
|
||||
syncTest()
|
||||
waitFor(asyncTest())
|
||||
ipv6Test()
|
||||
|
||||
echo "OK"
|
||||
|
||||
60
tests/stdlib/tnetdial.nim
Normal file
60
tests/stdlib/tnetdial.nim
Normal file
@@ -0,0 +1,60 @@
|
||||
discard """
|
||||
cmd: "nim c --threads:on $file"
|
||||
exitcode: 0
|
||||
output: "OK"
|
||||
"""
|
||||
|
||||
import os, net, nativesockets, asyncdispatch
|
||||
|
||||
## Test for net.dial
|
||||
|
||||
const port = Port(28431)
|
||||
|
||||
proc initIPv6Server(hostname: string, port: Port): AsyncFD =
|
||||
let fd = newNativeSocket(AF_INET6)
|
||||
setSockOptInt(fd, SOL_SOCKET, SO_REUSEADDR, 1)
|
||||
var aiList = getAddrInfo(hostname, port, AF_INET6)
|
||||
if bindAddr(fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
|
||||
freeAddrInfo(aiList)
|
||||
raiseOSError(osLastError())
|
||||
freeAddrInfo(aiList)
|
||||
if listen(fd) != 0:
|
||||
raiseOSError(osLastError())
|
||||
setBlocking(fd, false)
|
||||
|
||||
var serverFd = fd.AsyncFD
|
||||
register(serverFd)
|
||||
result = serverFd
|
||||
|
||||
# Since net.dial is synchronous, we use main thread to setup server,
|
||||
# and dial to it from another thread.
|
||||
|
||||
proc testThread() {.thread.} =
|
||||
let fd = net.dial("::1", port)
|
||||
var s = newString(5)
|
||||
doAssert fd.recv(addr s[0], 5) == 5
|
||||
if s == "Hello":
|
||||
echo "OK"
|
||||
fd.close()
|
||||
|
||||
proc test() =
|
||||
var t: Thread[void]
|
||||
createThread(t, testThread)
|
||||
|
||||
let serverFd = initIPv6Server("::1", port)
|
||||
var done = false
|
||||
|
||||
serverFd.accept().callback = proc(fut: Future[AsyncFD]) =
|
||||
serverFd.closeSocket()
|
||||
if not fut.failed:
|
||||
let fd = fut.read()
|
||||
fd.send("Hello").callback = proc() =
|
||||
fd.closeSocket()
|
||||
done = true
|
||||
|
||||
while not done:
|
||||
poll()
|
||||
|
||||
joinThread(t)
|
||||
|
||||
test()
|
||||
@@ -66,7 +66,10 @@ Library Additions
|
||||
-----------------
|
||||
|
||||
- Added ``system.onThreadDestruction``.
|
||||
|
||||
- Added ``dial`` procedure to networking modules: ``net``, ``asyncdispatch``,
|
||||
``asyncnet``. It merges socket creation, address resolution, and connection
|
||||
into single step. When using ``dial``, you don't have to worry about
|
||||
IPv4 vs IPv6 problem. ``httpclient`` now supports IPv6.
|
||||
|
||||
Tool Additions
|
||||
--------------
|
||||
|
||||
Reference in New Issue
Block a user