mirror of
https://github.com/nim-lang/Nim.git
synced 2026-06-05 03:14:08 +00:00
UDP improvements for the sockets module. Renamed ftpclient.fileExists to
existsFile for consistency. Added tasyncudp test.
This commit is contained in:
@@ -30,6 +30,11 @@ import sockets, os
|
||||
## on with the events. The type that you set userArg to must be inheriting from
|
||||
## TObject!
|
||||
##
|
||||
## **Note:** If you want to provide async ability to your module please do not
|
||||
## use the ``TDelegate`` object, instead use ``PAsyncSocket``. It is possible
|
||||
## that in the future this type's fields will not be exported therefore breaking
|
||||
## your code.
|
||||
##
|
||||
## Asynchronous sockets
|
||||
## ====================
|
||||
##
|
||||
@@ -68,7 +73,8 @@ import sockets, os
|
||||
|
||||
|
||||
type
|
||||
TDelegate* = object
|
||||
|
||||
TDelegate = object
|
||||
deleVal*: PObject
|
||||
|
||||
handleRead*: proc (h: PObject) {.nimcall.}
|
||||
@@ -92,12 +98,10 @@ type
|
||||
socket: TSocket
|
||||
info: TInfo
|
||||
|
||||
userArg: PObject
|
||||
handleRead*: proc (s: PAsyncSocket) {.closure.}
|
||||
handleConnect*: proc (s: PAsyncSocket) {.closure.}
|
||||
|
||||
handleRead*: proc (s: PAsyncSocket, arg: PObject) {.nimcall.}
|
||||
handleConnect*: proc (s: PAsyncSocket, arg: PObject) {.nimcall.}
|
||||
|
||||
handleAccept*: proc (s: PAsyncSocket, arg: PObject) {.nimcall.}
|
||||
handleAccept*: proc (s: PAsyncSocket) {.closure.}
|
||||
|
||||
lineBuffer: TaintedString ## Temporary storage for ``recvLine``
|
||||
sslNeedAccept: bool
|
||||
@@ -121,21 +125,20 @@ proc newDelegate*(): PDelegate =
|
||||
result.task = (proc (h: PObject) = nil)
|
||||
result.mode = MReadable
|
||||
|
||||
proc newAsyncSocket(userArg: PObject = nil): PAsyncSocket =
|
||||
proc newAsyncSocket(): PAsyncSocket =
|
||||
new(result)
|
||||
result.info = SockIdle
|
||||
result.userArg = userArg
|
||||
|
||||
result.handleRead = (proc (s: PAsyncSocket, arg: PObject) = nil)
|
||||
result.handleConnect = (proc (s: PAsyncSocket, arg: PObject) = nil)
|
||||
result.handleAccept = (proc (s: PAsyncSocket, arg: PObject) = nil)
|
||||
result.handleRead = (proc (s: PAsyncSocket) = nil)
|
||||
result.handleConnect = (proc (s: PAsyncSocket) = nil)
|
||||
result.handleAccept = (proc (s: PAsyncSocket) = nil)
|
||||
|
||||
result.lineBuffer = "".TaintedString
|
||||
|
||||
proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
|
||||
protocol: TProtocol = IPPROTO_TCP,
|
||||
userArg: PObject = nil, buffered = true): PAsyncSocket =
|
||||
result = newAsyncSocket(userArg)
|
||||
buffered = true): PAsyncSocket =
|
||||
result = newAsyncSocket()
|
||||
result.socket = socket(domain, typ, protocol, buffered)
|
||||
result.proto = protocol
|
||||
if result.socket == InvalidSocket: OSError()
|
||||
@@ -148,15 +151,14 @@ proc asyncSockHandleConnect(h: PObject) =
|
||||
return
|
||||
|
||||
PAsyncSocket(h).info = SockConnected
|
||||
PAsyncSocket(h).handleConnect(PAsyncSocket(h),
|
||||
PAsyncSocket(h).userArg)
|
||||
PAsyncSocket(h).handleConnect(PAsyncSocket(h))
|
||||
|
||||
proc asyncSockHandleRead(h: PObject) =
|
||||
when defined(ssl):
|
||||
if PAsyncSocket(h).socket.isSSL and not
|
||||
PAsyncSocket(h).socket.gotHandshake:
|
||||
return
|
||||
PAsyncSocket(h).handleRead(PAsyncSocket(h), PAsyncSocket(h).userArg)
|
||||
PAsyncSocket(h).handleRead(PAsyncSocket(h))
|
||||
|
||||
when defined(ssl):
|
||||
proc asyncSockDoHandshake(h: PObject) =
|
||||
@@ -183,8 +185,7 @@ proc toDelegate(sock: PAsyncSocket): PDelegate =
|
||||
result.handleRead = asyncSockHandleRead
|
||||
|
||||
result.handleAccept = (proc (h: PObject) =
|
||||
PAsyncSocket(h).handleAccept(PAsyncSocket(h),
|
||||
PAsyncSocket(h).userArg))
|
||||
PAsyncSocket(h).handleAccept(PAsyncSocket(h)))
|
||||
|
||||
when defined(ssl):
|
||||
result.task = asyncSockDoHandshake
|
||||
@@ -337,7 +338,6 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool =
|
||||
of RecvFail:
|
||||
result = false
|
||||
|
||||
|
||||
proc poll*(d: PDispatcher, timeout: int = 500): bool =
|
||||
## This function checks for events on all the sockets in the `PDispatcher`.
|
||||
## It then proceeds to call the correct event handler.
|
||||
|
||||
@@ -227,7 +227,18 @@ proc listDirs*(ftp: var TFTPClient, dir: string = "",
|
||||
ftp.deleteJob()
|
||||
else: return @[]
|
||||
|
||||
proc fileExists*(ftp: var TFTPClient, file: string): bool =
|
||||
proc fileExists*(ftp: var TFTPClient, file: string): bool {.deprecated.} =
|
||||
## **Deprecated:** Please use ``existsFile``.
|
||||
##
|
||||
## Determines whether ``file`` exists.
|
||||
##
|
||||
## Warning: This function may block. Especially on directories with many
|
||||
## files, because a full list of file names must be retrieved.
|
||||
var files = ftp.listDirs()
|
||||
for f in items(files):
|
||||
if f.normalizePathSep == file.normalizePathSep: return true
|
||||
|
||||
proc existsFile*(ftp: var TFTPClient, file: string): bool =
|
||||
## Determines whether ``file`` exists.
|
||||
##
|
||||
## Warning: This function may block. Especially on directories with many
|
||||
|
||||
@@ -74,7 +74,7 @@ type
|
||||
SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets
|
||||
SOCK_DGRAM = 2, ## datagram service or Datagram Sockets
|
||||
SOCK_RAW = 3, ## raw protocols atop the network layer.
|
||||
SOCK_SEQPACKET = 5 ## reliable sequenced packet service, or
|
||||
SOCK_SEQPACKET = 5 ## reliable sequenced packet service
|
||||
|
||||
TProtocol* = enum ## third argument to `socket` proc
|
||||
IPPROTO_TCP = 6, ## Transmission control protocol.
|
||||
@@ -1203,7 +1203,45 @@ proc recvAsync*(socket: TSocket, s: var TaintedString): bool =
|
||||
setLen(s.string, s.string.len + bufSize)
|
||||
inc(pos, bytesRead)
|
||||
result = True
|
||||
|
||||
proc recvFrom*(socket: TSocket, data: var string, length: int,
|
||||
address: var string, flags = 0'i32): int =
|
||||
## Receives data from ``socket``. This function should normally be used with
|
||||
## connection-less sockets (UDP sockets).
|
||||
##
|
||||
## **Warning:** This function does not yet have a buffered implementation,
|
||||
## so when ``socket`` is buffered the non-buffered implementation will be
|
||||
## used. Therefore if ``socket`` contains something in its buffer this
|
||||
## function will make no effort to return it.
|
||||
|
||||
# TODO: Buffered sockets
|
||||
data = newString(length)
|
||||
var sockAddress: Tsockaddr_in
|
||||
var addrLen = sizeof(sockAddress).TSockLen
|
||||
result = recvFrom(socket.fd, cstring(data), length, flags,
|
||||
cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen))
|
||||
|
||||
if result != -1:
|
||||
address = $inet_ntoa(sockAddress.sin_addr)
|
||||
|
||||
proc recvFromAsync*(socket: TSocket, data: var String, length: int,
|
||||
address: var string, flags = 0'i32): bool =
|
||||
## Similar to ``recvFrom`` but raises an EOS error when an error occurs.
|
||||
## Returns False if no messages could be received from ``socket``.
|
||||
result = true
|
||||
var callRes = recvFrom(socket, data, length, address)
|
||||
if callRes < 0:
|
||||
when defined(windows):
|
||||
# TODO: Test on Windows
|
||||
var err = WSAGetLastError()
|
||||
if err == WSAEWOULDBLOCK:
|
||||
return False
|
||||
else: OSError()
|
||||
else:
|
||||
if errno == EAGAIN or errno == EWOULDBLOCK:
|
||||
return False
|
||||
else: OSError()
|
||||
|
||||
proc skip*(socket: TSocket) =
|
||||
## skips all the data that is pending for the socket
|
||||
const bufSize = 1000
|
||||
@@ -1270,6 +1308,37 @@ proc trySend*(socket: TSocket, data: string): bool =
|
||||
## and instead returns ``false`` on failure.
|
||||
result = send(socket, cstring(data), data.len) == data.len
|
||||
|
||||
proc sendTo*(socket: TSocket, address: string, port: TPort, data: pointer,
|
||||
size: int, af: TDomain = AF_INET, flags = 0'i32): int =
|
||||
## low-level sendTo proc. This proc sends ``data`` to the specified ``address``,
|
||||
## which may be an IP address or a hostname, if a hostname is specified
|
||||
## this function will try each IP of that hostname.
|
||||
##
|
||||
## **Note:** This proc is not available for SSL sockets.
|
||||
var hints: TAddrInfo
|
||||
var aiList: ptr TAddrInfo = nil
|
||||
hints.ai_family = toInt(af)
|
||||
hints.ai_socktype = toInt(SOCK_STREAM)
|
||||
hints.ai_protocol = toInt(IPPROTO_TCP)
|
||||
gaiNim(address, port, hints, aiList)
|
||||
|
||||
# try all possibilities:
|
||||
var success = false
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
result = sendTo(socket.fd, data, size.cint, flags, it.ai_addr,
|
||||
it.ai_addrlen.TSockLen)
|
||||
if result != -1'i32:
|
||||
success = true
|
||||
break
|
||||
it = it.ai_next
|
||||
|
||||
freeaddrinfo(aiList)
|
||||
|
||||
proc sendTo*(socket: TSocket, address: string, port: TPort, data: string): int =
|
||||
## Friendlier version of the low-level ``sendTo``.
|
||||
result = socket.sendTo(address, port, cstring(data), data.len)
|
||||
|
||||
when defined(Windows):
|
||||
const
|
||||
SOCKET_ERROR = -1
|
||||
@@ -1303,8 +1372,8 @@ proc connect*(socket: TSocket, timeout: int, name: string, port = TPort(0),
|
||||
## specifies the time in miliseconds of how long to wait for a connection
|
||||
## to be made.
|
||||
##
|
||||
## **Warning:** If ``socket`` is non-blocking and timeout is not ``-1`` then
|
||||
## this function may set blocking mode on ``socket`` to true.
|
||||
## **Warning:** If ``socket`` is non-blocking then
|
||||
## this function will set blocking mode on ``socket`` to true.
|
||||
socket.setBlocking(true)
|
||||
|
||||
socket.connectAsync(name, port, af)
|
||||
@@ -1313,6 +1382,7 @@ proc connect*(socket: TSocket, timeout: int, name: string, port = TPort(0),
|
||||
raise newException(ETimeout, "Call to connect() timed out.")
|
||||
|
||||
proc isSSL*(socket: TSocket): bool = return socket.isSSL
|
||||
## Determines whether ``socket`` is a SSL socket.
|
||||
|
||||
when defined(Windows):
|
||||
var wsa: TWSADATA
|
||||
|
||||
@@ -18,13 +18,13 @@ const
|
||||
swarmSize = 50
|
||||
messagesToSend = 100
|
||||
|
||||
proc swarmConnect(s: PAsyncSocket, arg: PObject) {.nimcall.} =
|
||||
proc swarmConnect(s: PAsyncSocket) =
|
||||
#echo("Connected")
|
||||
for i in 1..messagesToSend:
|
||||
s.send("Message " & $i & "\c\L")
|
||||
s.close()
|
||||
|
||||
proc serverRead(s: PAsyncSocket, arg: PObject) {.nimcall.} =
|
||||
proc serverRead(s: PAsyncSocket) =
|
||||
var line = ""
|
||||
assert s.recvLine(line)
|
||||
if line != "":
|
||||
@@ -36,7 +36,7 @@ proc serverRead(s: PAsyncSocket, arg: PObject) {.nimcall.} =
|
||||
else:
|
||||
s.close()
|
||||
|
||||
proc serverAccept(s: PAsyncSocket, arg: Pobject) {.nimcall.} =
|
||||
proc serverAccept(s: PAsyncSocket) =
|
||||
var client: PAsyncSocket
|
||||
new(client)
|
||||
s.accept(client)
|
||||
@@ -83,6 +83,8 @@ while true:
|
||||
break
|
||||
if not disp.poll(): break
|
||||
if disp.len == serverCount:
|
||||
# Only the servers are left in the dispatcher. All clients finished,
|
||||
# we need to therefore break.
|
||||
break
|
||||
|
||||
assert msgCount == (swarmSize * messagesToSend) * serverCount
|
||||
|
||||
77
tests/run/tasyncudp.nim
Normal file
77
tests/run/tasyncudp.nim
Normal file
@@ -0,0 +1,77 @@
|
||||
discard """
|
||||
file: "tasyncudp.nim"
|
||||
output: "2000"
|
||||
"""
|
||||
import asyncio, sockets, strutils, times
|
||||
|
||||
const
|
||||
swarmSize = 5
|
||||
messagesToSend = 200
|
||||
|
||||
var
|
||||
disp = newDispatcher()
|
||||
msgCount = 0
|
||||
currentClient = 0
|
||||
|
||||
proc serverRead(s: PAsyncSocket) =
|
||||
var data = ""
|
||||
var address = ""
|
||||
if s.recvFromAsync(data, 9, address):
|
||||
assert address == "127.0.0.1"
|
||||
msgCount.inc()
|
||||
|
||||
discard """
|
||||
|
||||
var line = ""
|
||||
assert s.recvLine(line)
|
||||
|
||||
if line == "":
|
||||
assert(false)
|
||||
else:
|
||||
if line.startsWith("Message "):
|
||||
msgCount.inc()
|
||||
else:
|
||||
assert(false)
|
||||
"""
|
||||
|
||||
proc swarmConnect(s: PAsyncSocket) =
|
||||
for i in 1..messagesToSend:
|
||||
s.send("Message\c\L")
|
||||
|
||||
proc createClient(disp: var PDispatcher, port: TPort,
|
||||
buffered = true) =
|
||||
currentClient.inc()
|
||||
var client = AsyncSocket(typ = SOCK_DGRAM, protocol = IPPROTO_UDP,
|
||||
buffered = buffered)
|
||||
client.handleConnect = swarmConnect
|
||||
disp.register(client)
|
||||
client.connect("localhost", port)
|
||||
|
||||
proc createServer(port: TPort, buffered = true) =
|
||||
var server = AsyncSocket(typ = SOCK_DGRAM, protocol = IPPROTO_UDP,
|
||||
buffered = buffered)
|
||||
server.handleRead = serverRead
|
||||
disp.register(server)
|
||||
server.bindAddr(port)
|
||||
|
||||
let serverCount = 2
|
||||
|
||||
createServer(TPort(10335), false)
|
||||
createServer(TPort(10336), true)
|
||||
var startTime = epochTime()
|
||||
while true:
|
||||
if epochTime() - startTime >= 300.0:
|
||||
break
|
||||
|
||||
if not disp.poll():
|
||||
break
|
||||
|
||||
if (msgCount div messagesToSend) * serverCount == currentClient:
|
||||
createClient(disp, TPort(10335), false)
|
||||
createClient(disp, TPort(10336), true)
|
||||
|
||||
if msgCount == messagesToSend * serverCount * swarmSize:
|
||||
break
|
||||
|
||||
assert msgCount == messagesToSend * serverCount * swarmSize
|
||||
echo(msgCount)
|
||||
Reference in New Issue
Block a user