added high level sendTo and recvFrom to std/asyncnet (UDP functionality) (#14109)

* added high level sendTo and recvFrom to std/asyncnet; tests were also added.

* add .since annotation, a changelog entry and fixed to standard library style guide.

* Improved asserts msgs and added notes for use with UDP sockets
This commit is contained in:
rockcavera
2020-04-26 05:16:10 -03:00
committed by GitHub
parent 76ffa4fa25
commit d23446c6ba
3 changed files with 228 additions and 0 deletions

View File

@@ -41,6 +41,8 @@
accept an existing string to modify, which avoids memory
allocations, similar to `streams.readLine` (#13857).
- Added high-level `asyncnet.sendTo` and `asyncnet.recvFrom`. UDP functionality.
## Language changes
- In newruntime it is now allowed to assign discriminator field without restrictions as long as case object doesn't have custom destructor. Discriminator value doesn't have to be a constant either. If you have custom destructor for case object and you do want to freely assign discriminator fields, it is recommended to refactor object into 2 objects like this:
```nim

View File

@@ -780,6 +780,133 @@ proc isClosed*(socket: AsyncSocket): bool =
## Determines whether the socket has been closed.
return socket.closed
proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string,
port: Port, flags = {SocketFlag.SafeDisconn}): owned(Future[void])
{.async, since: (1, 3).} =
## This proc sends ``data`` to the specified ``address``, which may be an IP
## address or a hostname. If a hostname is specified this function will try
## each IP of that hostname. The returned future will complete once all data
## has been sent.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
assert(socket.protocol != IPPROTO_TCP,
"Cannot `sendTo` on a TCP socket. Use `send` instead.")
assert(not socket.closed, "Cannot `sendTo` on a closed socket.")
let
aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
socket.protocol)
retFuture = newFuture[void]("sendTo")
var
it = aiList
success = false
lastException: ref Exception
while it != nil:
let fut = sendTo(socket.fd.AsyncFD, data, dataSize, it.ai_addr,
it.ai_addrlen.SockLen, flags)
yield fut
if not fut.failed:
success = true
break
lastException = fut.readError()
it = it.ai_next
freeaddrinfo(aiList)
if not success:
if lastException != nil:
retFuture.fail(lastException)
else:
retFuture.fail(newException(
IOError, "Couldn't resolve address: " & address))
else:
retFuture.complete()
proc sendTo*(socket: AsyncSocket, data, address: string, port: Port):
owned(Future[void]) {.async, since: (1, 3).} =
## This proc sends ``data`` to the specified ``address``, which may be an IP
## address or a hostname. If a hostname is specified this function will try
## each IP of that hostname. The returned future will complete once all data
## has been sent.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
await sendTo(socket, cstring(data), len(data), address, port)
proc recvFrom*(socket: AsyncSocket, data: pointer, size: int,
address: FutureVar[string], port: FutureVar[Port],
flags = {SocketFlag.SafeDisconn}): owned(Future[int])
{.async, since: (1, 3).} =
## Receives a datagram data from ``socket`` into ``data``, which must be at
## least of size ``size``. The address and port of datagram's sender will be
## stored into ``address`` and ``port``, respectively. Returned future will
## complete once one datagram has been received, and will return size of
## packet received.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
template awaitRecvFromInto() =
var lAddr = sizeof(sAddr).SockLen
result = await recvFromInto(AsyncFD(getFd(socket)), data, size,
cast[ptr SockAddr](addr sAddr), addr lAddr)
address.mget.add(getAddrString(cast[ptr SockAddr](addr sAddr)))
address.complete()
assert(socket.protocol != IPPROTO_TCP,
"Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead.")
assert(not socket.closed, "Cannot `recvFrom` on a closed socket.")
var readSize: int
if socket.domain == AF_INET6:
var sAddr: Sockaddr_in6
awaitRecvFromInto()
port.complete(ntohs(sAddr.sin6_port).Port)
else:
var sAddr: Sockaddr_in
awaitRecvFromInto()
port.complete(ntohs(sAddr.sin_port).Port)
proc recvFrom*(socket: AsyncSocket, data: pointer, size: int):
owned(Future[tuple[size: int, address: string, port: Port]])
{.async, since: (1, 3).} =
## Receives a datagram data from ``socket`` into ``data``, which must be at
## least of size ``size``. Returned future will complete once one datagram has
## been received and will return tuple with: size of packet received; and
## address and port of datagram's sender.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
var
fromIp = newFutureVar[string]()
fromPort = newFutureVar[Port]()
result.size = await recvFrom(socket, data, size, fromIp, fromPort)
result.address = fromIp.mget()
result.port = fromPort.mget()
when not defined(testing) and isMainModule:
type
TestCases = enum

View File

@@ -0,0 +1,99 @@
# It is a reproduction of the 'tnewasyncudp' test code, but using a high level
# of asynchronous procedures. Output: "5000"
import asyncdispatch, asyncnet, nativesockets, net, strutils
var msgCount = 0
var recvCount = 0
const
messagesToSend = 100
swarmSize = 50
serverPort = 10333
var
sendports = 0
recvports = 0
proc saveSendingPort(port: Port) =
sendports = sendports + int(port)
proc saveReceivedPort(port: Port) =
recvports = recvports + int(port)
proc launchSwarm(serverIp: string, serverPort: Port) {.async.} =
var
buffer = newString(16384)
i = 0
while i < swarmSize:
var sock = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM,
Protocol.IPPROTO_UDP, false)
bindAddr(sock, address = "127.0.0.1")
let (null, localPort) = getLocalAddr(sock)
var k = 0
while k < messagesToSend:
zeroMem(addr(buffer[0]), 16384)
let message = "Message " & $(i * messagesToSend + k)
await sendTo(sock, message, serverIp, serverPort)
let (size, fromIp, fromPort) = await recvFrom(sock, addr buffer[0],
16384)
if buffer[0 .. (size - 1)] == message:
saveSendingPort(localPort)
inc(recvCount)
inc(k)
close(sock)
inc(i)
proc readMessages(server: AsyncSocket) {.async.} =
let maxResponses = (swarmSize * messagesToSend)
var
buffer = newString(16384)
i = 0
while i < maxResponses:
zeroMem(addr(buffer[0]), 16384)
let (size, fromIp, fromPort) = await recvFrom(server, addr buffer[0], 16384)
if buffer.startswith("Message ") and fromIp == "127.0.0.1":
await sendTo(server, buffer[0 .. (size - 1)], fromIp, fromPort)
inc(msgCount)
saveReceivedPort(fromPort)
inc(i)
proc createServer() {.async.} =
var server = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM, Protocol.IPPROTO_UDP, false)
bindAddr(server, Port(serverPort), "127.0.0.1")
asyncCheck readMessages(server)
asyncCheck createServer()
asyncCheck launchSwarm("127.0.0.1", Port(serverPort))
while true:
poll()
if recvCount == swarmSize * messagesToSend:
break
assert msgCount == swarmSize * messagesToSend
assert sendports == recvports
echo msgCount