mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-28 17:04:41 +00:00
fix sendTo and recvFrom in asyncnet (#14154)
* added high level sendTo and recvFrom to std/asyncnet; tests were also added. * add .since annotation, a changelog entry and fixed to standard library style guide. * Improved asserts msgs and added notes for use with UDP sockets * pointers removed in parameters and fixes * added .since annotation * minor fixes
This commit is contained in:
@@ -780,8 +780,8 @@ proc isClosed*(socket: AsyncSocket): bool =
|
||||
## Determines whether the socket has been closed.
|
||||
return socket.closed
|
||||
|
||||
proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string,
|
||||
port: Port, flags = {SocketFlag.SafeDisconn}): owned(Future[void])
|
||||
proc sendTo*(socket: AsyncSocket, address: string, port: Port, data: string,
|
||||
flags = {SocketFlag.SafeDisconn}): owned(Future[void])
|
||||
{.async, since: (1, 3).} =
|
||||
## This proc sends ``data`` to the specified ``address``, which may be an IP
|
||||
## address or a hostname. If a hostname is specified this function will try
|
||||
@@ -792,13 +792,11 @@ proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string,
|
||||
##
|
||||
## This proc is normally used with connectionless sockets (UDP sockets).
|
||||
assert(socket.protocol != IPPROTO_TCP,
|
||||
"Cannot `sendTo` on a TCP socket. Use `send` instead.")
|
||||
assert(not socket.closed, "Cannot `sendTo` on a closed socket.")
|
||||
"Cannot `sendTo` on a TCP socket. Use `send` instead")
|
||||
assert(not socket.closed, "Cannot `sendTo` on a closed socket")
|
||||
|
||||
let
|
||||
aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
|
||||
socket.protocol)
|
||||
retFuture = newFuture[void]("sendTo")
|
||||
let aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
|
||||
socket.protocol)
|
||||
|
||||
var
|
||||
it = aiList
|
||||
@@ -806,7 +804,7 @@ proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string,
|
||||
lastException: ref Exception
|
||||
|
||||
while it != nil:
|
||||
let fut = sendTo(socket.fd.AsyncFD, data, dataSize, it.ai_addr,
|
||||
let fut = sendTo(socket.fd.AsyncFD, cstring(data), len(data), it.ai_addr,
|
||||
it.ai_addrlen.SockLen, flags)
|
||||
|
||||
yield fut
|
||||
@@ -824,88 +822,54 @@ proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string,
|
||||
|
||||
if not success:
|
||||
if lastException != nil:
|
||||
retFuture.fail(lastException)
|
||||
|
||||
raise lastException
|
||||
else:
|
||||
retFuture.fail(newException(
|
||||
IOError, "Couldn't resolve address: " & address))
|
||||
raise newException(IOError, "Couldn't resolve address: " & address)
|
||||
|
||||
else:
|
||||
retFuture.complete()
|
||||
|
||||
proc sendTo*(socket: AsyncSocket, data, address: string, port: Port):
|
||||
owned(Future[void]) {.async, since: (1, 3).} =
|
||||
## This proc sends ``data`` to the specified ``address``, which may be an IP
|
||||
## address or a hostname. If a hostname is specified this function will try
|
||||
## each IP of that hostname. The returned future will complete once all data
|
||||
## has been sent.
|
||||
##
|
||||
## If an error occurs an OSError exception will be raised.
|
||||
##
|
||||
## This proc is normally used with connectionless sockets (UDP sockets).
|
||||
await sendTo(socket, cstring(data), len(data), address, port)
|
||||
|
||||
proc recvFrom*(socket: AsyncSocket, data: pointer, size: int,
|
||||
address: FutureVar[string], port: FutureVar[Port],
|
||||
flags = {SocketFlag.SafeDisconn}): owned(Future[int])
|
||||
proc recvFrom*(socket: AsyncSocket, size: int,
|
||||
flags = {SocketFlag.SafeDisconn}):
|
||||
owned(Future[tuple[data: string, address: string, port: Port]])
|
||||
{.async, since: (1, 3).} =
|
||||
## Receives a datagram data from ``socket`` into ``data``, which must be at
|
||||
## least of size ``size``. The address and port of datagram's sender will be
|
||||
## stored into ``address`` and ``port``, respectively. Returned future will
|
||||
## complete once one datagram has been received, and will return size of
|
||||
## packet received.
|
||||
## Receives a datagram data from ``socket``, which must be at least of size
|
||||
## ``size``. Returned future will complete once one datagram has been received
|
||||
## and will return tuple with: data of packet received; and address and port
|
||||
## of datagram's sender.
|
||||
##
|
||||
## If an error occurs an OSError exception will be raised.
|
||||
##
|
||||
## This proc is normally used with connectionless sockets (UDP sockets).
|
||||
template awaitRecvFromInto() =
|
||||
template adaptRecvFromToDomain(domain: Domain) =
|
||||
var lAddr = sizeof(sAddr).SockLen
|
||||
|
||||
result = await recvFromInto(AsyncFD(getFd(socket)), data, size,
|
||||
cast[ptr SockAddr](addr sAddr), addr lAddr)
|
||||
|
||||
address.mget.add(getAddrString(cast[ptr SockAddr](addr sAddr)))
|
||||
|
||||
address.complete()
|
||||
let fut = await recvFromInto(AsyncFD(getFd(socket)), cstring(data), size,
|
||||
cast[ptr SockAddr](addr sAddr), addr lAddr,
|
||||
flags)
|
||||
|
||||
data.setLen(fut)
|
||||
|
||||
result.data = data
|
||||
result.address = getAddrString(cast[ptr SockAddr](addr sAddr))
|
||||
|
||||
when domain == AF_INET6:
|
||||
result.port = ntohs(sAddr.sin6_port).Port
|
||||
else:
|
||||
result.port = ntohs(sAddr.sin_port).Port
|
||||
|
||||
assert(socket.protocol != IPPROTO_TCP,
|
||||
"Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead.")
|
||||
assert(not socket.closed, "Cannot `recvFrom` on a closed socket.")
|
||||
"Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead")
|
||||
assert(not socket.closed, "Cannot `recvFrom` on a closed socket")
|
||||
|
||||
var readSize: int
|
||||
|
||||
if socket.domain == AF_INET6:
|
||||
var sAddr: Sockaddr_in6
|
||||
|
||||
awaitRecvFromInto()
|
||||
|
||||
port.complete(ntohs(sAddr.sin6_port).Port)
|
||||
|
||||
else:
|
||||
var sAddr: Sockaddr_in
|
||||
|
||||
awaitRecvFromInto()
|
||||
|
||||
port.complete(ntohs(sAddr.sin_port).Port)
|
||||
|
||||
proc recvFrom*(socket: AsyncSocket, data: pointer, size: int):
|
||||
owned(Future[tuple[size: int, address: string, port: Port]])
|
||||
{.async, since: (1, 3).} =
|
||||
## Receives a datagram data from ``socket`` into ``data``, which must be at
|
||||
## least of size ``size``. Returned future will complete once one datagram has
|
||||
## been received and will return tuple with: size of packet received; and
|
||||
## address and port of datagram's sender.
|
||||
##
|
||||
## If an error occurs an OSError exception will be raised.
|
||||
##
|
||||
## This proc is normally used with connectionless sockets (UDP sockets).
|
||||
var
|
||||
fromIp = newFutureVar[string]()
|
||||
fromPort = newFutureVar[Port]()
|
||||
var data = newString(size)
|
||||
|
||||
result.size = await recvFrom(socket, data, size, fromIp, fromPort)
|
||||
result.address = fromIp.mget()
|
||||
result.port = fromPort.mget()
|
||||
case socket.domain
|
||||
of AF_INET6:
|
||||
var sAddr: Sockaddr_in6
|
||||
adaptRecvFromToDomain(AF_INET6)
|
||||
of AF_INET:
|
||||
var sAddr: Sockaddr_in
|
||||
adaptRecvFromToDomain(AF_INET)
|
||||
else:
|
||||
raise newException(ValueError, "Unknown socket address family")
|
||||
|
||||
when not defined(testing) and isMainModule:
|
||||
type
|
||||
|
||||
@@ -21,9 +21,7 @@ proc saveReceivedPort(port: Port) =
|
||||
recvports = recvports + int(port)
|
||||
|
||||
proc launchSwarm(serverIp: string, serverPort: Port) {.async.} =
|
||||
var
|
||||
buffer = newString(16384)
|
||||
i = 0
|
||||
var i = 0
|
||||
|
||||
while i < swarmSize:
|
||||
var sock = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM,
|
||||
@@ -36,16 +34,13 @@ proc launchSwarm(serverIp: string, serverPort: Port) {.async.} =
|
||||
var k = 0
|
||||
|
||||
while k < messagesToSend:
|
||||
zeroMem(addr(buffer[0]), 16384)
|
||||
|
||||
let message = "Message " & $(i * messagesToSend + k)
|
||||
|
||||
await sendTo(sock, message, serverIp, serverPort)
|
||||
await asyncnet.sendTo(sock, serverIp, serverPort, message)
|
||||
|
||||
let (size, fromIp, fromPort) = await recvFrom(sock, addr buffer[0],
|
||||
16384)
|
||||
let (data, fromIp, fromPort) = await recvFrom(sock, 16384)
|
||||
|
||||
if buffer[0 .. (size - 1)] == message:
|
||||
if data == message:
|
||||
saveSendingPort(localPort)
|
||||
|
||||
inc(recvCount)
|
||||
@@ -59,17 +54,13 @@ proc launchSwarm(serverIp: string, serverPort: Port) {.async.} =
|
||||
proc readMessages(server: AsyncSocket) {.async.} =
|
||||
let maxResponses = (swarmSize * messagesToSend)
|
||||
|
||||
var
|
||||
buffer = newString(16384)
|
||||
i = 0
|
||||
var i = 0
|
||||
|
||||
while i < maxResponses:
|
||||
zeroMem(addr(buffer[0]), 16384)
|
||||
|
||||
let (size, fromIp, fromPort) = await recvFrom(server, addr buffer[0], 16384)
|
||||
let (data, fromIp, fromPort) = await recvFrom(server, 16384)
|
||||
|
||||
if buffer.startswith("Message ") and fromIp == "127.0.0.1":
|
||||
await sendTo(server, buffer[0 .. (size - 1)], fromIp, fromPort)
|
||||
if data.startswith("Message ") and fromIp == "127.0.0.1":
|
||||
await sendTo(server, fromIp, fromPort, data)
|
||||
|
||||
inc(msgCount)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user