mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-28 08:54:53 +00:00
Remove deprecated modules (asyncio, sockets, ftpclient) (#10401)
This commit is contained in:
@@ -1,711 +0,0 @@
|
||||
#
|
||||
#
|
||||
# Nim's Runtime Library
|
||||
# (c) Copyright 2012 Andreas Rumpf, Dominik Picheta
|
||||
# See the file "copying.txt", included in this
|
||||
# distribution, for details about the copyright.
|
||||
#
|
||||
|
||||
include "system/inclrtl"
|
||||
|
||||
import sockets, os
|
||||
|
||||
##
|
||||
## **Warning:** This module is deprecated since version 0.10.2.
|
||||
## Use the brand new `asyncdispatch <asyncdispatch.html>`_ module together
|
||||
## with the `asyncnet <asyncnet.html>`_ module.
|
||||
|
||||
## This module implements an asynchronous event loop together with asynchronous
|
||||
## sockets which use this event loop.
|
||||
## It is akin to Python's asyncore module. Many modules that use sockets
|
||||
## have an implementation for this module, those modules should all have a
|
||||
## ``register`` function which you should use to add the desired objects to a
|
||||
## dispatcher which you created so
|
||||
## that you can receive the events associated with that module's object.
|
||||
##
|
||||
## Once everything is registered in a dispatcher, you need to call the ``poll``
|
||||
## function in a while loop.
|
||||
##
|
||||
## **Note:** Most modules have tasks which need to be ran regularly, this is
|
||||
## why you should not call ``poll`` with a infinite timeout, or even a
|
||||
## very long one. In most cases the default timeout is fine.
|
||||
##
|
||||
## **Note:** This module currently only supports select(), this is limited by
|
||||
## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024
|
||||
## sockets at a time.
|
||||
##
|
||||
## Most (if not all) modules that use asyncio provide a userArg which is passed
|
||||
## on with the events. The type that you set userArg to must be inheriting from
|
||||
## ``RootObj``!
|
||||
##
|
||||
## **Note:** If you want to provide async ability to your module please do not
|
||||
## use the ``Delegate`` object, instead use ``AsyncSocket``. It is possible
|
||||
## that in the future this type's fields will not be exported therefore breaking
|
||||
## your code.
|
||||
##
|
||||
## **Warning:** The API of this module is unstable, and therefore is subject
|
||||
## to change.
|
||||
##
|
||||
## Asynchronous sockets
|
||||
## ====================
|
||||
##
|
||||
## For most purposes you do not need to worry about the ``Delegate`` type. The
|
||||
## ``AsyncSocket`` is what you are after. It's a reference to
|
||||
## the ``AsyncSocketObj`` object. This object defines events which you should
|
||||
## overwrite by your own procedures.
|
||||
##
|
||||
## For server sockets the only event you need to worry about is the ``handleAccept``
|
||||
## event, in your handleAccept proc you should call ``accept`` on the server
|
||||
## socket which will give you the client which is connecting. You should then
|
||||
## set any events that you want to use on that client and add it to your dispatcher
|
||||
## using the ``register`` procedure.
|
||||
##
|
||||
## An example ``handleAccept`` follows:
|
||||
##
|
||||
## .. code-block:: nim
|
||||
##
|
||||
## var disp = newDispatcher()
|
||||
## ...
|
||||
## proc handleAccept(s: AsyncSocket) =
|
||||
## echo("Accepted client.")
|
||||
## var client: AsyncSocket
|
||||
## new(client)
|
||||
## s.accept(client)
|
||||
## client.handleRead = ...
|
||||
## disp.register(client)
|
||||
## ...
|
||||
##
|
||||
## For client sockets you should only be interested in the ``handleRead`` and
|
||||
## ``handleConnect`` events. The former gets called whenever the socket has
|
||||
## received messages and can be read from and the latter gets called whenever
|
||||
## the socket has established a connection to a server socket; from that point
|
||||
## it can be safely written to.
|
||||
##
|
||||
## Getting a blocking client from an AsyncSocket
|
||||
## =============================================
|
||||
##
|
||||
## If you need a asynchronous server socket but you wish to process the clients
|
||||
## synchronously then you can use the ``getSocket`` converter to get
|
||||
## a ``Socket`` from the ``AsyncSocket`` object, this can then be combined
|
||||
## with ``accept`` like so:
|
||||
##
|
||||
## .. code-block:: nim
|
||||
##
|
||||
## proc handleAccept(s: AsyncSocket) =
|
||||
## var client: Socket
|
||||
## getSocket(s).accept(client)
|
||||
|
||||
{.deprecated.}
|
||||
|
||||
when defined(windows):
|
||||
from winlean import TimeVal, SocketHandle, FD_SET, FD_ZERO, TFdSet,
|
||||
FD_ISSET, select
|
||||
else:
|
||||
from posix import TimeVal, Time, Suseconds, SocketHandle, FD_SET, FD_ZERO,
|
||||
TFdSet, FD_ISSET, select
|
||||
|
||||
type
|
||||
DelegateObj* = object
|
||||
fd*: SocketHandle
|
||||
deleVal*: RootRef
|
||||
|
||||
handleRead*: proc (h: RootRef) {.nimcall, gcsafe.}
|
||||
handleWrite*: proc (h: RootRef) {.nimcall, gcsafe.}
|
||||
handleError*: proc (h: RootRef) {.nimcall, gcsafe.}
|
||||
hasDataBuffered*: proc (h: RootRef): bool {.nimcall, gcsafe.}
|
||||
|
||||
open*: bool
|
||||
task*: proc (h: RootRef) {.nimcall, gcsafe.}
|
||||
mode*: FileMode
|
||||
|
||||
Delegate* = ref DelegateObj
|
||||
|
||||
Dispatcher* = ref DispatcherObj
|
||||
DispatcherObj = object
|
||||
delegates: seq[Delegate]
|
||||
|
||||
AsyncSocket* = ref AsyncSocketObj
|
||||
AsyncSocketObj* = object of RootObj
|
||||
socket: Socket
|
||||
info: SocketStatus
|
||||
|
||||
handleRead*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
handleConnect*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
|
||||
handleAccept*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
|
||||
handleTask*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
|
||||
lineBuffer: TaintedString ## Temporary storage for ``readLine``
|
||||
sendBuffer: string ## Temporary storage for ``send``
|
||||
sslNeedAccept: bool
|
||||
proto: Protocol
|
||||
deleg: Delegate
|
||||
|
||||
SocketStatus* = enum
|
||||
SockIdle, SockConnecting, SockConnected, SockListening, SockClosed,
|
||||
SockUDPBound
|
||||
|
||||
|
||||
proc newDelegate*(): Delegate =
|
||||
## Creates a new delegate.
|
||||
new(result)
|
||||
result.handleRead = (proc (h: RootRef) = discard)
|
||||
result.handleWrite = (proc (h: RootRef) = discard)
|
||||
result.handleError = (proc (h: RootRef) = discard)
|
||||
result.hasDataBuffered = (proc (h: RootRef): bool = return false)
|
||||
result.task = (proc (h: RootRef) = discard)
|
||||
result.mode = fmRead
|
||||
|
||||
proc newAsyncSocket(): AsyncSocket =
|
||||
new(result)
|
||||
result.info = SockIdle
|
||||
|
||||
result.handleRead = (proc (s: AsyncSocket) = discard)
|
||||
result.handleWrite = nil
|
||||
result.handleConnect = (proc (s: AsyncSocket) = discard)
|
||||
result.handleAccept = (proc (s: AsyncSocket) = discard)
|
||||
result.handleTask = (proc (s: AsyncSocket) = discard)
|
||||
|
||||
result.lineBuffer = "".TaintedString
|
||||
result.sendBuffer = ""
|
||||
|
||||
proc asyncSocket*(domain: Domain = AF_INET, typ: SockType = SOCK_STREAM,
|
||||
protocol: Protocol = IPPROTO_TCP,
|
||||
buffered = true): AsyncSocket =
|
||||
## Initialises an AsyncSocket object. If a socket cannot be initialised
|
||||
## OSError is raised.
|
||||
result = newAsyncSocket()
|
||||
result.socket = socket(domain, typ, protocol, buffered)
|
||||
result.proto = protocol
|
||||
if result.socket == invalidSocket: raiseOSError(osLastError())
|
||||
result.socket.setBlocking(false)
|
||||
|
||||
proc toAsyncSocket*(sock: Socket, state: SocketStatus = SockConnected): AsyncSocket =
|
||||
## Wraps an already initialized ``Socket`` into a AsyncSocket.
|
||||
## This is useful if you want to use an already connected Socket as an
|
||||
## asynchronous AsyncSocket in asyncio's event loop.
|
||||
##
|
||||
## ``state`` may be overriden, i.e. if ``sock`` is not connected it should be
|
||||
## adjusted properly. By default it will be assumed that the socket is
|
||||
## connected. Please note this is only applicable to TCP client sockets, if
|
||||
## ``sock`` is a different type of socket ``state`` needs to be adjusted!!!
|
||||
##
|
||||
## ================ ================================================================
|
||||
## Value Meaning
|
||||
## ================ ================================================================
|
||||
## SockIdle Socket has only just been initialised, not connected or closed.
|
||||
## SockConnected Socket is connected to a server.
|
||||
## SockConnecting Socket is in the process of connecting to a server.
|
||||
## SockListening Socket is a server socket and is listening for connections.
|
||||
## SockClosed Socket has been closed.
|
||||
## SockUDPBound Socket is a UDP socket which is listening for data.
|
||||
## ================ ================================================================
|
||||
##
|
||||
## **Warning**: If ``state`` is set incorrectly the resulting ``AsyncSocket``
|
||||
## object may not work properly.
|
||||
##
|
||||
## **Note**: This will set ``sock`` to be non-blocking.
|
||||
result = newAsyncSocket()
|
||||
result.socket = sock
|
||||
result.proto = if state == SockUDPBound: IPPROTO_UDP else: IPPROTO_TCP
|
||||
result.socket.setBlocking(false)
|
||||
result.info = state
|
||||
|
||||
proc asyncSockHandleRead(h: RootRef) =
|
||||
when defined(ssl):
|
||||
if AsyncSocket(h).socket.isSSL and not
|
||||
AsyncSocket(h).socket.gotHandshake:
|
||||
return
|
||||
|
||||
if AsyncSocket(h).info != SockListening:
|
||||
if AsyncSocket(h).info != SockConnecting:
|
||||
AsyncSocket(h).handleRead(AsyncSocket(h))
|
||||
else:
|
||||
AsyncSocket(h).handleAccept(AsyncSocket(h))
|
||||
|
||||
proc close*(sock: AsyncSocket) {.gcsafe.}
|
||||
proc asyncSockHandleWrite(h: RootRef) =
|
||||
when defined(ssl):
|
||||
if AsyncSocket(h).socket.isSSL and not
|
||||
AsyncSocket(h).socket.gotHandshake:
|
||||
return
|
||||
|
||||
if AsyncSocket(h).info == SockConnecting:
|
||||
AsyncSocket(h).handleConnect(AsyncSocket(h))
|
||||
AsyncSocket(h).info = SockConnected
|
||||
# Stop receiving write events if there is no handleWrite event.
|
||||
if AsyncSocket(h).handleWrite == nil:
|
||||
AsyncSocket(h).deleg.mode = fmRead
|
||||
else:
|
||||
AsyncSocket(h).deleg.mode = fmReadWrite
|
||||
else:
|
||||
if AsyncSocket(h).sendBuffer != "":
|
||||
let sock = AsyncSocket(h)
|
||||
try:
|
||||
let bytesSent = sock.socket.sendAsync(sock.sendBuffer)
|
||||
if bytesSent == 0:
|
||||
# Apparently the socket cannot be written to. Even though select
|
||||
# just told us that it can be... This used to be an assert. Just
|
||||
# do nothing instead.
|
||||
discard
|
||||
elif bytesSent != sock.sendBuffer.len:
|
||||
sock.sendBuffer = sock.sendBuffer[bytesSent .. ^1]
|
||||
elif bytesSent == sock.sendBuffer.len:
|
||||
sock.sendBuffer = ""
|
||||
|
||||
if AsyncSocket(h).handleWrite != nil:
|
||||
AsyncSocket(h).handleWrite(AsyncSocket(h))
|
||||
except OSError:
|
||||
# Most likely the socket closed before the full buffer could be sent to it.
|
||||
sock.close() # TODO: Provide a handleError for users?
|
||||
else:
|
||||
if AsyncSocket(h).handleWrite != nil:
|
||||
AsyncSocket(h).handleWrite(AsyncSocket(h))
|
||||
else:
|
||||
AsyncSocket(h).deleg.mode = fmRead
|
||||
|
||||
when defined(ssl):
|
||||
proc asyncSockDoHandshake(h: RootRef) {.gcsafe.} =
|
||||
if AsyncSocket(h).socket.isSSL and not
|
||||
AsyncSocket(h).socket.gotHandshake:
|
||||
if AsyncSocket(h).sslNeedAccept:
|
||||
var d = ""
|
||||
let ret = AsyncSocket(h).socket.acceptAddrSSL(AsyncSocket(h).socket, d)
|
||||
assert ret != AcceptNoClient
|
||||
if ret == AcceptSuccess:
|
||||
AsyncSocket(h).info = SockConnected
|
||||
else:
|
||||
# handshake will set socket's ``sslNoHandshake`` field.
|
||||
discard AsyncSocket(h).socket.handshake()
|
||||
|
||||
|
||||
proc asyncSockTask(h: RootRef) =
|
||||
when defined(ssl):
|
||||
h.asyncSockDoHandshake()
|
||||
|
||||
AsyncSocket(h).handleTask(AsyncSocket(h))
|
||||
|
||||
proc toDelegate(sock: AsyncSocket): Delegate =
|
||||
result = newDelegate()
|
||||
result.deleVal = sock
|
||||
result.fd = getFD(sock.socket)
|
||||
# We need this to get write events, just to know when the socket connects.
|
||||
result.mode = fmReadWrite
|
||||
result.handleRead = asyncSockHandleRead
|
||||
result.handleWrite = asyncSockHandleWrite
|
||||
result.task = asyncSockTask
|
||||
# TODO: Errors?
|
||||
#result.handleError = (proc (h: PObject) = assert(false))
|
||||
|
||||
result.hasDataBuffered =
|
||||
proc (h: RootRef): bool {.nimcall.} =
|
||||
return AsyncSocket(h).socket.hasDataBuffered()
|
||||
|
||||
sock.deleg = result
|
||||
if sock.info notin {SockIdle, SockClosed}:
|
||||
sock.deleg.open = true
|
||||
else:
|
||||
sock.deleg.open = false
|
||||
|
||||
proc connect*(sock: AsyncSocket, name: string, port = Port(0),
|
||||
af: Domain = AF_INET) =
|
||||
## Begins connecting ``sock`` to ``name``:``port``.
|
||||
sock.socket.connectAsync(name, port, af)
|
||||
sock.info = SockConnecting
|
||||
if sock.deleg != nil:
|
||||
sock.deleg.open = true
|
||||
|
||||
proc close*(sock: AsyncSocket) =
|
||||
## Closes ``sock``. Terminates any current connections.
|
||||
sock.socket.close()
|
||||
sock.info = SockClosed
|
||||
if sock.deleg != nil:
|
||||
sock.deleg.open = false
|
||||
|
||||
proc bindAddr*(sock: AsyncSocket, port = Port(0), address = "") =
|
||||
## Equivalent to ``sockets.bindAddr``.
|
||||
sock.socket.bindAddr(port, address)
|
||||
if sock.proto == IPPROTO_UDP:
|
||||
sock.info = SockUDPBound
|
||||
if sock.deleg != nil:
|
||||
sock.deleg.open = true
|
||||
|
||||
proc listen*(sock: AsyncSocket) =
|
||||
## Equivalent to ``sockets.listen``.
|
||||
sock.socket.listen()
|
||||
sock.info = SockListening
|
||||
if sock.deleg != nil:
|
||||
sock.deleg.open = true
|
||||
|
||||
proc acceptAddr*(server: AsyncSocket, client: var AsyncSocket,
|
||||
address: var string) =
|
||||
## Equivalent to ``sockets.acceptAddr``. This procedure should be called in
|
||||
## a ``handleAccept`` event handler **only** once.
|
||||
##
|
||||
## **Note**: ``client`` needs to be initialised.
|
||||
assert(client != nil)
|
||||
client = newAsyncSocket()
|
||||
var c: Socket
|
||||
new(c)
|
||||
when defined(ssl):
|
||||
if server.socket.isSSL:
|
||||
var ret = server.socket.acceptAddrSSL(c, address)
|
||||
# The following shouldn't happen because when this function is called
|
||||
# it is guaranteed that there is a client waiting.
|
||||
# (This should be called in handleAccept)
|
||||
assert(ret != AcceptNoClient)
|
||||
if ret == AcceptNoHandshake:
|
||||
client.sslNeedAccept = true
|
||||
else:
|
||||
client.sslNeedAccept = false
|
||||
client.info = SockConnected
|
||||
else:
|
||||
server.socket.acceptAddr(c, address)
|
||||
client.sslNeedAccept = false
|
||||
client.info = SockConnected
|
||||
else:
|
||||
server.socket.acceptAddr(c, address)
|
||||
client.sslNeedAccept = false
|
||||
client.info = SockConnected
|
||||
|
||||
if c == invalidSocket: raiseSocketError(server.socket)
|
||||
c.setBlocking(false) # TODO: Needs to be tested.
|
||||
|
||||
# deleg.open is set in ``toDelegate``.
|
||||
|
||||
client.socket = c
|
||||
client.lineBuffer = "".TaintedString
|
||||
client.sendBuffer = ""
|
||||
client.info = SockConnected
|
||||
|
||||
proc accept*(server: AsyncSocket, client: var AsyncSocket) =
|
||||
## Equivalent to ``sockets.accept``.
|
||||
var dummyAddr = ""
|
||||
server.acceptAddr(client, dummyAddr)
|
||||
|
||||
proc acceptAddr*(server: AsyncSocket): tuple[sock: AsyncSocket,
|
||||
address: string] {.deprecated.} =
|
||||
## Equivalent to ``sockets.acceptAddr``.
|
||||
##
|
||||
## **Deprecated since version 0.9.0:** Please use the function above.
|
||||
var client = newAsyncSocket()
|
||||
var address: string = ""
|
||||
acceptAddr(server, client, address)
|
||||
return (client, address)
|
||||
|
||||
proc accept*(server: AsyncSocket): AsyncSocket {.deprecated.} =
|
||||
## Equivalent to ``sockets.accept``.
|
||||
##
|
||||
## **Deprecated since version 0.9.0:** Please use the function above.
|
||||
new(result)
|
||||
var address = ""
|
||||
server.acceptAddr(result, address)
|
||||
|
||||
proc newDispatcher*(): Dispatcher =
|
||||
new(result)
|
||||
result.delegates = @[]
|
||||
|
||||
proc register*(d: Dispatcher, deleg: Delegate) =
|
||||
## Registers delegate ``deleg`` with dispatcher ``d``.
|
||||
d.delegates.add(deleg)
|
||||
|
||||
proc register*(d: Dispatcher, sock: AsyncSocket): Delegate {.discardable.} =
|
||||
## Registers async socket ``sock`` with dispatcher ``d``.
|
||||
result = sock.toDelegate()
|
||||
d.register(result)
|
||||
|
||||
proc unregister*(d: Dispatcher, deleg: Delegate) =
|
||||
## Unregisters deleg ``deleg`` from dispatcher ``d``.
|
||||
for i in 0..len(d.delegates)-1:
|
||||
if d.delegates[i] == deleg:
|
||||
d.delegates.del(i)
|
||||
return
|
||||
raise newException(IndexError, "Could not find delegate.")
|
||||
|
||||
proc isWriteable*(s: AsyncSocket): bool =
|
||||
## Determines whether socket ``s`` is ready to be written to.
|
||||
var writeSock = @[s.socket]
|
||||
return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
|
||||
|
||||
converter getSocket*(s: AsyncSocket): Socket =
|
||||
return s.socket
|
||||
|
||||
proc isConnected*(s: AsyncSocket): bool =
|
||||
## Determines whether ``s`` is connected.
|
||||
return s.info == SockConnected
|
||||
proc isListening*(s: AsyncSocket): bool =
|
||||
## Determines whether ``s`` is listening for incoming connections.
|
||||
return s.info == SockListening
|
||||
proc isConnecting*(s: AsyncSocket): bool =
|
||||
## Determines whether ``s`` is connecting.
|
||||
return s.info == SockConnecting
|
||||
proc isClosed*(s: AsyncSocket): bool =
|
||||
## Determines whether ``s`` has been closed.
|
||||
return s.info == SockClosed
|
||||
proc isSendDataBuffered*(s: AsyncSocket): bool =
|
||||
## Determines whether ``s`` has data waiting to be sent, i.e. whether this
|
||||
## socket's sendBuffer contains data.
|
||||
return s.sendBuffer.len != 0
|
||||
|
||||
proc setHandleWrite*(s: AsyncSocket,
|
||||
handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.}) =
|
||||
## Setter for the ``handleWrite`` event.
|
||||
##
|
||||
## To remove this event you should use the ``delHandleWrite`` function.
|
||||
## It is advised to use that function instead of just setting the event to
|
||||
## ``proc (s: AsyncSocket) = nil`` as that would mean that that function
|
||||
## would be called constantly.
|
||||
s.deleg.mode = fmReadWrite
|
||||
s.handleWrite = handleWrite
|
||||
|
||||
proc delHandleWrite*(s: AsyncSocket) =
|
||||
## Removes the ``handleWrite`` event handler on ``s``.
|
||||
s.handleWrite = nil
|
||||
|
||||
{.push warning[deprecated]: off.}
|
||||
proc recvLine*(s: AsyncSocket, line: var TaintedString): bool {.deprecated.} =
|
||||
## Behaves similar to ``sockets.recvLine``, however it handles non-blocking
|
||||
## sockets properly. This function guarantees that ``line`` is a full line,
|
||||
## if this function can only retrieve some data; it will save this data and
|
||||
## add it to the result when a full line is retrieved.
|
||||
##
|
||||
## Unlike ``sockets.recvLine`` this function will raise an OSError or SslError
|
||||
## exception if an error occurs.
|
||||
##
|
||||
## **Deprecated since version 0.9.2**: This function has been deprecated in
|
||||
## favour of readLine.
|
||||
setLen(line.string, 0)
|
||||
var dataReceived = "".TaintedString
|
||||
var ret = s.socket.recvLineAsync(dataReceived)
|
||||
case ret
|
||||
of RecvFullLine:
|
||||
if s.lineBuffer.len > 0:
|
||||
string(line).add(s.lineBuffer.string)
|
||||
setLen(s.lineBuffer.string, 0)
|
||||
string(line).add(dataReceived.string)
|
||||
if string(line) == "":
|
||||
line = "\c\L".TaintedString
|
||||
result = true
|
||||
of RecvPartialLine:
|
||||
string(s.lineBuffer).add(dataReceived.string)
|
||||
result = false
|
||||
of RecvDisconnected:
|
||||
result = true
|
||||
of RecvFail:
|
||||
s.raiseSocketError(async = true)
|
||||
result = false
|
||||
{.pop.}
|
||||
|
||||
proc readLine*(s: AsyncSocket, line: var TaintedString): bool =
|
||||
## Behaves similar to ``sockets.readLine``, however it handles non-blocking
|
||||
## sockets properly. This function guarantees that ``line`` is a full line,
|
||||
## if this function can only retrieve some data; it will save this data and
|
||||
## add it to the result when a full line is retrieved, when this happens
|
||||
## False will be returned. True will only be returned if a full line has been
|
||||
## retrieved or the socket has been disconnected in which case ``line`` will
|
||||
## be set to "".
|
||||
##
|
||||
## This function will raise an OSError exception when a socket error occurs.
|
||||
setLen(line.string, 0)
|
||||
var dataReceived = "".TaintedString
|
||||
var ret = s.socket.readLineAsync(dataReceived)
|
||||
case ret
|
||||
of ReadFullLine:
|
||||
if s.lineBuffer.len > 0:
|
||||
string(line).add(s.lineBuffer.string)
|
||||
setLen(s.lineBuffer.string, 0)
|
||||
string(line).add(dataReceived.string)
|
||||
if string(line) == "":
|
||||
line = "\c\L".TaintedString
|
||||
result = true
|
||||
of ReadPartialLine:
|
||||
string(s.lineBuffer).add(dataReceived.string)
|
||||
result = false
|
||||
of ReadNone:
|
||||
result = false
|
||||
of ReadDisconnected:
|
||||
result = true
|
||||
|
||||
proc send*(sock: AsyncSocket, data: string) =
|
||||
## Sends ``data`` to socket ``sock``. This is basically a nicer implementation
|
||||
## of ``sockets.sendAsync``.
|
||||
##
|
||||
## If ``data`` cannot be sent immediately it will be buffered and sent
|
||||
## when ``sock`` becomes writeable (during the ``handleWrite`` event).
|
||||
## It's possible that only a part of ``data`` will be sent immediately, while
|
||||
## the rest of it will be buffered and sent later.
|
||||
if sock.sendBuffer.len != 0:
|
||||
sock.sendBuffer.add(data)
|
||||
return
|
||||
let bytesSent = sock.socket.sendAsync(data)
|
||||
assert bytesSent >= 0
|
||||
if bytesSent == 0:
|
||||
sock.sendBuffer.add(data)
|
||||
sock.deleg.mode = fmReadWrite
|
||||
elif bytesSent != data.len:
|
||||
sock.sendBuffer.add(data[bytesSent .. ^1])
|
||||
sock.deleg.mode = fmReadWrite
|
||||
|
||||
proc timeValFromMilliseconds(timeout = 500): Timeval =
|
||||
if timeout != -1:
|
||||
var seconds = timeout div 1000
|
||||
when defined(posix):
|
||||
result.tv_sec = seconds.Time
|
||||
result.tv_usec = ((timeout - seconds * 1000) * 1000).Suseconds
|
||||
else:
|
||||
result.tv_sec = seconds.int32
|
||||
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
|
||||
|
||||
proc createFdSet(fd: var TFdSet, s: seq[Delegate], m: var int) =
|
||||
FD_ZERO(fd)
|
||||
for i in items(s):
|
||||
m = max(m, int(i.fd))
|
||||
FD_SET(i.fd, fd)
|
||||
|
||||
proc pruneSocketSet(s: var seq[Delegate], fd: var TFdSet) =
|
||||
var i = 0
|
||||
var L = s.len
|
||||
while i < L:
|
||||
if FD_ISSET(s[i].fd, fd) != 0'i32:
|
||||
s[i] = s[L-1]
|
||||
dec(L)
|
||||
else:
|
||||
inc(i)
|
||||
setLen(s, L)
|
||||
|
||||
proc select(readfds, writefds, exceptfds: var seq[Delegate],
|
||||
timeout = 500): int =
|
||||
var tv {.noInit.}: Timeval = timeValFromMilliseconds(timeout)
|
||||
|
||||
var rd, wr, ex: TFdSet
|
||||
var m = 0
|
||||
createFdSet(rd, readfds, m)
|
||||
createFdSet(wr, writefds, m)
|
||||
createFdSet(ex, exceptfds, m)
|
||||
|
||||
if timeout != -1:
|
||||
result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), addr(tv)))
|
||||
else:
|
||||
result = int(select(cint(m+1), addr(rd), addr(wr), addr(ex), nil))
|
||||
|
||||
pruneSocketSet(readfds, (rd))
|
||||
pruneSocketSet(writefds, (wr))
|
||||
pruneSocketSet(exceptfds, (ex))
|
||||
|
||||
proc poll*(d: Dispatcher, timeout: int = 500): bool =
|
||||
## This function checks for events on all the delegates in the `PDispatcher`.
|
||||
## It then proceeds to call the correct event handler.
|
||||
##
|
||||
## This function returns ``True`` if there are file descriptors that are still
|
||||
## open, otherwise ``False``. File descriptors that have been
|
||||
## closed are immediately removed from the dispatcher automatically.
|
||||
##
|
||||
## **Note:** Each delegate has a task associated with it. This gets called
|
||||
## after each select() call, if you set timeout to ``-1`` the tasks will
|
||||
## only be executed after one or more file descriptors becomes readable or
|
||||
## writeable.
|
||||
result = true
|
||||
var readDg, writeDg, errorDg: seq[Delegate] = @[]
|
||||
var len = d.delegates.len
|
||||
var dc = 0
|
||||
|
||||
while dc < len:
|
||||
let deleg = d.delegates[dc]
|
||||
if (deleg.mode != fmWrite or deleg.mode != fmAppend) and deleg.open:
|
||||
readDg.add(deleg)
|
||||
if (deleg.mode != fmRead) and deleg.open:
|
||||
writeDg.add(deleg)
|
||||
if deleg.open:
|
||||
errorDg.add(deleg)
|
||||
inc dc
|
||||
else:
|
||||
# File/socket has been closed. Remove it from dispatcher.
|
||||
d.delegates[dc] = d.delegates[len-1]
|
||||
dec len
|
||||
|
||||
d.delegates.setLen(len)
|
||||
|
||||
var hasDataBufferedCount = 0
|
||||
for d in d.delegates:
|
||||
if d.hasDataBuffered(d.deleVal):
|
||||
hasDataBufferedCount.inc()
|
||||
d.handleRead(d.deleVal)
|
||||
if hasDataBufferedCount > 0: return true
|
||||
|
||||
if readDg.len() == 0 and writeDg.len() == 0:
|
||||
## TODO: Perhaps this shouldn't return if errorDg has something?
|
||||
return false
|
||||
|
||||
if select(readDg, writeDg, errorDg, timeout) != 0:
|
||||
for i in 0..len(d.delegates)-1:
|
||||
if i > len(d.delegates)-1: break # One delegate might've been removed.
|
||||
let deleg = d.delegates[i]
|
||||
if not deleg.open: continue # This delegate might've been closed.
|
||||
if (deleg.mode != fmWrite or deleg.mode != fmAppend) and
|
||||
deleg notin readDg:
|
||||
deleg.handleRead(deleg.deleVal)
|
||||
if (deleg.mode != fmRead) and deleg notin writeDg:
|
||||
deleg.handleWrite(deleg.deleVal)
|
||||
if deleg notin errorDg:
|
||||
deleg.handleError(deleg.deleVal)
|
||||
|
||||
# Execute tasks
|
||||
for i in items(d.delegates):
|
||||
i.task(i.deleVal)
|
||||
|
||||
proc len*(disp: Dispatcher): int =
|
||||
## Retrieves the amount of delegates in ``disp``.
|
||||
return disp.delegates.len
|
||||
|
||||
when not defined(testing) and isMainModule:
|
||||
|
||||
proc testConnect(s: AsyncSocket, no: int) =
|
||||
echo("Connected! " & $no)
|
||||
|
||||
proc testRead(s: AsyncSocket, no: int) =
|
||||
echo("Reading! " & $no)
|
||||
var data = ""
|
||||
if not s.readLine(data): return
|
||||
if data == "":
|
||||
echo("Closing connection. " & $no)
|
||||
s.close()
|
||||
echo(data)
|
||||
echo("Finished reading! " & $no)
|
||||
|
||||
proc testAccept(s: AsyncSocket, disp: Dispatcher, no: int) =
|
||||
echo("Accepting client! " & $no)
|
||||
var client: AsyncSocket
|
||||
new(client)
|
||||
var address = ""
|
||||
s.acceptAddr(client, address)
|
||||
echo("Accepted ", address)
|
||||
client.handleRead =
|
||||
proc (s: AsyncSocket) =
|
||||
testRead(s, 2)
|
||||
disp.register(client)
|
||||
|
||||
proc main =
|
||||
var d = newDispatcher()
|
||||
|
||||
var s = asyncSocket()
|
||||
s.connect("amber.tenthbit.net", Port(6667))
|
||||
s.handleConnect =
|
||||
proc (s: AsyncSocket) =
|
||||
testConnect(s, 1)
|
||||
s.handleRead =
|
||||
proc (s: AsyncSocket) =
|
||||
testRead(s, 1)
|
||||
d.register(s)
|
||||
|
||||
var server = asyncSocket()
|
||||
server.handleAccept =
|
||||
proc (s: AsyncSocket) =
|
||||
testAccept(s, d, 78)
|
||||
server.bindAddr(Port(5555))
|
||||
server.listen()
|
||||
d.register(server)
|
||||
|
||||
while d.poll(-1): discard
|
||||
main()
|
||||
@@ -1,669 +0,0 @@
|
||||
#
|
||||
#
|
||||
# Nim's Runtime Library
|
||||
# (c) Copyright 2015 Dominik Picheta
|
||||
# See the file "copying.txt", included in this
|
||||
# distribution, for details about the copyright.
|
||||
#
|
||||
|
||||
include "system/inclrtl"
|
||||
|
||||
import sockets, strutils, parseutils, times, os, asyncio
|
||||
|
||||
from asyncnet import nil
|
||||
from nativesockets import nil
|
||||
from asyncdispatch import Future
|
||||
## **Note**: This module is deprecated since version 0.11.3.
|
||||
## You should use the async version of this module
|
||||
## `asyncftpclient <asyncftpclient.html>`_.
|
||||
##
|
||||
## ----
|
||||
##
|
||||
## This module **partially** implements an FTP client as specified
|
||||
## by `RFC 959 <http://tools.ietf.org/html/rfc959>`_.
|
||||
##
|
||||
## This module provides both a synchronous and asynchronous implementation.
|
||||
## The asynchronous implementation requires you to use the ``asyncFTPClient``
|
||||
## function. You are then required to register the ``AsyncFTPClient`` with a
|
||||
## asyncio dispatcher using the ``register`` function. Take a look at the
|
||||
## asyncio module documentation for more information.
|
||||
##
|
||||
## **Note**: The asynchronous implementation is only asynchronous for long
|
||||
## file transfers, calls to functions which use the command socket will block.
|
||||
##
|
||||
## Here is some example usage of this module:
|
||||
##
|
||||
## .. code-block:: Nim
|
||||
## var ftp = ftpClient("example.org", user = "user", pass = "pass")
|
||||
## ftp.connect()
|
||||
## ftp.retrFile("file.ext", "file.ext")
|
||||
##
|
||||
## **Warning:** The API of this module is unstable, and therefore is subject
|
||||
## to change.
|
||||
|
||||
{.deprecated.}
|
||||
|
||||
type
|
||||
FtpBase*[SockType] = ref FtpBaseObj[SockType]
|
||||
FtpBaseObj*[SockType] = object
|
||||
csock*: SockType
|
||||
dsock*: SockType
|
||||
when SockType is asyncio.AsyncSocket:
|
||||
handleEvent*: proc (ftp: AsyncFTPClient, ev: FTPEvent){.closure,gcsafe.}
|
||||
disp: Dispatcher
|
||||
asyncDSockID: Delegate
|
||||
user*, pass*: string
|
||||
address*: string
|
||||
when SockType is asyncnet.AsyncSocket:
|
||||
port*: nativesockets.Port
|
||||
else:
|
||||
port*: Port
|
||||
|
||||
jobInProgress*: bool
|
||||
job*: FTPJob[SockType]
|
||||
|
||||
dsockConnected*: bool
|
||||
|
||||
FTPJobType* = enum
|
||||
JRetrText, JRetr, JStore
|
||||
|
||||
FtpJob[T] = ref FtpJobObj[T]
|
||||
FTPJobObj[T] = object
|
||||
prc: proc (ftp: FTPBase[T], async: bool): bool {.nimcall, gcsafe.}
|
||||
case typ*: FTPJobType
|
||||
of JRetrText:
|
||||
lines: string
|
||||
of JRetr, JStore:
|
||||
file: File
|
||||
filename: string
|
||||
total: BiggestInt # In bytes.
|
||||
progress: BiggestInt # In bytes.
|
||||
oneSecond: BiggestInt # Bytes transferred in one second.
|
||||
lastProgressReport: float # Time
|
||||
toStore: string # Data left to upload (Only used with async)
|
||||
|
||||
FtpClientObj* = FtpBaseObj[Socket]
|
||||
FtpClient* = ref FtpClientObj
|
||||
|
||||
AsyncFtpClient* = ref AsyncFtpClientObj ## Async alternative to TFTPClient.
|
||||
AsyncFtpClientObj* = FtpBaseObj[asyncio.AsyncSocket]
|
||||
|
||||
FTPEventType* = enum
|
||||
EvTransferProgress, EvLines, EvRetr, EvStore
|
||||
|
||||
FTPEvent* = object ## Event
|
||||
filename*: string
|
||||
case typ*: FTPEventType
|
||||
of EvLines:
|
||||
lines*: string ## Lines that have been transferred.
|
||||
of EvRetr, EvStore: ## Retr/Store operation finished.
|
||||
nil
|
||||
of EvTransferProgress:
|
||||
bytesTotal*: BiggestInt ## Bytes total.
|
||||
bytesFinished*: BiggestInt ## Bytes transferred.
|
||||
speed*: BiggestInt ## Speed in bytes/s
|
||||
currentJob*: FTPJobType ## The current job being performed.
|
||||
|
||||
ReplyError* = object of IOError
|
||||
FTPError* = object of IOError
|
||||
|
||||
|
||||
const multiLineLimit = 10000
|
||||
|
||||
proc ftpClient*(address: string, port = Port(21),
|
||||
user, pass = ""): FtpClient =
|
||||
## Create a ``FtpClient`` object.
|
||||
new(result)
|
||||
result.user = user
|
||||
result.pass = pass
|
||||
result.address = address
|
||||
result.port = port
|
||||
|
||||
result.dsockConnected = false
|
||||
result.csock = socket()
|
||||
if result.csock == invalidSocket: raiseOSError(osLastError())
|
||||
|
||||
template blockingOperation(sock: Socket, body: untyped) =
|
||||
body
|
||||
|
||||
template blockingOperation(sock: asyncio.AsyncSocket, body: untyped) =
|
||||
sock.setBlocking(true)
|
||||
body
|
||||
sock.setBlocking(false)
|
||||
|
||||
proc expectReply[T](ftp: FtpBase[T]): TaintedString =
|
||||
result = TaintedString""
|
||||
blockingOperation(ftp.csock):
|
||||
when T is Socket:
|
||||
ftp.csock.readLine(result)
|
||||
else:
|
||||
discard ftp.csock.readLine(result)
|
||||
var count = 0
|
||||
while result[3] == '-':
|
||||
## Multi-line reply.
|
||||
var line = TaintedString""
|
||||
when T is Socket:
|
||||
ftp.csock.readLine(line)
|
||||
else:
|
||||
discard ftp.csock.readLine(line)
|
||||
result.add("\n" & line)
|
||||
count.inc()
|
||||
if count >= multiLineLimit:
|
||||
raise newException(ReplyError, "Reached maximum multi-line reply count.")
|
||||
|
||||
proc send*[T](ftp: FtpBase[T], m: string): TaintedString =
|
||||
## Send a message to the server, and wait for a primary reply.
|
||||
## ``\c\L`` is added for you.
|
||||
##
|
||||
## **Note:** The server may return multiple lines of coded replies.
|
||||
blockingOperation(ftp.csock):
|
||||
ftp.csock.send(m & "\c\L")
|
||||
return ftp.expectReply()
|
||||
|
||||
proc assertReply(received: TaintedString, expected: string) =
|
||||
if not received.string.startsWith(expected):
|
||||
raise newException(ReplyError,
|
||||
"Expected reply '$1' got: $2" % [
|
||||
expected, received.string])
|
||||
|
||||
proc assertReply(received: TaintedString, expected: varargs[string]) =
|
||||
for i in items(expected):
|
||||
if received.string.startsWith(i): return
|
||||
raise newException(ReplyError,
|
||||
"Expected reply '$1' got: $2" %
|
||||
[expected.join("' or '"), received.string])
|
||||
|
||||
proc createJob[T](ftp: FtpBase[T],
|
||||
prc: proc (ftp: FtpBase[T], async: bool): bool {.
|
||||
nimcall,gcsafe.},
|
||||
cmd: FTPJobType) =
|
||||
if ftp.jobInProgress:
|
||||
raise newException(FTPError, "Unable to do two jobs at once.")
|
||||
ftp.jobInProgress = true
|
||||
new(ftp.job)
|
||||
ftp.job.prc = prc
|
||||
ftp.job.typ = cmd
|
||||
case cmd
|
||||
of JRetrText:
|
||||
ftp.job.lines = ""
|
||||
of JRetr, JStore:
|
||||
ftp.job.toStore = ""
|
||||
|
||||
proc deleteJob[T](ftp: FtpBase[T]) =
|
||||
assert ftp.jobInProgress
|
||||
ftp.jobInProgress = false
|
||||
case ftp.job.typ
|
||||
of JRetrText:
|
||||
ftp.job.lines = ""
|
||||
of JRetr, JStore:
|
||||
ftp.job.file.close()
|
||||
ftp.dsock.close()
|
||||
|
||||
proc handleTask(s: AsyncSocket, ftp: AsyncFTPClient) =
|
||||
if ftp.jobInProgress:
|
||||
if ftp.job.typ in {JRetr, JStore}:
|
||||
if epochTime() - ftp.job.lastProgressReport >= 1.0:
|
||||
var r: FTPEvent
|
||||
ftp.job.lastProgressReport = epochTime()
|
||||
r.typ = EvTransferProgress
|
||||
r.bytesTotal = ftp.job.total
|
||||
r.bytesFinished = ftp.job.progress
|
||||
r.speed = ftp.job.oneSecond
|
||||
r.filename = ftp.job.filename
|
||||
r.currentJob = ftp.job.typ
|
||||
ftp.job.oneSecond = 0
|
||||
ftp.handleEvent(ftp, r)
|
||||
|
||||
proc handleWrite(s: AsyncSocket, ftp: AsyncFTPClient) =
|
||||
if ftp.jobInProgress:
|
||||
if ftp.job.typ == JStore:
|
||||
assert (not ftp.job.prc(ftp, true))
|
||||
|
||||
proc handleConnect(s: AsyncSocket, ftp: AsyncFTPClient) =
|
||||
ftp.dsockConnected = true
|
||||
assert(ftp.jobInProgress)
|
||||
if ftp.job.typ == JStore:
|
||||
s.setHandleWrite(proc (s: AsyncSocket) = handleWrite(s, ftp))
|
||||
else:
|
||||
s.delHandleWrite()
|
||||
|
||||
proc handleRead(s: AsyncSocket, ftp: AsyncFTPClient) =
|
||||
assert ftp.jobInProgress
|
||||
assert ftp.job.typ != JStore
|
||||
# This can never return true, because it shouldn't check for code
|
||||
# 226 from csock.
|
||||
assert(not ftp.job.prc(ftp, true))
|
||||
|
||||
proc pasv[T](ftp: FtpBase[T]) =
|
||||
## Negotiate a data connection.
|
||||
when T is Socket:
|
||||
ftp.dsock = socket()
|
||||
if ftp.dsock == invalidSocket: raiseOSError(osLastError())
|
||||
elif T is AsyncSocket:
|
||||
ftp.dsock = asyncSocket()
|
||||
ftp.dsock.handleRead =
|
||||
proc (s: AsyncSocket) =
|
||||
handleRead(s, ftp)
|
||||
ftp.dsock.handleConnect =
|
||||
proc (s: AsyncSocket) =
|
||||
handleConnect(s, ftp)
|
||||
ftp.dsock.handleTask =
|
||||
proc (s: AsyncSocket) =
|
||||
handleTask(s, ftp)
|
||||
ftp.disp.register(ftp.dsock)
|
||||
else:
|
||||
{.fatal: "Incorrect socket instantiation".}
|
||||
|
||||
var pasvMsg = ftp.send("PASV").string.strip.TaintedString
|
||||
assertReply(pasvMsg, "227")
|
||||
var betweenParens = captureBetween(pasvMsg.string, '(', ')')
|
||||
var nums = betweenParens.split(',')
|
||||
var ip = nums[0.. ^3]
|
||||
var port = nums[^2.. ^1]
|
||||
var properPort = port[0].parseInt()*256+port[1].parseInt()
|
||||
ftp.dsock.connect(ip.join("."), Port(properPort.toU16))
|
||||
when T is AsyncSocket:
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
ftp.dsockConnected = true
|
||||
|
||||
proc normalizePathSep(path: string): string =
|
||||
return replace(path, '\\', '/')
|
||||
|
||||
proc connect*[T](ftp: FtpBase[T]) =
|
||||
## Connect to the FTP server specified by ``ftp``.
|
||||
when T is AsyncSocket:
|
||||
blockingOperation(ftp.csock):
|
||||
ftp.csock.connect(ftp.address, ftp.port)
|
||||
elif T is Socket:
|
||||
ftp.csock.connect(ftp.address, ftp.port)
|
||||
else:
|
||||
{.fatal: "Incorrect socket instantiation".}
|
||||
|
||||
var reply = ftp.expectReply()
|
||||
if reply.startsWith("120"):
|
||||
# 120 Service ready in nnn minutes.
|
||||
# We wait until we receive 220.
|
||||
reply = ftp.expectReply()
|
||||
|
||||
# Handle 220 messages from the server
|
||||
assertReply ftp.expectReply(), "220"
|
||||
|
||||
if ftp.user != "":
|
||||
assertReply(ftp.send("USER " & ftp.user), "230", "331")
|
||||
|
||||
if ftp.pass != "":
|
||||
assertReply ftp.send("PASS " & ftp.pass), "230"
|
||||
|
||||
proc pwd*[T](ftp: FtpBase[T]): string =
|
||||
## Returns the current working directory.
|
||||
var wd = ftp.send("PWD")
|
||||
assertReply wd, "257"
|
||||
return wd.string.captureBetween('"') # "
|
||||
|
||||
proc cd*[T](ftp: FtpBase[T], dir: string) =
|
||||
## Changes the current directory on the remote FTP server to ``dir``.
|
||||
assertReply ftp.send("CWD " & dir.normalizePathSep), "250"
|
||||
|
||||
proc cdup*[T](ftp: FtpBase[T]) =
|
||||
## Changes the current directory to the parent of the current directory.
|
||||
assertReply ftp.send("CDUP"), "200"
|
||||
|
||||
proc getLines[T](ftp: FtpBase[T], async: bool = false): bool =
|
||||
## Downloads text data in ASCII mode
|
||||
## Returns true if the download is complete.
|
||||
## It doesn't if `async` is true, because it doesn't check for 226 then.
|
||||
if ftp.dsockConnected:
|
||||
var r = TaintedString""
|
||||
when T is AsyncSocket:
|
||||
if ftp.asyncDSock.readLine(r):
|
||||
if r.string == "":
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
ftp.job.lines.add(r.string & "\n")
|
||||
elif T is Socket:
|
||||
assert(not async)
|
||||
ftp.dsock.readLine(r)
|
||||
if r.string == "":
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
ftp.job.lines.add(r.string & "\n")
|
||||
else:
|
||||
{.fatal: "Incorrect socket instantiation".}
|
||||
|
||||
if not async:
|
||||
var readSocks: seq[Socket] = @[ftp.csock]
|
||||
# This is only needed here. Asyncio gets this socket...
|
||||
blockingOperation(ftp.csock):
|
||||
if readSocks.select(1) != 0 and ftp.csock in readSocks:
|
||||
assertReply ftp.expectReply(), "226"
|
||||
return true
|
||||
|
||||
proc listDirs*[T](ftp: FtpBase[T], dir: string = "",
|
||||
async = false): seq[string] =
|
||||
## Returns a list of filenames in the given directory. If ``dir`` is "",
|
||||
## the current directory is used. If ``async`` is true, this
|
||||
## function will return immediately and it will be your job to
|
||||
## use asyncio's ``poll`` to progress this operation.
|
||||
|
||||
ftp.createJob(getLines[T], JRetrText)
|
||||
ftp.pasv()
|
||||
|
||||
assertReply ftp.send("NLST " & dir.normalizePathSep), ["125", "150"]
|
||||
|
||||
if not async:
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
result = splitLines(ftp.job.lines)
|
||||
ftp.deleteJob()
|
||||
else: return @[]
|
||||
|
||||
proc fileExists*(ftp: FtpClient, file: string): bool {.deprecated.} =
|
||||
## **Deprecated since version 0.9.0:** Please use ``existsFile``.
|
||||
##
|
||||
## Determines whether ``file`` exists.
|
||||
##
|
||||
## Warning: This function may block. Especially on directories with many
|
||||
## files, because a full list of file names must be retrieved.
|
||||
var files = ftp.listDirs()
|
||||
for f in items(files):
|
||||
if f.normalizePathSep == file.normalizePathSep: return true
|
||||
|
||||
proc existsFile*(ftp: FtpClient, file: string): bool =
|
||||
## Determines whether ``file`` exists.
|
||||
##
|
||||
## Warning: This function may block. Especially on directories with many
|
||||
## files, because a full list of file names must be retrieved.
|
||||
var files = ftp.listDirs()
|
||||
for f in items(files):
|
||||
if f.normalizePathSep == file.normalizePathSep: return true
|
||||
|
||||
proc createDir*[T](ftp: FtpBase[T], dir: string, recursive: bool = false) =
|
||||
## Creates a directory ``dir``. If ``recursive`` is true, the topmost
|
||||
## subdirectory of ``dir`` will be created first, following the secondmost...
|
||||
## etc. this allows you to give a full path as the ``dir`` without worrying
|
||||
## about subdirectories not existing.
|
||||
if not recursive:
|
||||
assertReply ftp.send("MKD " & dir.normalizePathSep), "257"
|
||||
else:
|
||||
var reply = TaintedString""
|
||||
var previousDirs = ""
|
||||
for p in split(dir, {os.DirSep, os.AltSep}):
|
||||
if p != "":
|
||||
previousDirs.add(p)
|
||||
reply = ftp.send("MKD " & previousDirs)
|
||||
previousDirs.add('/')
|
||||
assertReply reply, "257"
|
||||
|
||||
proc chmod*[T](ftp: FtpBase[T], path: string,
|
||||
permissions: set[FilePermission]) =
|
||||
## Changes permission of ``path`` to ``permissions``.
|
||||
var userOctal = 0
|
||||
var groupOctal = 0
|
||||
var otherOctal = 0
|
||||
for i in items(permissions):
|
||||
case i
|
||||
of fpUserExec: userOctal.inc(1)
|
||||
of fpUserWrite: userOctal.inc(2)
|
||||
of fpUserRead: userOctal.inc(4)
|
||||
of fpGroupExec: groupOctal.inc(1)
|
||||
of fpGroupWrite: groupOctal.inc(2)
|
||||
of fpGroupRead: groupOctal.inc(4)
|
||||
of fpOthersExec: otherOctal.inc(1)
|
||||
of fpOthersWrite: otherOctal.inc(2)
|
||||
of fpOthersRead: otherOctal.inc(4)
|
||||
|
||||
var perm = $userOctal & $groupOctal & $otherOctal
|
||||
assertReply ftp.send("SITE CHMOD " & perm &
|
||||
" " & path.normalizePathSep), "200"
|
||||
|
||||
proc list*[T](ftp: FtpBase[T], dir: string = "", async = false): string =
|
||||
## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current
|
||||
## working directory. If ``async`` is true, this function will return
|
||||
## immediately and it will be your job to call asyncio's
|
||||
## ``poll`` to progress this operation.
|
||||
ftp.createJob(getLines[T], JRetrText)
|
||||
ftp.pasv()
|
||||
|
||||
assertReply(ftp.send("LIST" & " " & dir.normalizePathSep), ["125", "150"])
|
||||
|
||||
if not async:
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
result = ftp.job.lines
|
||||
ftp.deleteJob()
|
||||
else:
|
||||
return ""
|
||||
|
||||
proc retrText*[T](ftp: FtpBase[T], file: string, async = false): string =
|
||||
## Retrieves ``file``. File must be ASCII text.
|
||||
## If ``async`` is true, this function will return immediately and
|
||||
## it will be your job to call asyncio's ``poll`` to progress this operation.
|
||||
ftp.createJob(getLines[T], JRetrText)
|
||||
ftp.pasv()
|
||||
assertReply ftp.send("RETR " & file.normalizePathSep), ["125", "150"]
|
||||
|
||||
if not async:
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
result = ftp.job.lines
|
||||
ftp.deleteJob()
|
||||
else:
|
||||
return ""
|
||||
|
||||
proc getFile[T](ftp: FtpBase[T], async = false): bool =
|
||||
if ftp.dsockConnected:
|
||||
var r = "".TaintedString
|
||||
var bytesRead = 0
|
||||
var returned = false
|
||||
if async:
|
||||
when T is Socket:
|
||||
raise newException(FTPError, "FTPClient must be async.")
|
||||
else:
|
||||
bytesRead = ftp.dsock.recvAsync(r, BufferSize)
|
||||
returned = bytesRead != -1
|
||||
else:
|
||||
bytesRead = ftp.dsock.recv(r, BufferSize)
|
||||
returned = true
|
||||
let r2 = r.string
|
||||
if r2 != "":
|
||||
ftp.job.progress.inc(r2.len)
|
||||
ftp.job.oneSecond.inc(r2.len)
|
||||
ftp.job.file.write(r2)
|
||||
elif returned and r2 == "":
|
||||
ftp.dsockConnected = false
|
||||
|
||||
when T is Socket:
|
||||
if not async:
|
||||
var readSocks: seq[Socket] = @[ftp.csock]
|
||||
blockingOperation(ftp.csock):
|
||||
if readSocks.select(1) != 0 and ftp.csock in readSocks:
|
||||
assertReply ftp.expectReply(), "226"
|
||||
return true
|
||||
|
||||
proc retrFile*[T](ftp: FtpBase[T], file, dest: string, async = false) =
|
||||
## Downloads ``file`` and saves it to ``dest``. Usage of this function
|
||||
## asynchronously is recommended to view the progress of the download.
|
||||
## The ``EvRetr`` event is passed to the specified ``handleEvent`` function
|
||||
## when the download is finished, and the ``filename`` field will be equal
|
||||
## to ``file``.
|
||||
ftp.createJob(getFile[T], JRetr)
|
||||
ftp.job.file = open(dest, mode = fmWrite)
|
||||
ftp.pasv()
|
||||
var reply = ftp.send("RETR " & file.normalizePathSep)
|
||||
assertReply reply, ["125", "150"]
|
||||
if {'(', ')'} notin reply.string:
|
||||
raise newException(ReplyError, "Reply has no file size.")
|
||||
var fileSize: BiggestInt
|
||||
if reply.string.captureBetween('(', ')').parseBiggestInt(fileSize) == 0:
|
||||
raise newException(ReplyError, "Reply has no file size.")
|
||||
|
||||
ftp.job.total = fileSize
|
||||
ftp.job.lastProgressReport = epochTime()
|
||||
ftp.job.filename = file.normalizePathSep
|
||||
|
||||
if not async:
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
ftp.deleteJob()
|
||||
|
||||
proc doUpload[T](ftp: FtpBase[T], async = false): bool =
|
||||
if ftp.dsockConnected:
|
||||
if ftp.job.toStore.len() > 0:
|
||||
assert(async)
|
||||
let bytesSent = ftp.dsock.sendAsync(ftp.job.toStore)
|
||||
if bytesSent == ftp.job.toStore.len:
|
||||
ftp.job.toStore = ""
|
||||
elif bytesSent != ftp.job.toStore.len and bytesSent != 0:
|
||||
ftp.job.toStore = ftp.job.toStore[bytesSent .. ^1]
|
||||
ftp.job.progress.inc(bytesSent)
|
||||
ftp.job.oneSecond.inc(bytesSent)
|
||||
else:
|
||||
var s = newStringOfCap(4000)
|
||||
var len = ftp.job.file.readBuffer(addr(s[0]), 4000)
|
||||
setLen(s, len)
|
||||
if len == 0:
|
||||
# File finished uploading.
|
||||
ftp.dsock.close()
|
||||
ftp.dsockConnected = false
|
||||
|
||||
if not async:
|
||||
assertReply ftp.expectReply(), "226"
|
||||
return true
|
||||
return false
|
||||
|
||||
if not async:
|
||||
ftp.dsock.send(s)
|
||||
else:
|
||||
let bytesSent = ftp.dsock.sendAsync(s)
|
||||
if bytesSent == 0:
|
||||
ftp.job.toStore.add(s)
|
||||
elif bytesSent != s.len:
|
||||
ftp.job.toStore.add(s[bytesSent .. ^1])
|
||||
len = bytesSent
|
||||
|
||||
ftp.job.progress.inc(len)
|
||||
ftp.job.oneSecond.inc(len)
|
||||
|
||||
proc store*[T](ftp: FtpBase[T], file, dest: string, async = false) =
|
||||
## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this
|
||||
## function asynchronously is recommended to view the progress of
|
||||
## the download.
|
||||
## The ``EvStore`` event is passed to the specified ``handleEvent`` function
|
||||
## when the upload is finished, and the ``filename`` field will be
|
||||
## equal to ``file``.
|
||||
ftp.createJob(doUpload[T], JStore)
|
||||
ftp.job.file = open(file)
|
||||
ftp.job.total = ftp.job.file.getFileSize()
|
||||
ftp.job.lastProgressReport = epochTime()
|
||||
ftp.job.filename = file
|
||||
ftp.pasv()
|
||||
|
||||
assertReply ftp.send("STOR " & dest.normalizePathSep), ["125", "150"]
|
||||
|
||||
if not async:
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
ftp.deleteJob()
|
||||
|
||||
proc close*[T](ftp: FtpBase[T]) =
|
||||
## Terminates the connection to the server.
|
||||
assertReply ftp.send("QUIT"), "221"
|
||||
if ftp.jobInProgress: ftp.deleteJob()
|
||||
ftp.csock.close()
|
||||
ftp.dsock.close()
|
||||
|
||||
proc csockHandleRead(s: AsyncSocket, ftp: AsyncFTPClient) =
|
||||
if ftp.jobInProgress:
|
||||
assertReply ftp.expectReply(), "226" # Make sure the transfer completed.
|
||||
var r: FTPEvent
|
||||
case ftp.job.typ
|
||||
of JRetrText:
|
||||
r.typ = EvLines
|
||||
r.lines = ftp.job.lines
|
||||
of JRetr:
|
||||
r.typ = EvRetr
|
||||
r.filename = ftp.job.filename
|
||||
if ftp.job.progress != ftp.job.total:
|
||||
raise newException(FTPError, "Didn't download full file.")
|
||||
of JStore:
|
||||
r.typ = EvStore
|
||||
r.filename = ftp.job.filename
|
||||
if ftp.job.progress != ftp.job.total:
|
||||
raise newException(FTPError, "Didn't upload full file.")
|
||||
ftp.deleteJob()
|
||||
|
||||
ftp.handleEvent(ftp, r)
|
||||
|
||||
proc asyncFTPClient*(address: string, port = Port(21),
|
||||
user, pass = "",
|
||||
handleEvent: proc (ftp: AsyncFTPClient, ev: FTPEvent) {.closure,gcsafe.} =
|
||||
(proc (ftp: AsyncFTPClient, ev: FTPEvent) = discard)): AsyncFTPClient =
|
||||
## Create a ``AsyncFTPClient`` object.
|
||||
##
|
||||
## Use this if you want to use asyncio's dispatcher.
|
||||
var dres: AsyncFtpClient
|
||||
new(dres)
|
||||
dres.user = user
|
||||
dres.pass = pass
|
||||
dres.address = address
|
||||
dres.port = port
|
||||
dres.dsockConnected = false
|
||||
dres.handleEvent = handleEvent
|
||||
dres.csock = asyncSocket()
|
||||
dres.csock.handleRead =
|
||||
proc (s: AsyncSocket) =
|
||||
csockHandleRead(s, dres)
|
||||
result = dres
|
||||
|
||||
proc register*(d: Dispatcher, ftp: AsyncFTPClient): Delegate {.discardable.} =
|
||||
## Registers ``ftp`` with dispatcher ``d``.
|
||||
ftp.disp = d
|
||||
return ftp.disp.register(ftp.csock)
|
||||
|
||||
when not defined(testing) and isMainModule:
|
||||
proc main =
|
||||
var d = newDispatcher()
|
||||
let hev =
|
||||
proc (ftp: AsyncFTPClient, event: FTPEvent) =
|
||||
case event.typ
|
||||
of EvStore:
|
||||
echo("Upload finished!")
|
||||
ftp.retrFile("payload.jpg", "payload2.jpg", async = true)
|
||||
of EvTransferProgress:
|
||||
var time: int64 = -1
|
||||
if event.speed != 0:
|
||||
time = (event.bytesTotal - event.bytesFinished) div event.speed
|
||||
echo(event.currentJob)
|
||||
echo(event.speed div 1000, " kb/s. - ",
|
||||
event.bytesFinished, "/", event.bytesTotal,
|
||||
" - ", time, " seconds")
|
||||
echo(d.len)
|
||||
of EvRetr:
|
||||
echo("Download finished!")
|
||||
ftp.close()
|
||||
echo d.len
|
||||
else: assert(false)
|
||||
var ftp = asyncFTPClient("example.com", user = "foo", pass = "bar", handleEvent = hev)
|
||||
|
||||
d.register(ftp)
|
||||
d.len.echo()
|
||||
ftp.connect()
|
||||
echo "connected"
|
||||
ftp.store("payload.jpg", "payload.jpg", async = true)
|
||||
d.len.echo()
|
||||
echo "uploading..."
|
||||
while true:
|
||||
if not d.poll(): break
|
||||
main()
|
||||
|
||||
when not defined(testing) and isMainModule:
|
||||
var ftp = ftpClient("example.com", user = "foo", pass = "bar")
|
||||
ftp.connect()
|
||||
echo ftp.pwd()
|
||||
echo ftp.list()
|
||||
echo("uploading")
|
||||
ftp.store("payload.jpg", "payload.jpg", async = false)
|
||||
|
||||
echo("Upload complete")
|
||||
ftp.retrFile("payload.jpg", "payload2.jpg", async = false)
|
||||
|
||||
echo("Download complete")
|
||||
sleep(5000)
|
||||
ftp.close()
|
||||
sleep(200)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1674,7 +1674,7 @@ template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
|
||||
curFd = fdPerDomain[ord(domain)]
|
||||
if curFd == osInvalidSocket.AsyncFD:
|
||||
try:
|
||||
curFd = newAsyncNativeSocket(domain, sockType, protocol)
|
||||
curFd = createAsyncNativeSocket(domain, sockType, protocol)
|
||||
except:
|
||||
freeAddrInfo(addrInfo)
|
||||
closeUnusedFds()
|
||||
@@ -1806,47 +1806,6 @@ proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
|
||||
else:
|
||||
break
|
||||
|
||||
proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
|
||||
## Reads a line of data from ``socket``. Returned future will complete once
|
||||
## a full line is read or an error occurs.
|
||||
##
|
||||
## If a full line is read ``\r\L`` is not
|
||||
## added to ``line``, however if solely ``\r\L`` is read then ``line``
|
||||
## will be set to it.
|
||||
##
|
||||
## If the socket is disconnected, ``line`` will be set to ``""``.
|
||||
##
|
||||
## If the socket is disconnected in the middle of a line (before ``\r\L``
|
||||
## is read) then line will be set to ``""``.
|
||||
## The partial line **will be lost**.
|
||||
##
|
||||
## **Warning**: This assumes that lines are delimited by ``\r\L``.
|
||||
##
|
||||
## **Note**: This procedure is mostly used for testing. You likely want to
|
||||
## use ``asyncnet.recvLine`` instead.
|
||||
##
|
||||
## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead.
|
||||
|
||||
template addNLIfEmpty(): typed =
|
||||
if result.len == 0:
|
||||
result.add("\c\L")
|
||||
|
||||
result = ""
|
||||
var c = ""
|
||||
while true:
|
||||
c = await recv(socket, 1)
|
||||
if c.len == 0:
|
||||
return ""
|
||||
if c == "\r":
|
||||
c = await recv(socket, 1)
|
||||
assert c == "\l"
|
||||
addNLIfEmpty()
|
||||
return
|
||||
elif c == "\L":
|
||||
addNLIfEmpty()
|
||||
return
|
||||
add(result, c)
|
||||
|
||||
proc callSoon*(cbproc: proc ()) =
|
||||
## Schedule `cbproc` to be called as soon as possible.
|
||||
## The callback is called when control returns to the event loop.
|
||||
|
||||
@@ -75,14 +75,54 @@
|
||||
## waitFor(main())
|
||||
|
||||
|
||||
import asyncdispatch, asyncnet, strutils, parseutils, os, times
|
||||
|
||||
from ftpclient import FtpBaseObj, ReplyError, FtpEvent
|
||||
import asyncdispatch, asyncnet, nativesockets, strutils, parseutils, os, times
|
||||
from net import BufferSize
|
||||
|
||||
type
|
||||
AsyncFtpClientObj* = FtpBaseObj[AsyncSocket]
|
||||
AsyncFtpClient* = ref AsyncFtpClientObj
|
||||
AsyncFtpClient* = ref object
|
||||
csock*: AsyncSocket
|
||||
dsock*: AsyncSocket
|
||||
user*, pass*: string
|
||||
address*: string
|
||||
port*: Port
|
||||
jobInProgress*: bool
|
||||
job*: FTPJob
|
||||
dsockConnected*: bool
|
||||
|
||||
FTPJobType* = enum
|
||||
JRetrText, JRetr, JStore
|
||||
|
||||
FtpJob = ref object
|
||||
prc: proc (ftp: AsyncFtpClient, async: bool): bool {.nimcall, gcsafe.}
|
||||
case typ*: FTPJobType
|
||||
of JRetrText:
|
||||
lines: string
|
||||
of JRetr, JStore:
|
||||
file: File
|
||||
filename: string
|
||||
total: BiggestInt # In bytes.
|
||||
progress: BiggestInt # In bytes.
|
||||
oneSecond: BiggestInt # Bytes transferred in one second.
|
||||
lastProgressReport: float # Time
|
||||
toStore: string # Data left to upload (Only used with async)
|
||||
|
||||
FTPEventType* = enum
|
||||
EvTransferProgress, EvLines, EvRetr, EvStore
|
||||
|
||||
FTPEvent* = object ## Event
|
||||
filename*: string
|
||||
case typ*: FTPEventType
|
||||
of EvLines:
|
||||
lines*: string ## Lines that have been transferred.
|
||||
of EvRetr, EvStore: ## Retr/Store operation finished.
|
||||
nil
|
||||
of EvTransferProgress:
|
||||
bytesTotal*: BiggestInt ## Bytes total.
|
||||
bytesFinished*: BiggestInt ## Bytes transferred.
|
||||
speed*: BiggestInt ## Speed in bytes/s
|
||||
currentJob*: FTPJobType ## The current job being performed.
|
||||
|
||||
ReplyError* = object of IOError
|
||||
|
||||
ProgressChangedProc* =
|
||||
proc (total, progress: BiggestInt, speed: float):
|
||||
@@ -183,7 +223,7 @@ proc listDirs*(ftp: AsyncFtpClient, dir = ""): Future[seq[string]] {.async.} =
|
||||
## Returns a list of filenames in the given directory. If ``dir`` is "",
|
||||
## the current directory is used. If ``async`` is true, this
|
||||
## function will return immediately and it will be your job to
|
||||
## use asyncio's ``poll`` to progress this operation.
|
||||
## use asyncdispatch's ``poll`` to progress this operation.
|
||||
await ftp.pasv()
|
||||
|
||||
assertReply(await(ftp.send("NLST " & dir.normalizePathSep)), ["125", "150"])
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
discard """
|
||||
output: "5000"
|
||||
"""
|
||||
import asyncdispatch, nativesockets, net, strutils, os
|
||||
import asyncdispatch, asyncnet, nativesockets, net, strutils, os
|
||||
|
||||
var msgCount = 0
|
||||
|
||||
@@ -12,20 +12,22 @@ const
|
||||
var clientCount = 0
|
||||
|
||||
proc sendMessages(client: AsyncFD) {.async.} =
|
||||
for i in 0 .. <messagesToSend:
|
||||
for i in 0 ..< messagesToSend:
|
||||
await send(client, "Message " & $i & "\c\L")
|
||||
|
||||
proc launchSwarm(port: Port) {.async.} =
|
||||
for i in 0 .. <swarmSize:
|
||||
var sock = newAsyncNativeSocket()
|
||||
for i in 0 ..< swarmSize:
|
||||
var sock = createAsyncNativeSocket()
|
||||
|
||||
await connect(sock, "localhost", port)
|
||||
await sendMessages(sock)
|
||||
closeSocket(sock)
|
||||
|
||||
proc readMessages(client: AsyncFD) {.async.} =
|
||||
# wrapping the AsyncFd into a AsyncSocket object
|
||||
var sockObj = newAsyncSocket(client)
|
||||
while true:
|
||||
var line = await recvLine(client)
|
||||
var line = await recvLine(sockObj)
|
||||
if line == "":
|
||||
closeSocket(client)
|
||||
clientCount.inc
|
||||
@@ -37,7 +39,7 @@ proc readMessages(client: AsyncFD) {.async.} =
|
||||
doAssert false
|
||||
|
||||
proc createServer(port: Port) {.async.} =
|
||||
var server = newAsyncNativeSocket()
|
||||
var server = createAsyncNativeSocket()
|
||||
block:
|
||||
var name: Sockaddr_in
|
||||
name.sin_family = toInt(AF_INET).uint16
|
||||
|
||||
@@ -9,7 +9,7 @@ type
|
||||
AsyncScgiState* = object of RootObj ## SCGI state object
|
||||
|
||||
#bug #442
|
||||
import sockets, asyncio, strtabs
|
||||
import asyncnet, strtabs
|
||||
proc handleSCGIRequest[TScgi: ScgiState | AsyncScgiState](s: TScgi) =
|
||||
discard
|
||||
proc handleSCGIRequest(client: AsyncSocket, headers: StringTableRef,
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
discard """
|
||||
disabled: true
|
||||
"""
|
||||
|
||||
# This is a regression of the new lambda lifting; detected by Aporia
|
||||
import asyncio, sockets
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import genpacket_enet, sockets, md5, enet
|
||||
import genpacket_enet, nativesockets, net, md5, enet
|
||||
defPacketImports()
|
||||
|
||||
type
|
||||
|
||||
@@ -5,7 +5,7 @@ discard """
|
||||
"""
|
||||
|
||||
import events
|
||||
import sockets
|
||||
import net
|
||||
import strutils
|
||||
import os
|
||||
|
||||
|
||||
@@ -178,8 +178,6 @@ lib/pure/json.nim
|
||||
lib/pure/base64.nim
|
||||
lib/impure/nre.nim
|
||||
lib/impure/nre/private/util.nim
|
||||
lib/deprecated/pure/sockets.nim
|
||||
lib/deprecated/pure/asyncio.nim
|
||||
lib/pure/collections/tables.nim
|
||||
lib/pure/collections/sets.nim
|
||||
lib/pure/collections/lists.nim
|
||||
|
||||
Reference in New Issue
Block a user