Files
Nim/lib/pure/asyncnet.nim
Jaremy Creechley 141b76e365 Add Zephyr Support (#19003)
* Porting Nim to run on Zephyr.

Includes changes to `std/net`.

Squashed commit of the following:
    tweaking more memory / malloc things
    revert back bitmasks
    tweaking nim to use kernel heap as C malloc doesn't work
    fixing socket polling on zephyr
    cleanup getting maximum sockets for process or for rtos'es
    reorganizing and fixing net for async / system
    merge netlite changes back into nativesockets
    merge netlite changes back into nativesockets
    reverting native sockets back
    tweaking nim / zephyr network
    adding option to run 'net-lite' from linux
    bridging zephyr's max connections
    fixing net errors
    fixing compilation with getAddrString
    fixing compilation with getAddrString
    experimenting with a nativesockets_lite ... getAddrString
    experimenting with a nativesockets_lite ... getAddrString
    experimenting with a nativesockets_lite ... getLocalAddr
    experimenting with a nativesockets_lite ... getLocalAddr
    experimenting with a nativesockets_lite ...
    add note regarding incorrect FreeRTOS Sockadd_in fields
    changing to NIM_STATIC_ASSERT
    cleaning up the static_assert error messages
    cleaning up the static_assert error messages
    setting up static assert ftw!
    testing compile time asserts
    reworking Sockaddr objects to more closely match various platforms
    reworking Sockaddr objects to more closely match various platforms
    reworking Sockaddr objects to more closely match various platforms
    finding missing items (issue  #18684)
    fixup posix constants (issue  #18684)
    adding plumbing for zephyr os (issue  #18684)
    adding plumbing for zephyr os (issue  #18684)

* fixing constant capitalizations

* remove extra debug prints and fix TSa_Family/cint issue

* remove extra debug prints and fix TSa_Family/cint issue

* Porting Nim to run on Zephyr.

Includes changes to `std/net`.

Squashed commit of the following:
    tweaking more memory / malloc things
    revert back bitmasks
    tweaking nim to use kernel heap as C malloc doesn't work
    fixing socket polling on zephyr
    cleanup getting maximum sockets for process or for rtos'es
    reorganizing and fixing net for async / system
    merge netlite changes back into nativesockets
    merge netlite changes back into nativesockets
    reverting native sockets back
    tweaking nim / zephyr network
    adding option to run 'net-lite' from linux
    bridging zephyr's max connections
    fixing net errors
    fixing compilation with getAddrString
    fixing compilation with getAddrString
    experimenting with a nativesockets_lite ... getAddrString
    experimenting with a nativesockets_lite ... getAddrString
    experimenting with a nativesockets_lite ... getLocalAddr
    experimenting with a nativesockets_lite ... getLocalAddr
    experimenting with a nativesockets_lite ...
    add note regarding incorrect FreeRTOS Sockadd_in fields
    changing to NIM_STATIC_ASSERT
    cleaning up the static_assert error messages
    cleaning up the static_assert error messages
    setting up static assert ftw!
    testing compile time asserts
    reworking Sockaddr objects to more closely match various platforms
    reworking Sockaddr objects to more closely match various platforms
    reworking Sockaddr objects to more closely match various platforms
    finding missing items (issue  #18684)
    fixup posix constants (issue  #18684)
    adding plumbing for zephyr os (issue  #18684)
    adding plumbing for zephyr os (issue  #18684)

* fixing constant capitalizations

* remove extra debug prints and fix TSa_Family/cint issue

* remove extra debug prints and fix TSa_Family/cint issue

* fixing PR issues

* Porting Nim to run on Zephyr.

Includes changes to `std/net`.

Squashed commit of the following:
    tweaking more memory / malloc things
    revert back bitmasks
    tweaking nim to use kernel heap as C malloc doesn't work
    fixing socket polling on zephyr
    cleanup getting maximum sockets for process or for rtos'es
    reorganizing and fixing net for async / system
    merge netlite changes back into nativesockets
    merge netlite changes back into nativesockets
    reverting native sockets back
    tweaking nim / zephyr network
    adding option to run 'net-lite' from linux
    bridging zephyr's max connections
    fixing net errors
    fixing compilation with getAddrString
    fixing compilation with getAddrString
    experimenting with a nativesockets_lite ... getAddrString
    experimenting with a nativesockets_lite ... getAddrString
    experimenting with a nativesockets_lite ... getLocalAddr
    experimenting with a nativesockets_lite ... getLocalAddr
    experimenting with a nativesockets_lite ...
    add note regarding incorrect FreeRTOS Sockadd_in fields
    changing to NIM_STATIC_ASSERT
    cleaning up the static_assert error messages
    cleaning up the static_assert error messages
    setting up static assert ftw!
    testing compile time asserts
    reworking Sockaddr objects to more closely match various platforms
    reworking Sockaddr objects to more closely match various platforms
    reworking Sockaddr objects to more closely match various platforms
    finding missing items (issue  #18684)
    fixup posix constants (issue  #18684)
    adding plumbing for zephyr os (issue  #18684)
    adding plumbing for zephyr os (issue  #18684)

* fixing constant capitalizations

* remove extra debug prints and fix TSa_Family/cint issue

* remove extra debug prints and fix TSa_Family/cint issue

* Remerge

* fixing constant capitalizations

* remove extra debug prints and fix TSa_Family/cint issue

* remove extra debug prints and fix TSa_Family/cint issue

* fixing PR issues

* fix maxDescriptors on zephyr/freertos

* move maxDescriptors to selector.nim -- fixes compile issue

* change realloc impl on zephyr to match ansi c behavior

* change realloc impl on zephyr to match ansi c behavior

* force compileOnly mode for tlwip

Co-authored-by: Jaremy J. Creechley <jaremy.creechley@wavebaselabs.com>
Co-authored-by: Jaremy Creechley <jaremy.creechley@panthalassa.com>
2021-10-24 11:51:57 +02:00

997 lines
35 KiB
Nim

#
#
# Nim's Runtime Library
# (c) Copyright 2017 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements a high-level asynchronous sockets API based on the
## asynchronous dispatcher defined in the `asyncdispatch` module.
##
## Asynchronous IO in Nim
## ======================
##
## Async IO in Nim consists of multiple layers (from highest to lowest):
##
## * `asyncnet` module
##
## * Async await
##
## * `asyncdispatch` module (event loop)
##
## * `selectors` module
##
## Each builds on top of the layers below it. The selectors module is an
## abstraction for the various system `select()` mechanisms such as epoll or
## kqueue. If you wish you can use it directly, and some people have done so
## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
## But you must be aware that on Windows it only supports
## `select()`.
##
## The async dispatcher implements the proactor pattern and also has an
## implementation of IOCP. It implements the proactor pattern for other
## OS' via the selectors module. Futures are also implemented here, and
## indeed all the procedures return a future.
##
## The final layer is the async await transformation. This allows you to
## write asynchronous code in a synchronous style and works similar to
## C#'s await. The transformation works by converting any async procedures
## into an iterator.
##
## This is all single threaded, fully non-blocking and does give you a
## lot of control. In theory you should be able to work with any of these
## layers interchangeably (as long as you only care about non-Windows
## platforms).
##
## For most applications using `asyncnet` is the way to go as it builds
## over all the layers, providing some extra features such as buffering.
##
## SSL
## ===
##
## SSL can be enabled by compiling with the `-d:ssl` flag.
##
## You must create a new SSL context with the `newContext` function defined
## in the `net` module. You may then call `wrapSocket` on your socket using
## the newly created SSL context to get an SSL socket.
##
## Examples
## ========
##
## Chat server
## -----------
##
## The following example demonstrates a simple chat server.
##
## .. code-block:: Nim
##
## import std/[asyncnet, asyncdispatch]
##
## var clients {.threadvar.}: seq[AsyncSocket]
##
## proc processClient(client: AsyncSocket) {.async.} =
## while true:
## let line = await client.recvLine()
## if line.len == 0: break
## for c in clients:
## await c.send(line & "\c\L")
##
## proc serve() {.async.} =
## clients = @[]
## var server = newAsyncSocket()
## server.setSockOpt(OptReuseAddr, true)
## server.bindAddr(Port(12345))
## server.listen()
##
## while true:
## let client = await server.accept()
## clients.add client
##
## asyncCheck processClient(client)
##
## asyncCheck serve()
## runForever()
##
import std/private/since
import asyncdispatch, nativesockets, net, os
export SOBool
# TODO: Remove duplication introduced by PR #4683.
const defineSsl = defined(ssl) or defined(nimdoc)
const useNimNetLite = defined(nimNetLite) or defined(freertos) or defined(zephyr)
when defineSsl:
import openssl
type
# TODO: I would prefer to just do:
# AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
AsyncSocketDesc = object
fd: SocketHandle
closed: bool ## determines whether this socket has been closed
isBuffered: bool ## determines whether this socket is buffered.
buffer: array[0..BufferSize, char]
currPos: int # current index in buffer
bufLen: int # current length of buffer
isSsl: bool
when defineSsl:
sslHandle: SslPtr
sslContext: SslContext
bioIn: BIO
bioOut: BIO
sslNoShutdown: bool
domain: Domain
sockType: SockType
protocol: Protocol
AsyncSocket* = ref AsyncSocketDesc
proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP,
buffered = true,
inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
## Creates a new `AsyncSocket` based on the supplied params.
##
## The supplied `fd`'s non-blocking state will be enabled implicitly.
##
## If `inheritable` is false (the default), the supplied `fd` will not
## be inheritable by child processes.
##
## **Note**: This procedure will **NOT** register `fd` with the global
## async dispatcher. You need to do this manually. If you have used
## `newAsyncNativeSocket` to create `fd` then it's already registered.
assert fd != osInvalidSocket.AsyncFD
new(result)
result.fd = fd.SocketHandle
fd.SocketHandle.setBlocking(false)
if not fd.SocketHandle.setInheritable(inheritable):
raiseOSError(osLastError())
result.isBuffered = buffered
result.domain = domain
result.sockType = sockType
result.protocol = protocol
if buffered:
result.currPos = 0
proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
protocol: Protocol = IPPROTO_TCP, buffered = true,
inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
## Creates a new asynchronous socket.
##
## This procedure will also create a brand new file descriptor for
## this socket.
##
## If `inheritable` is false (the default), the new file descriptor will not
## be inheritable by child processes.
let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
if fd.SocketHandle == osInvalidSocket:
raiseOSError(osLastError())
result = newAsyncSocket(fd, domain, sockType, protocol, buffered, inheritable)
proc getLocalAddr*(socket: AsyncSocket): (string, Port) =
## Get the socket's local address and port number.
##
## This is high-level interface for `getsockname`:idx:.
getLocalAddr(socket.fd, socket.domain)
when not useNimNetLite:
proc getPeerAddr*(socket: AsyncSocket): (string, Port) =
## Get the socket's peer address and port number.
##
## This is high-level interface for `getpeername`:idx:.
getPeerAddr(socket.fd, socket.domain)
proc newAsyncSocket*(domain, sockType, protocol: cint,
buffered = true,
inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
## Creates a new asynchronous socket.
##
## This procedure will also create a brand new file descriptor for
## this socket.
##
## If `inheritable` is false (the default), the new file descriptor will not
## be inheritable by child processes.
let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
if fd.SocketHandle == osInvalidSocket:
raiseOSError(osLastError())
result = newAsyncSocket(fd, Domain(domain), SockType(sockType),
Protocol(protocol), buffered, inheritable)
when defineSsl:
proc getSslError(socket: AsyncSocket, err: cint): cint =
assert socket.isSsl
assert err < 0
var ret = SSL_get_error(socket.sslHandle, err.cint)
case ret
of SSL_ERROR_ZERO_RETURN:
raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT:
return ret
of SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
return ret
of SSL_ERROR_WANT_X509_LOOKUP:
raiseSSLError("Function for x509 lookup has been called.")
of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
socket.sslNoShutdown = true
raiseSSLError()
else: raiseSSLError("Unknown Error")
proc sendPendingSslData(socket: AsyncSocket,
flags: set[SocketFlag]) {.async.} =
let len = bioCtrlPending(socket.bioOut)
if len > 0:
var data = newString(len)
let read = bioRead(socket.bioOut, addr data[0], len)
assert read != 0
if read < 0:
raiseSSLError()
data.setLen(read)
await socket.fd.AsyncFD.send(data, flags)
proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
sslError: cint): owned(Future[bool]) {.async.} =
## Returns `true` if `socket` is still connected, otherwise `false`.
result = true
case sslError
of SSL_ERROR_WANT_WRITE:
await sendPendingSslData(socket, flags)
of SSL_ERROR_WANT_READ:
var data = await recv(socket.fd.AsyncFD, BufferSize, flags)
let length = len(data)
if length > 0:
let ret = bioWrite(socket.bioIn, addr data[0], length.cint)
if ret < 0:
raiseSSLError()
elif length == 0:
# connection not properly closed by remote side or connection dropped
SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
result = false
else:
raiseSSLError("Cannot appease SSL.")
template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
op: untyped) =
var opResult {.inject.} = -1.cint
while opResult < 0:
ErrClearError()
# Call the desired operation.
opResult = op
# Send any remaining pending SSL data.
await sendPendingSslData(socket, flags)
# If the operation failed, try to see if SSL has some data to read
# or write.
if opResult < 0:
let err = getSslError(socket, opResult.cint)
let fut = appeaseSsl(socket, flags, err.cint)
yield fut
if not fut.read():
# Socket disconnected.
if SocketFlag.SafeDisconn in flags:
opResult = 0.cint
break
else:
raiseSSLError("Socket has been disconnected")
proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
buffered = true): owned(Future[AsyncSocket]) {.async.} =
## Establishes connection to the specified `address`:`port` pair via the
## specified protocol. The procedure iterates through possible
## resolutions of the `address` until it succeeds, meaning that it
## seamlessly works with both IPv4 and IPv6.
## Returns AsyncSocket ready to send or receive data.
let asyncFd = await asyncdispatch.dial(address, port, protocol)
let sockType = protocol.toSockType()
let domain = getSockDomain(asyncFd.SocketHandle)
result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered)
proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
## Connects `socket` to server at `address:port`.
##
## Returns a `Future` which will complete when the connection succeeds
## or an error occurs.
await connect(socket.fd.AsyncFD, address, port, socket.domain)
if socket.isSsl:
when defineSsl:
if not isIpAddress(address):
# Set the SNI address for this connection. This call can fail if
# we're not using TLSv1+.
discard SSL_set_tlsext_host_name(socket.sslHandle, address)
let flags = {SocketFlag.SafeDisconn}
sslSetConnectState(socket.sslHandle)
sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
template readInto(buf: pointer, size: int, socket: AsyncSocket,
flags: set[SocketFlag]): int =
## Reads **up to** `size` bytes from `socket` into `buf`. Note that
## this is a template and not a proc.
assert(not socket.closed, "Cannot `recv` on a closed socket")
var res = 0
if socket.isSsl:
when defineSsl:
# SSL mode.
sslLoop(socket, flags,
sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
res = opResult
else:
# Not in SSL mode.
res = await asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size, flags)
res
template readIntoBuf(socket: AsyncSocket,
flags: set[SocketFlag]): int =
var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
socket.currPos = 0
socket.bufLen = size
size
proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[int]) {.async.} =
## Reads **up to** `size` bytes from `socket` into `buf`.
##
## For buffered sockets this function will attempt to read all the requested
## data. It will read this data in `BufferSize` chunks.
##
## For unbuffered sockets this function makes no effort to read
## all the data requested. It will return as much data as the operating system
## gives it.
##
## If socket is disconnected during the
## recv operation then the future may complete with only a part of the
## requested data.
##
## If socket is disconnected and no data is available
## to be read then the future will complete with a value of `0`.
if socket.isBuffered:
let originalBufPos = socket.currPos
if socket.bufLen == 0:
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
if res == 0:
return 0
var read = 0
var cbuf = cast[cstring](buf)
while read < size:
if socket.currPos >= socket.bufLen:
if SocketFlag.Peek in flags:
# We don't want to get another buffer if we're peeking.
break
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
if res == 0:
break
let chunk = min(socket.bufLen-socket.currPos, size-read)
copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
read.inc(chunk)
socket.currPos.inc(chunk)
if SocketFlag.Peek in flags:
# Restore old buffer cursor position.
socket.currPos = originalBufPos
result = read
else:
result = readInto(buf, size, socket, flags)
proc recv*(socket: AsyncSocket, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) {.async.} =
## Reads **up to** `size` bytes from `socket`.
##
## For buffered sockets this function will attempt to read all the requested
## data. It will read this data in `BufferSize` chunks.
##
## For unbuffered sockets this function makes no effort to read
## all the data requested. It will return as much data as the operating system
## gives it.
##
## If socket is disconnected during the
## recv operation then the future may complete with only a part of the
## requested data.
##
## If socket is disconnected and no data is available
## to be read then the future will complete with a value of `""`.
if socket.isBuffered:
result = newString(size)
shallow(result)
let originalBufPos = socket.currPos
if socket.bufLen == 0:
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
if res == 0:
result.setLen(0)
return
var read = 0
while read < size:
if socket.currPos >= socket.bufLen:
if SocketFlag.Peek in flags:
# We don't want to get another buffer if we're peeking.
break
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
if res == 0:
break
let chunk = min(socket.bufLen-socket.currPos, size-read)
copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
read.inc(chunk)
socket.currPos.inc(chunk)
if SocketFlag.Peek in flags:
# Restore old buffer cursor position.
socket.currPos = originalBufPos
result.setLen(read)
else:
result = newString(size)
let read = readInto(addr result[0], size, socket, flags)
result.setLen(read)
proc send*(socket: AsyncSocket, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}) {.async.} =
## Sends `size` bytes from `buf` to `socket`. The returned future will complete once all
## data has been sent.
assert socket != nil
assert(not socket.closed, "Cannot `send` on a closed socket")
if socket.isSsl:
when defineSsl:
sslLoop(socket, flags,
sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
await sendPendingSslData(socket, flags)
else:
await send(socket.fd.AsyncFD, buf, size, flags)
proc send*(socket: AsyncSocket, data: string,
flags = {SocketFlag.SafeDisconn}) {.async.} =
## Sends `data` to `socket`. The returned future will complete once all
## data has been sent.
assert socket != nil
if socket.isSsl:
when defineSsl:
var copy = data
sslLoop(socket, flags,
sslWrite(socket.sslHandle, addr copy[0], copy.len.cint))
await sendPendingSslData(socket, flags)
else:
await send(socket.fd.AsyncFD, data, flags)
proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn},
inheritable = defined(nimInheritHandles)):
owned(Future[tuple[address: string, client: AsyncSocket]]) =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
##
## If `inheritable` is false (the default), the resulting client socket will
## not be inheritable by child processes.
##
## The future will complete when the connection is successfully accepted.
var retFuture = newFuture[tuple[address: string, client: AsyncSocket]]("asyncnet.acceptAddr")
var fut = acceptAddr(socket.fd.AsyncFD, flags, inheritable)
fut.callback =
proc (future: Future[tuple[address: string, client: AsyncFD]]) =
assert future.finished
if future.failed:
retFuture.fail(future.readError)
else:
let resultTup = (future.read.address,
newAsyncSocket(future.read.client, socket.domain,
socket.sockType, socket.protocol, socket.isBuffered, inheritable))
retFuture.complete(resultTup)
return retFuture
proc accept*(socket: AsyncSocket,
flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncSocket]) =
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection.
## If `inheritable` is false (the default), the resulting client socket will
## not be inheritable by child processes.
## The future will complete when the connection is successfully accepted.
var retFut = newFuture[AsyncSocket]("asyncnet.accept")
var fut = acceptAddr(socket, flags)
fut.callback =
proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
assert future.finished
if future.failed:
retFut.fail(future.readError)
else:
retFut.complete(future.read.client)
return retFut
proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
## Reads a line of data from `socket` into `resString`.
##
## If a full line is read `\r\L` is not
## added to `line`, however if solely `\r\L` is read then `line`
## will be set to it.
##
## If the socket is disconnected, `line` will be set to `""`.
##
## If the socket is disconnected in the middle of a line (before `\r\L`
## is read) then line will be set to `""`.
## The partial line **will be lost**.
##
## The `maxLength` parameter determines the maximum amount of characters
## that can be read. `resString` will be truncated after that.
##
## .. warning:: The `Peek` flag is not yet implemented.
##
## .. warning:: `recvLineInto` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
assert SocketFlag.Peek notin flags ## TODO:
result = newFuture[void]("asyncnet.recvLineInto")
# TODO: Make the async transformation check for FutureVar params and complete
# them when the result future is completed.
# Can we replace the result future with the FutureVar?
template addNLIfEmpty(): untyped =
if resString.mget.len == 0:
resString.mget.add("\c\L")
if socket.isBuffered:
if socket.bufLen == 0:
let res = socket.readIntoBuf(flags)
if res == 0:
resString.complete()
return
var lastR = false
while true:
if socket.currPos >= socket.bufLen:
let res = socket.readIntoBuf(flags)
if res == 0:
resString.mget.setLen(0)
resString.complete()
return
case socket.buffer[socket.currPos]
of '\r':
lastR = true
addNLIfEmpty()
of '\L':
addNLIfEmpty()
socket.currPos.inc()
resString.complete()
return
else:
if lastR:
socket.currPos.inc()
resString.complete()
return
else:
resString.mget.add socket.buffer[socket.currPos]
socket.currPos.inc()
# Verify that this isn't a DOS attack: #3847.
if resString.mget.len > maxLength: break
else:
var c = ""
while true:
c = await recv(socket, 1, flags)
if c.len == 0:
resString.mget.setLen(0)
resString.complete()
return
if c == "\r":
c = await recv(socket, 1, flags) # Skip \L
assert c == "\L"
addNLIfEmpty()
resString.complete()
return
elif c == "\L":
addNLIfEmpty()
resString.complete()
return
resString.mget.add c
# Verify that this isn't a DOS attack: #3847.
if resString.mget.len > maxLength: break
resString.complete()
proc recvLine*(socket: AsyncSocket,
flags = {SocketFlag.SafeDisconn},
maxLength = MaxLineLength): owned(Future[string]) {.async.} =
## Reads a line of data from `socket`. Returned future will complete once
## a full line is read or an error occurs.
##
## If a full line is read `\r\L` is not
## added to `line`, however if solely `\r\L` is read then `line`
## will be set to it.
##
## If the socket is disconnected, `line` will be set to `""`.
##
## If the socket is disconnected in the middle of a line (before `\r\L`
## is read) then line will be set to `""`.
## The partial line **will be lost**.
##
## The `maxLength` parameter determines the maximum amount of characters
## that can be read. The result is truncated after that.
##
## .. warning:: The `Peek` flag is not yet implemented.
##
## .. warning:: `recvLine` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
assert SocketFlag.Peek notin flags ## TODO:
# TODO: Optimise this
var resString = newFutureVar[string]("asyncnet.recvLine")
resString.mget() = ""
await socket.recvLineInto(resString, flags, maxLength)
result = resString.mget()
proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [
ReadIOEffect].} =
## Marks `socket` as accepting connections.
## `Backlog` specifies the maximum length of the
## queue of pending connections.
##
## Raises an OSError error upon failure.
if listen(socket.fd, backlog) < 0'i32: raiseOSError(osLastError())
proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
tags: [ReadIOEffect].} =
## Binds `address`:`port` to the socket.
##
## If `address` is "" then ADDR_ANY will be bound.
var realaddr = address
if realaddr == "":
case socket.domain
of AF_INET6: realaddr = "::"
of AF_INET: realaddr = "0.0.0.0"
else:
raise newException(ValueError,
"Unknown socket address family and no address specified to bindAddr")
var aiList = getAddrInfo(realaddr, port, socket.domain)
if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.SockLen) < 0'i32:
freeaddrinfo(aiList)
raiseOSError(osLastError())
freeaddrinfo(aiList)
proc hasDataBuffered*(s: AsyncSocket): bool {.since: (1, 5).} =
## Determines whether an AsyncSocket has data buffered.
# xxx dedup with std/net
s.isBuffered and s.bufLen > 0 and s.currPos != s.bufLen
when defined(posix) and not useNimNetLite:
proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
## Binds Unix socket to `path`.
## This only works on Unix-style systems: Mac OS X, BSD and Linux
when not defined(nimdoc):
let retFuture = newFuture[void]("connectUnix")
result = retFuture
proc cb(fd: AsyncFD): bool =
let ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
if ret == 0:
retFuture.complete()
return true
elif ret == EINTR:
return false
else:
retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
return true
var socketAddr = makeUnixAddr(path)
let ret = socket.fd.connect(cast[ptr SockAddr](addr socketAddr),
(sizeof(socketAddr.sun_family) + path.len).SockLen)
if ret == 0:
# Request to connect completed immediately.
retFuture.complete()
else:
let lastError = osLastError()
if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
addWrite(AsyncFD(socket.fd), cb)
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
proc bindUnix*(socket: AsyncSocket, path: string) {.
tags: [ReadIOEffect].} =
## Binds Unix socket to `path`.
## This only works on Unix-style systems: Mac OS X, BSD and Linux
when not defined(nimdoc):
var socketAddr = makeUnixAddr(path)
if socket.fd.bindAddr(cast[ptr SockAddr](addr socketAddr),
(sizeof(socketAddr.sun_family) + path.len).SockLen) != 0'i32:
raiseOSError(osLastError())
elif defined(nimdoc):
proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
## Binds Unix socket to `path`.
## This only works on Unix-style systems: Mac OS X, BSD and Linux
discard
proc bindUnix*(socket: AsyncSocket, path: string) =
## Binds Unix socket to `path`.
## This only works on Unix-style systems: Mac OS X, BSD and Linux
discard
proc close*(socket: AsyncSocket) =
## Closes the socket.
if socket.closed: return
defer:
socket.fd.AsyncFD.closeSocket()
socket.closed = true # TODO: Add extra debugging checks for this.
when defineSsl:
if socket.isSsl:
let res =
# Don't call SSL_shutdown if the connection has not been fully
# established, see:
# https://github.com/openssl/openssl/issues/710#issuecomment-253897666
if not socket.sslNoShutdown and SSL_in_init(socket.sslHandle) == 0:
ErrClearError()
SSL_shutdown(socket.sslHandle)
else:
0
SSL_free(socket.sslHandle)
if res == 0:
discard
elif res != 1:
raiseSSLError()
when defineSsl:
proc sslHandle*(self: AsyncSocket): SslPtr =
## Retrieve the ssl pointer of `socket`.
## Useful for interfacing with `openssl`.
self.sslHandle
proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
## Wraps a socket in an SSL context. This function effectively turns
## `socket` into an SSL socket.
##
## **Disclaimer**: This code is not well tested, may be very unsafe and
## prone to security vulnerabilities.
socket.isSsl = true
socket.sslContext = ctx
socket.sslHandle = SSL_new(socket.sslContext.context)
if socket.sslHandle == nil:
raiseSSLError()
socket.bioIn = bioNew(bioSMem())
socket.bioOut = bioNew(bioSMem())
sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)
socket.sslNoShutdown = true
proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
handshake: SslHandshakeType,
hostname: string = "") =
## Wraps a connected socket in an SSL context. This function effectively
## turns `socket` into an SSL socket.
## `hostname` should be specified so that the client knows which hostname
## the server certificate should be validated against.
##
## This should be called on a connected socket, and will perform
## an SSL handshake immediately.
##
## **Disclaimer**: This code is not well tested, may be very unsafe and
## prone to security vulnerabilities.
wrapSocket(ctx, socket)
case handshake
of handshakeAsClient:
if hostname.len > 0 and not isIpAddress(hostname):
# Set the SNI address for this connection. This call can fail if
# we're not using TLSv1+.
discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
sslSetConnectState(socket.sslHandle)
of handshakeAsServer:
sslSetAcceptState(socket.sslHandle)
proc getPeerCertificates*(socket: AsyncSocket): seq[Certificate] {.since: (1, 1).} =
## Returns the certificate chain received by the peer we are connected to
## through the given socket.
## The handshake must have been completed and the certificate chain must
## have been verified successfully or else an empty sequence is returned.
## The chain is ordered from leaf certificate to root certificate.
if not socket.isSsl:
result = newSeq[Certificate]()
else:
result = getPeerCertificates(socket.sslHandle)
proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
tags: [ReadIOEffect].} =
## Retrieves option `opt` as a boolean value.
var res = getSockOptInt(socket.fd, cint(level), toCInt(opt))
result = res != 0
proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
## Sets option `opt` to a boolean value specified by `value`.
var valuei = cint(if value: 1 else: 0)
setSockOptInt(socket.fd, cint(level), toCInt(opt), valuei)
proc isSsl*(socket: AsyncSocket): bool =
## Determines whether `socket` is a SSL socket.
socket.isSsl
proc getFd*(socket: AsyncSocket): SocketHandle =
## Returns the socket's file descriptor.
return socket.fd
proc isClosed*(socket: AsyncSocket): bool =
## Determines whether the socket has been closed.
return socket.closed
proc sendTo*(socket: AsyncSocket, address: string, port: Port, data: string,
flags = {SocketFlag.SafeDisconn}): owned(Future[void])
{.async, since: (1, 3).} =
## This proc sends `data` to the specified `address`, which may be an IP
## address or a hostname. If a hostname is specified this function will try
## each IP of that hostname. The returned future will complete once all data
## has been sent.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
assert(socket.protocol != IPPROTO_TCP,
"Cannot `sendTo` on a TCP socket. Use `send` instead")
assert(not socket.closed, "Cannot `sendTo` on a closed socket")
let aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
socket.protocol)
var
it = aiList
success = false
lastException: ref Exception
while it != nil:
let fut = sendTo(socket.fd.AsyncFD, cstring(data), len(data), it.ai_addr,
it.ai_addrlen.SockLen, flags)
yield fut
if not fut.failed:
success = true
break
lastException = fut.readError()
it = it.ai_next
freeaddrinfo(aiList)
if not success:
if lastException != nil:
raise lastException
else:
raise newException(IOError, "Couldn't resolve address: " & address)
proc recvFrom*(socket: AsyncSocket, data: FutureVar[string], size: int,
address: FutureVar[string], port: FutureVar[Port],
flags = {SocketFlag.SafeDisconn}): owned(Future[int])
{.async, since: (1, 3).} =
## Receives a datagram data from `socket` into `data`, which must be at
## least of size `size`. The address and port of datagram's sender will be
## stored into `address` and `port`, respectively. Returned future will
## complete once one datagram has been received, and will return size of
## packet received.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
##
## **Notes**
## * `data` must be initialized to the length of `size`.
## * `address` must be initialized to 46 in length.
template adaptRecvFromToDomain(domain: Domain) =
var lAddr = sizeof(sAddr).SockLen
result = await recvFromInto(AsyncFD(getFd(socket)), cstring(data.mget()), size,
cast[ptr SockAddr](addr sAddr), addr lAddr,
flags)
data.mget().setLen(result)
data.complete()
getAddrString(cast[ptr SockAddr](addr sAddr), address.mget())
address.complete()
when domain == AF_INET6:
port.complete(ntohs(sAddr.sin6_port).Port)
else:
port.complete(ntohs(sAddr.sin_port).Port)
assert(socket.protocol != IPPROTO_TCP,
"Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead")
assert(not socket.closed, "Cannot `recvFrom` on a closed socket")
assert(size == len(data.mget()),
"`date` was not initialized correctly. `size` != `len(data.mget())`")
assert(46 == len(address.mget()),
"`address` was not initialized correctly. 46 != `len(address.mget())`")
case socket.domain
of AF_INET6:
var sAddr: Sockaddr_in6
adaptRecvFromToDomain(AF_INET6)
of AF_INET:
var sAddr: Sockaddr_in
adaptRecvFromToDomain(AF_INET)
else:
raise newException(ValueError, "Unknown socket address family")
proc recvFrom*(socket: AsyncSocket, size: int,
flags = {SocketFlag.SafeDisconn}):
owned(Future[tuple[data: string, address: string, port: Port]])
{.async, since: (1, 3).} =
## Receives a datagram data from `socket`, which must be at least of size
## `size`. Returned future will complete once one datagram has been received
## and will return tuple with: data of packet received; and address and port
## of datagram's sender.
##
## If an error occurs an OSError exception will be raised.
##
## This proc is normally used with connectionless sockets (UDP sockets).
var
data = newFutureVar[string]()
address = newFutureVar[string]()
port = newFutureVar[Port]()
data.mget().setLen(size)
address.mget().setLen(46)
let read = await recvFrom(socket, data, size, address, port, flags)
result = (data.mget(), address.mget(), port.mget())
when not defined(testing) and isMainModule:
type
TestCases = enum
HighClient, LowClient, LowServer
const test = HighClient
when test == HighClient:
proc main() {.async.} =
var sock = newAsyncSocket()
await sock.connect("irc.freenode.net", Port(6667))
while true:
let line = await sock.recvLine()
if line == "":
echo("Disconnected")
break
else:
echo("Got line: ", line)
asyncCheck main()
elif test == LowClient:
var sock = newAsyncSocket()
var f = connect(sock, "irc.freenode.net", Port(6667))
f.callback =
proc (future: Future[void]) =
echo("Connected in future!")
for i in 0 .. 50:
var recvF = recv(sock, 10)
recvF.callback =
proc (future: Future[string]) =
echo("Read ", future.read.len, ": ", future.read.repr)
elif test == LowServer:
var sock = newAsyncSocket()
sock.bindAddr(Port(6667))
sock.listen()
proc onAccept(future: Future[AsyncSocket]) =
let client = future.read
echo "Accepted ", client.fd.cint
var t = send(client, "test\c\L")
t.callback =
proc (future: Future[void]) =
echo("Send")
client.close()
var f = accept(sock)
f.callback = onAccept
var f = accept(sock)
f.callback = onAccept
runForever()