mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-30 09:54:49 +00:00
Merge branch 'bigbreak' of https://github.com/Araq/Nimrod into bigbreak
This commit is contained in:
@@ -13,7 +13,7 @@ import os, oids, tables, strutils, macros, times
|
||||
|
||||
import rawsockets, net
|
||||
|
||||
export Port, SocketFlags
|
||||
export Port, SocketFlag
|
||||
|
||||
#{.injectStmt: newGcInvariant().}
|
||||
|
||||
@@ -32,6 +32,7 @@ export Port, SocketFlags
|
||||
# TODO: The effect system (raises: []) has trouble with my try transformation.
|
||||
# TODO: Can't await in a 'except' body
|
||||
# TODO: getCurrentException(Msg) don't work
|
||||
# TODO: Check if yielded future is nil and throw a more meaningful exception
|
||||
|
||||
# -- Futures
|
||||
|
||||
@@ -187,7 +188,7 @@ proc asyncCheck*[T](future: Future[T]) =
|
||||
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
## Returns a future which will complete once both ``fut1`` and ``fut2``
|
||||
## complete.
|
||||
var retFuture = newFuture[void]()
|
||||
var retFuture = newFuture[void]("asyncdispatch.`and`")
|
||||
fut1.callback =
|
||||
proc () =
|
||||
if fut2.finished: retFuture.complete()
|
||||
@@ -199,11 +200,12 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
|
||||
## Returns a future which will complete once either ``fut1`` or ``fut2``
|
||||
## complete.
|
||||
var retFuture = newFuture[void]()
|
||||
var retFuture = newFuture[void]("asyncdispatch.`or`")
|
||||
proc cb() =
|
||||
if not retFuture.finished: retFuture.complete()
|
||||
fut1.callback = cb
|
||||
fut2.callback = cb
|
||||
return retFuture
|
||||
|
||||
type
|
||||
PDispatcherBase = ref object of RootRef
|
||||
@@ -221,11 +223,11 @@ proc processTimers(p: PDispatcherBase) =
|
||||
when defined(windows) or defined(nimdoc):
|
||||
import winlean, sets, hashes
|
||||
type
|
||||
TCompletionKey = dword
|
||||
TCompletionKey = Dword
|
||||
|
||||
TCompletionData* = object
|
||||
sock: TAsyncFD
|
||||
cb: proc (sock: TAsyncFD, bytesTransferred: DWORD,
|
||||
cb: proc (sock: TAsyncFD, bytesTransferred: Dword,
|
||||
errcode: OSErrorCode) {.closure,gcsafe.}
|
||||
|
||||
PDispatcher* = ref object of PDispatcherBase
|
||||
@@ -281,7 +283,7 @@ when defined(windows) or defined(nimdoc):
|
||||
let llTimeout =
|
||||
if timeout == -1: winlean.INFINITE
|
||||
else: timeout.int32
|
||||
var lpNumberOfBytesTransferred: DWORD
|
||||
var lpNumberOfBytesTransferred: Dword
|
||||
var lpCompletionKey: ULONG
|
||||
var customOverlapped: PCustomOverlapped
|
||||
let res = GetQueuedCompletionStatus(p.ioPort,
|
||||
@@ -319,10 +321,10 @@ when defined(windows) or defined(nimdoc):
|
||||
|
||||
proc initPointer(s: SocketHandle, func: var pointer, guid: var TGUID): bool =
|
||||
# Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
|
||||
var bytesRet: DWORD
|
||||
var bytesRet: Dword
|
||||
func = nil
|
||||
result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
|
||||
sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD,
|
||||
sizeof(TGUID).Dword, addr func, sizeof(pointer).Dword,
|
||||
addr bytesRet, nil, nil) == 0
|
||||
|
||||
proc initAll() =
|
||||
@@ -335,44 +337,44 @@ when defined(windows) or defined(nimdoc):
|
||||
raiseOSError(osLastError())
|
||||
|
||||
proc connectEx(s: SocketHandle, name: ptr TSockAddr, namelen: cint,
|
||||
lpSendBuffer: pointer, dwSendDataLength: dword,
|
||||
lpdwBytesSent: PDWORD, lpOverlapped: POVERLAPPED): bool =
|
||||
lpSendBuffer: pointer, dwSendDataLength: Dword,
|
||||
lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool =
|
||||
if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().")
|
||||
let func =
|
||||
cast[proc (s: SocketHandle, name: ptr TSockAddr, namelen: cint,
|
||||
lpSendBuffer: pointer, dwSendDataLength: dword,
|
||||
lpdwBytesSent: PDWORD, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr)
|
||||
lpSendBuffer: pointer, dwSendDataLength: Dword,
|
||||
lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr)
|
||||
|
||||
result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
|
||||
lpOverlapped)
|
||||
|
||||
proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
|
||||
dwReceiveDataLength, dwLocalAddressLength,
|
||||
dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD,
|
||||
dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
|
||||
lpOverlapped: POVERLAPPED): bool =
|
||||
if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().")
|
||||
let func =
|
||||
cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
|
||||
dwReceiveDataLength, dwLocalAddressLength,
|
||||
dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD,
|
||||
dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
|
||||
lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr)
|
||||
result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
|
||||
dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
|
||||
lpOverlapped)
|
||||
|
||||
proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
|
||||
dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD,
|
||||
LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint,
|
||||
RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) =
|
||||
dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword,
|
||||
LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: LPInt,
|
||||
RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: LPInt) =
|
||||
if getAcceptExSockAddrsPtr.isNil:
|
||||
raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().")
|
||||
|
||||
let func =
|
||||
cast[proc (lpOutputBuffer: pointer,
|
||||
dwReceiveDataLength, dwLocalAddressLength,
|
||||
dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr,
|
||||
LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr,
|
||||
RemoteSockaddrLength: lpint) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr)
|
||||
dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr TSockAddr,
|
||||
LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr TSockAddr,
|
||||
RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr)
|
||||
|
||||
func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
|
||||
dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
|
||||
@@ -405,7 +407,7 @@ when defined(windows) or defined(nimdoc):
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = TCompletionData(sock: socket, cb:
|
||||
proc (sock: TAsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
||||
proc (sock: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
retFuture.complete()
|
||||
@@ -460,12 +462,12 @@ when defined(windows) or defined(nimdoc):
|
||||
dataBuf.buf = cast[cstring](alloc0(size))
|
||||
dataBuf.len = size
|
||||
|
||||
var bytesReceived: DWORD
|
||||
var flagsio = flags.toOSFlags().DWORD
|
||||
var bytesReceived: Dword
|
||||
var flagsio = flags.toOSFlags().Dword
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = TCompletionData(sock: socket, cb:
|
||||
proc (sock: TAsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
||||
proc (sock: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
if bytesCount == 0 and dataBuf.buf[0] == '\0':
|
||||
@@ -539,11 +541,11 @@ when defined(windows) or defined(nimdoc):
|
||||
dataBuf.buf = data # since this is not used in a callback, this is fine
|
||||
dataBuf.len = data.len
|
||||
|
||||
var bytesReceived, lowFlags: DWORD
|
||||
var bytesReceived, lowFlags: Dword
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = TCompletionData(sock: socket, cb:
|
||||
proc (sock: TAsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
||||
proc (sock: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
retFuture.complete()
|
||||
@@ -592,10 +594,10 @@ when defined(windows) or defined(nimdoc):
|
||||
|
||||
const lpOutputLen = 1024
|
||||
var lpOutputBuf = newString(lpOutputLen)
|
||||
var dwBytesReceived: DWORD
|
||||
let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
|
||||
let dwLocalAddressLength = DWORD(sizeof (Tsockaddr_in) + 16)
|
||||
let dwRemoteAddressLength = DWORD(sizeof(Tsockaddr_in) + 16)
|
||||
var dwBytesReceived: Dword
|
||||
let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
|
||||
let dwLocalAddressLength = Dword(sizeof (Tsockaddr_in) + 16)
|
||||
let dwRemoteAddressLength = Dword(sizeof(Tsockaddr_in) + 16)
|
||||
|
||||
template completeAccept(): stmt {.immediate, dirty.} =
|
||||
var listenSock = socket
|
||||
@@ -604,12 +606,12 @@ when defined(windows) or defined(nimdoc):
|
||||
sizeof(listenSock).TSockLen)
|
||||
if setoptRet != 0: raiseOSError(osLastError())
|
||||
|
||||
var LocalSockaddr, RemoteSockaddr: ptr TSockAddr
|
||||
var localSockaddr, remoteSockaddr: ptr TSockAddr
|
||||
var localLen, remoteLen: int32
|
||||
getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
|
||||
dwLocalAddressLength, dwRemoteAddressLength,
|
||||
addr LocalSockaddr, addr localLen,
|
||||
addr RemoteSockaddr, addr remoteLen)
|
||||
addr localSockaddr, addr localLen,
|
||||
addr remoteSockaddr, addr remoteLen)
|
||||
register(clientSock.TAsyncFD)
|
||||
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
|
||||
retFuture.complete(
|
||||
@@ -632,7 +634,7 @@ when defined(windows) or defined(nimdoc):
|
||||
var ol = PCustomOverlapped()
|
||||
GC_ref(ol)
|
||||
ol.data = TCompletionData(sock: socket, cb:
|
||||
proc (sock: TAsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
|
||||
proc (sock: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
|
||||
if not retFuture.finished:
|
||||
if errcode == OSErrorCode(-1):
|
||||
completeAccept()
|
||||
@@ -1021,10 +1023,10 @@ proc processBody(node, retFutureSym: PNimrodNode,
|
||||
|
||||
result.add newNimNode(nnkReturnStmt, node).add(newNilLit())
|
||||
return # Don't process the children of this return stmt
|
||||
of nnkCommand:
|
||||
of nnkCommand, nnkCall:
|
||||
if node[0].kind == nnkIdent and node[0].ident == !"await":
|
||||
case node[1].kind
|
||||
of nnkIdent:
|
||||
of nnkIdent, nnkInfix:
|
||||
# await x
|
||||
result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x
|
||||
of nnkCall, nnkCommand:
|
||||
@@ -1034,8 +1036,8 @@ proc processBody(node, retFutureSym: PNimrodNode,
|
||||
futureValue, node)
|
||||
else:
|
||||
error("Invalid node kind in 'await', got: " & $node[1].kind)
|
||||
elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and
|
||||
node[1][0].ident == !"await":
|
||||
elif node.len > 1 and node[1].kind == nnkCommand and
|
||||
node[1][0].kind == nnkIdent and node[1][0].ident == !"await":
|
||||
# foo await x
|
||||
var newCommand = node
|
||||
result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1],
|
||||
@@ -1186,7 +1188,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
|
||||
result[6] = outerProcBody
|
||||
|
||||
#echo(treeRepr(result))
|
||||
#if prc[0].getName == "processClient":
|
||||
#if prc[0].getName == "getFile":
|
||||
# echo(toStrLit(result))
|
||||
|
||||
proc recvLine*(socket: TAsyncFD): Future[string] {.async.} =
|
||||
@@ -1228,3 +1230,11 @@ proc runForever*() =
|
||||
## Begins a never ending global dispatcher poll loop.
|
||||
while true:
|
||||
poll()
|
||||
|
||||
proc waitFor*[T](fut: PFuture[T]) =
|
||||
## **Blocks** the current thread until the specified future completes.
|
||||
while not fut.finished:
|
||||
poll()
|
||||
|
||||
if fut.failed:
|
||||
raise fut.error
|
||||
|
||||
295
lib/pure/asyncftpclient.nim
Normal file
295
lib/pure/asyncftpclient.nim
Normal file
@@ -0,0 +1,295 @@
|
||||
#
|
||||
#
|
||||
# Nimrod's Runtime Library
|
||||
# (c) Copyright 2014 Dominik Picheta
|
||||
# See the file "copying.txt", included in this
|
||||
# distribution, for details about the copyright.
|
||||
#
|
||||
|
||||
import asyncdispatch, asyncnet, strutils, parseutils, os, times
|
||||
|
||||
from ftpclient import FtpBaseObj, ReplyError, FtpEvent
|
||||
from net import BufferSize
|
||||
|
||||
type
|
||||
AsyncFtpClientObj* = FtpBaseObj[AsyncSocket]
|
||||
AsyncFtpClient* = ref AsyncFtpClientObj
|
||||
|
||||
ProgressChangedProc* =
|
||||
proc (total, progress: BiggestInt, speed: float):
|
||||
Future[void] {.closure, gcsafe.}
|
||||
|
||||
proc expectReply(ftp: AsyncFtpClient): Future[TaintedString] =
|
||||
result = ftp.csock.recvLine()
|
||||
|
||||
proc send*(ftp: AsyncFtpClient, m: string): Future[TaintedString] {.async.} =
|
||||
## Send a message to the server, and wait for a primary reply.
|
||||
## ``\c\L`` is added for you.
|
||||
await ftp.csock.send(m & "\c\L")
|
||||
return await ftp.expectReply()
|
||||
|
||||
proc assertReply(received: TaintedString, expected: varargs[string]) =
|
||||
for i in items(expected):
|
||||
if received.string.startsWith(i): return
|
||||
raise newException(ReplyError,
|
||||
"Expected reply '$1' got: $2" %
|
||||
[expected.join("' or '"), received.string])
|
||||
|
||||
proc pasv(ftp: AsyncFtpClient) {.async.} =
|
||||
## Negotiate a data connection.
|
||||
ftp.dsock = newAsyncSocket()
|
||||
|
||||
var pasvMsg = (await ftp.send("PASV")).string.strip.TaintedString
|
||||
assertReply(pasvMsg, "227")
|
||||
var betweenParens = captureBetween(pasvMsg.string, '(', ')')
|
||||
var nums = betweenParens.split(',')
|
||||
var ip = nums[0.. -3]
|
||||
var port = nums[-2.. -1]
|
||||
var properPort = port[0].parseInt()*256+port[1].parseInt()
|
||||
await ftp.dsock.connect(ip.join("."), Port(properPort.toU16))
|
||||
ftp.dsockConnected = true
|
||||
|
||||
proc normalizePathSep(path: string): string =
|
||||
return replace(path, '\\', '/')
|
||||
|
||||
proc connect*(ftp: AsyncFtpClient) {.async.} =
|
||||
## Connect to the FTP server specified by ``ftp``.
|
||||
await ftp.csock.connect(ftp.address, ftp.port)
|
||||
|
||||
var reply = await ftp.expectReply()
|
||||
if reply.startsWith("120"):
|
||||
# 120 Service ready in nnn minutes.
|
||||
# We wait until we receive 220.
|
||||
reply = await ftp.expectReply()
|
||||
assertReply(reply, "220")
|
||||
|
||||
if ftp.user != "":
|
||||
assertReply(await(ftp.send("USER " & ftp.user)), "230", "331")
|
||||
|
||||
if ftp.pass != "":
|
||||
assertReply(await(ftp.send("PASS " & ftp.pass)), "230")
|
||||
|
||||
proc pwd*(ftp: AsyncFtpClient): Future[TaintedString] {.async.} =
|
||||
## Returns the current working directory.
|
||||
let wd = await ftp.send("PWD")
|
||||
assertReply wd, "257"
|
||||
return wd.string.captureBetween('"').TaintedString # "
|
||||
|
||||
proc cd*(ftp: AsyncFtpClient, dir: string) {.async.} =
|
||||
## Changes the current directory on the remote FTP server to ``dir``.
|
||||
assertReply(await(ftp.send("CWD " & dir.normalizePathSep)), "250")
|
||||
|
||||
proc cdup*(ftp: AsyncFtpClient) {.async.} =
|
||||
## Changes the current directory to the parent of the current directory.
|
||||
assertReply(await(ftp.send("CDUP")), "200")
|
||||
|
||||
proc getLines(ftp: AsyncFtpClient): Future[string] {.async.} =
|
||||
## Downloads text data in ASCII mode
|
||||
result = ""
|
||||
assert ftp.dsockConnected
|
||||
while ftp.dsockConnected:
|
||||
let r = await ftp.dsock.recvLine()
|
||||
if r.string == "":
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
result.add(r.string & "\n")
|
||||
|
||||
assertReply(await(ftp.expectReply()), "226")
|
||||
|
||||
proc listDirs*(ftp: AsyncFtpClient, dir = ""): Future[seq[string]] {.async.} =
|
||||
## Returns a list of filenames in the given directory. If ``dir`` is "",
|
||||
## the current directory is used. If ``async`` is true, this
|
||||
## function will return immediately and it will be your job to
|
||||
## use asyncio's ``poll`` to progress this operation.
|
||||
await ftp.pasv()
|
||||
|
||||
assertReply(await(ftp.send("NLST " & dir.normalizePathSep)), ["125", "150"])
|
||||
|
||||
result = splitLines(await ftp.getLines())
|
||||
|
||||
proc existsFile*(ftp: AsyncFtpClient, file: string): Future[bool] {.async.} =
|
||||
## Determines whether ``file`` exists.
|
||||
var files = await ftp.listDirs()
|
||||
for f in items(files):
|
||||
if f.normalizePathSep == file.normalizePathSep: return true
|
||||
|
||||
proc createDir*(ftp: AsyncFtpClient, dir: string, recursive = false){.async.} =
|
||||
## Creates a directory ``dir``. If ``recursive`` is true, the topmost
|
||||
## subdirectory of ``dir`` will be created first, following the secondmost...
|
||||
## etc. this allows you to give a full path as the ``dir`` without worrying
|
||||
## about subdirectories not existing.
|
||||
if not recursive:
|
||||
assertReply(await(ftp.send("MKD " & dir.normalizePathSep)), "257")
|
||||
else:
|
||||
var reply = TaintedString""
|
||||
var previousDirs = ""
|
||||
for p in split(dir, {os.DirSep, os.AltSep}):
|
||||
if p != "":
|
||||
previousDirs.add(p)
|
||||
reply = await ftp.send("MKD " & previousDirs)
|
||||
previousDirs.add('/')
|
||||
assertReply reply, "257"
|
||||
|
||||
proc chmod*(ftp: AsyncFtpClient, path: string,
|
||||
permissions: set[TFilePermission]) {.async.} =
|
||||
## Changes permission of ``path`` to ``permissions``.
|
||||
var userOctal = 0
|
||||
var groupOctal = 0
|
||||
var otherOctal = 0
|
||||
for i in items(permissions):
|
||||
case i
|
||||
of fpUserExec: userOctal.inc(1)
|
||||
of fpUserWrite: userOctal.inc(2)
|
||||
of fpUserRead: userOctal.inc(4)
|
||||
of fpGroupExec: groupOctal.inc(1)
|
||||
of fpGroupWrite: groupOctal.inc(2)
|
||||
of fpGroupRead: groupOctal.inc(4)
|
||||
of fpOthersExec: otherOctal.inc(1)
|
||||
of fpOthersWrite: otherOctal.inc(2)
|
||||
of fpOthersRead: otherOctal.inc(4)
|
||||
|
||||
var perm = $userOctal & $groupOctal & $otherOctal
|
||||
assertReply(await(ftp.send("SITE CHMOD " & perm &
|
||||
" " & path.normalizePathSep)), "200")
|
||||
|
||||
proc list*(ftp: AsyncFtpClient, dir = ""): Future[string] {.async.} =
|
||||
## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current
|
||||
## working directory.
|
||||
await ftp.pasv()
|
||||
|
||||
let reply = await ftp.send("LIST" & " " & dir.normalizePathSep)
|
||||
assertReply(reply, ["125", "150"])
|
||||
|
||||
result = await ftp.getLines()
|
||||
|
||||
proc retrText*(ftp: AsyncFtpClient, file: string): Future[string] {.async.} =
|
||||
## Retrieves ``file``. File must be ASCII text.
|
||||
await ftp.pasv()
|
||||
let reply = await ftp.send("RETR " & file.normalizePathSep)
|
||||
assertReply(reply, ["125", "150"])
|
||||
|
||||
result = await ftp.getLines()
|
||||
|
||||
proc getFile(ftp: AsyncFtpClient, file: TFile, total: BiggestInt,
|
||||
onProgressChanged: ProgressChangedProc) {.async.} =
|
||||
assert ftp.dsockConnected
|
||||
var progress = 0
|
||||
var progressInSecond = 0
|
||||
var countdownFut = sleepAsync(1000)
|
||||
var dataFut = ftp.dsock.recv(BufferSize)
|
||||
while ftp.dsockConnected:
|
||||
await dataFut or countdownFut
|
||||
if countdownFut.finished:
|
||||
asyncCheck onProgressChanged(total, progress,
|
||||
progressInSecond.float)
|
||||
progressInSecond = 0
|
||||
countdownFut = sleepAsync(1000)
|
||||
|
||||
if dataFut.finished:
|
||||
let data = dataFut.read
|
||||
if data != "":
|
||||
progress.inc(data.len)
|
||||
progressInSecond.inc(data.len)
|
||||
file.write(data)
|
||||
dataFut = ftp.dsock.recv(BufferSize)
|
||||
else:
|
||||
ftp.dsockConnected = false
|
||||
|
||||
assertReply(await(ftp.expectReply()), "226")
|
||||
|
||||
proc defaultOnProgressChanged*(total, progress: BiggestInt,
|
||||
speed: float): Future[void] {.nimcall,gcsafe.} =
|
||||
## Default FTP ``onProgressChanged`` handler. Does nothing.
|
||||
result = newFuture[void]()
|
||||
#echo(total, " ", progress, " ", speed)
|
||||
result.complete()
|
||||
|
||||
proc retrFile*(ftp: AsyncFtpClient, file, dest: string,
|
||||
onProgressChanged = defaultOnProgressChanged) {.async.} =
|
||||
## Downloads ``file`` and saves it to ``dest``.
|
||||
## The ``EvRetr`` event is passed to the specified ``handleEvent`` function
|
||||
## when the download is finished. The event's ``filename`` field will be equal
|
||||
## to ``file``.
|
||||
var destFile = open(dest, mode = fmWrite)
|
||||
await ftp.pasv()
|
||||
var reply = await ftp.send("RETR " & file.normalizePathSep)
|
||||
assertReply reply, ["125", "150"]
|
||||
if {'(', ')'} notin reply.string:
|
||||
raise newException(ReplyError, "Reply has no file size.")
|
||||
var fileSize: BiggestInt
|
||||
if reply.string.captureBetween('(', ')').parseBiggestInt(fileSize) == 0:
|
||||
raise newException(ReplyError, "Reply has no file size.")
|
||||
|
||||
await getFile(ftp, destFile, fileSize, onProgressChanged)
|
||||
|
||||
proc doUpload(ftp: AsyncFtpClient, file: TFile,
|
||||
onProgressChanged: ProgressChangedProc) {.async.} =
|
||||
assert ftp.dsockConnected
|
||||
|
||||
let total = file.getFileSize()
|
||||
var data = newStringOfCap(4000)
|
||||
var progress = 0
|
||||
var progressInSecond = 0
|
||||
var countdownFut = sleepAsync(1000)
|
||||
var sendFut: Future[void] = nil
|
||||
while ftp.dsockConnected:
|
||||
if sendFut == nil or sendFut.finished:
|
||||
progress.inc(data.len)
|
||||
progressInSecond.inc(data.len)
|
||||
# TODO: Async file reading.
|
||||
let len = file.readBuffer(addr(data[0]), 4000)
|
||||
setLen(data, len)
|
||||
if len == 0:
|
||||
# File finished uploading.
|
||||
ftp.dsock.close()
|
||||
ftp.dsockConnected = false
|
||||
|
||||
assertReply(await(ftp.expectReply()), "226")
|
||||
else:
|
||||
sendFut = ftp.dsock.send(data)
|
||||
|
||||
if countdownFut.finished:
|
||||
asyncCheck onProgressChanged(total, progress, progressInSecond.float)
|
||||
progressInSecond = 0
|
||||
countdownFut = sleepAsync(1000)
|
||||
|
||||
await countdownFut or sendFut
|
||||
|
||||
proc storeFile*(ftp: AsyncFtpClient, file, dest: string,
|
||||
onProgressChanged = defaultOnProgressChanged) {.async.} =
|
||||
## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this
|
||||
## function asynchronously is recommended to view the progress of
|
||||
## the download.
|
||||
## The ``EvStore`` event is passed to the specified ``handleEvent`` function
|
||||
## when the upload is finished, and the ``filename`` field will be
|
||||
## equal to ``file``.
|
||||
var destFile = open(file)
|
||||
await ftp.pasv()
|
||||
|
||||
let reply = await ftp.send("STOR " & dest.normalizePathSep)
|
||||
assertReply reply, ["125", "150"]
|
||||
|
||||
await doUpload(ftp, destFile, onProgressChanged)
|
||||
|
||||
proc newAsyncFtpClient*(address: string, port = Port(21),
|
||||
user, pass = ""): AsyncFtpClient =
|
||||
## Creates a new ``AsyncFtpClient`` object.
|
||||
new result
|
||||
result.user = user
|
||||
result.pass = pass
|
||||
result.address = address
|
||||
result.port = port
|
||||
result.dsockConnected = false
|
||||
result.csock = newAsyncSocket()
|
||||
|
||||
when isMainModule:
|
||||
var ftp = newAsyncFtpClient("example.com", user = "test", pass = "test")
|
||||
proc main(ftp: AsyncFtpClient) {.async.} =
|
||||
await ftp.connect()
|
||||
echo await ftp.pwd()
|
||||
echo await ftp.listDirs()
|
||||
await ftp.storeFile("payload.jpg", "payload.jpg")
|
||||
await ftp.retrFile("payload.jpg", "payload2.jpg")
|
||||
echo("Finished")
|
||||
|
||||
waitFor main(ftp)
|
||||
@@ -100,7 +100,8 @@ proc sendStatus(client: PAsyncSocket, status: string): PFuture[void] =
|
||||
client.send("HTTP/1.1 " & status & "\c\L")
|
||||
|
||||
proc processClient(client: PAsyncSocket, address: string,
|
||||
callback: proc (request: TRequest): PFuture[void]) {.async.} =
|
||||
callback: proc (request: TRequest):
|
||||
PFuture[void] {.closure, gcsafe.}) {.async.} =
|
||||
while true:
|
||||
# GET /path HTTP/1.1
|
||||
# Header: val
|
||||
@@ -187,7 +188,7 @@ proc processClient(client: PAsyncSocket, address: string,
|
||||
break
|
||||
|
||||
proc serve*(server: PAsyncHttpServer, port: TPort,
|
||||
callback: proc (request: TRequest): PFuture[void] {.gcsafe.},
|
||||
callback: proc (request: TRequest): PFuture[void] {.closure,gcsafe.},
|
||||
address = "") {.async.} =
|
||||
## Starts the process of listening for incoming HTTP connections on the
|
||||
## specified address and port.
|
||||
|
||||
@@ -91,9 +91,11 @@ import sockets, os
|
||||
## getSocket(s).accept(client)
|
||||
|
||||
when defined(windows):
|
||||
from winlean import TimeVal, SocketHandle, FdSet, FD_ZERO, FD_SET, FD_ISSET, select
|
||||
from winlean import TimeVal, SocketHandle, FdSet, FD_ZERO, FD_SET,
|
||||
fdSet, FD_ISSET, select
|
||||
else:
|
||||
from posix import TimeVal, SocketHandle, FdSet, FD_ZERO, FD_SET, FD_ISSET, select
|
||||
from posix import TimeVal, SocketHandle, FdSet, FD_ZERO, FD_SET,
|
||||
fdSet, FD_ISSET, select
|
||||
|
||||
type
|
||||
DelegateObj* = object
|
||||
@@ -113,33 +115,32 @@ type
|
||||
|
||||
Dispatcher* = ref DispatcherObj
|
||||
DispatcherObj = object
|
||||
delegates: seq[PDelegate]
|
||||
delegates: seq[Delegate]
|
||||
|
||||
AsyncSocket* = ref AsyncSocketObj
|
||||
AsyncSocketObj* = object of RootObj
|
||||
socket: Socket
|
||||
info: SocketStatus
|
||||
|
||||
handleRead*: proc (s: PAsyncSocket) {.closure, gcsafe.}
|
||||
handleWrite: proc (s: PAsyncSocket) {.closure, gcsafe.}
|
||||
handleConnect*: proc (s: PAsyncSocket) {.closure, gcsafe.}
|
||||
handleRead*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
handleWrite: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
handleConnect*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
|
||||
handleAccept*: proc (s: PAsyncSocket) {.closure, gcsafe.}
|
||||
handleAccept*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
|
||||
handleTask*: proc (s: PAsyncSocket) {.closure, gcsafe.}
|
||||
handleTask*: proc (s: AsyncSocket) {.closure, gcsafe.}
|
||||
|
||||
lineBuffer: TaintedString ## Temporary storage for ``readLine``
|
||||
sendBuffer: string ## Temporary storage for ``send``
|
||||
sslNeedAccept: bool
|
||||
proto: TProtocol
|
||||
deleg: PDelegate
|
||||
proto: Protocol
|
||||
deleg: Delegate
|
||||
|
||||
SocketStatus* = enum
|
||||
SockIdle, SockConnecting, SockConnected, SockListening, SockClosed,
|
||||
SockUDPBound
|
||||
|
||||
{.deprecated: [TDelegate: DelegateObj, PDelegate: Delegate,
|
||||
TProcessOption: ProcessOption,
|
||||
TInfo: SocketStatus, PAsyncSocket: AsyncSocket, TAsyncSocket: AsyncSocketObj,
|
||||
TDispatcher: DispatcherObj, PDispatcher: Dispatcher,
|
||||
].}
|
||||
@@ -176,7 +177,7 @@ proc asyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
|
||||
result = newAsyncSocket()
|
||||
result.socket = socket(domain, typ, protocol, buffered)
|
||||
result.proto = protocol
|
||||
if result.socket == invalidSocket: osError(osLastError())
|
||||
if result.socket == invalidSocket: raiseOSError(osLastError())
|
||||
result.socket.setBlocking(false)
|
||||
|
||||
proc toAsyncSocket*(sock: TSocket, state: TInfo = SockConnected): PAsyncSocket =
|
||||
@@ -366,7 +367,7 @@ proc acceptAddr*(server: PAsyncSocket, client: var PAsyncSocket,
|
||||
client.sslNeedAccept = false
|
||||
client.info = SockConnected
|
||||
|
||||
if c == invalidSocket: socketError(server.socket)
|
||||
if c == invalidSocket: raiseSocketError(server.socket)
|
||||
c.setBlocking(false) # TODO: Needs to be tested.
|
||||
|
||||
# deleg.open is set in ``toDelegate``.
|
||||
@@ -490,7 +491,7 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool {.deprecated.} =
|
||||
of RecvDisconnected:
|
||||
result = true
|
||||
of RecvFail:
|
||||
s.socketError(async = true)
|
||||
s.raiseSocketError(async = true)
|
||||
result = false
|
||||
{.pop.}
|
||||
|
||||
@@ -544,19 +545,19 @@ proc send*(sock: PAsyncSocket, data: string) =
|
||||
sock.sendBuffer.add(data[bytesSent .. -1])
|
||||
sock.deleg.mode = fmReadWrite
|
||||
|
||||
proc timeValFromMilliseconds(timeout = 500): TTimeVal =
|
||||
proc timeValFromMilliseconds(timeout = 500): TimeVal =
|
||||
if timeout != -1:
|
||||
var seconds = timeout div 1000
|
||||
result.tv_sec = seconds.int32
|
||||
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
|
||||
|
||||
proc createFdSet(fd: var TFdSet, s: seq[PDelegate], m: var int) =
|
||||
proc createFdSet(fd: var FdSet, s: seq[Delegate], m: var int) =
|
||||
FD_ZERO(fd)
|
||||
for i in items(s):
|
||||
m = max(m, int(i.fd))
|
||||
FD_SET(i.fd, fd)
|
||||
fdSet(i.fd, fd)
|
||||
|
||||
proc pruneSocketSet(s: var seq[PDelegate], fd: var TFdSet) =
|
||||
proc pruneSocketSet(s: var seq[Delegate], fd: var FdSet) =
|
||||
var i = 0
|
||||
var L = s.len
|
||||
while i < L:
|
||||
@@ -569,9 +570,9 @@ proc pruneSocketSet(s: var seq[PDelegate], fd: var TFdSet) =
|
||||
|
||||
proc select(readfds, writefds, exceptfds: var seq[PDelegate],
|
||||
timeout = 500): int =
|
||||
var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout)
|
||||
var tv {.noInit.}: TimeVal = timeValFromMilliseconds(timeout)
|
||||
|
||||
var rd, wr, ex: TFdSet
|
||||
var rd, wr, ex: FdSet
|
||||
var m = 0
|
||||
createFdSet(rd, readfds, m)
|
||||
createFdSet(wr, writefds, m)
|
||||
@@ -681,7 +682,7 @@ when isMainModule:
|
||||
proc main =
|
||||
var d = newDispatcher()
|
||||
|
||||
var s = AsyncSocket()
|
||||
var s = asyncSocket()
|
||||
s.connect("amber.tenthbit.net", TPort(6667))
|
||||
s.handleConnect =
|
||||
proc (s: PAsyncSocket) =
|
||||
@@ -691,7 +692,7 @@ when isMainModule:
|
||||
testRead(s, 1)
|
||||
d.register(s)
|
||||
|
||||
var server = AsyncSocket()
|
||||
var server = asyncSocket()
|
||||
server.handleAccept =
|
||||
proc (s: PAsyncSocket) =
|
||||
testAccept(s, d, 78)
|
||||
|
||||
@@ -64,7 +64,7 @@ type
|
||||
proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket =
|
||||
assert fd != osInvalidSocket.TAsyncFD
|
||||
new(result.PSocket)
|
||||
result.fd = fd.TSocketHandle
|
||||
result.fd = fd.SocketHandle
|
||||
result.isBuffered = isBuff
|
||||
if isBuff:
|
||||
result.currPos = 0
|
||||
@@ -75,15 +75,15 @@ proc newAsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
|
||||
result = newSocket(newAsyncRawSocket(domain, typ, protocol), buffered)
|
||||
|
||||
proc connect*(socket: PAsyncSocket, address: string, port: TPort,
|
||||
af = AF_INET): PFuture[void] =
|
||||
af = AF_INET): Future[void] =
|
||||
## Connects ``socket`` to server at ``address:port``.
|
||||
##
|
||||
## Returns a ``PFuture`` which will complete when the connection succeeds
|
||||
## Returns a ``Future`` which will complete when the connection succeeds
|
||||
## or an error occurs.
|
||||
result = connect(socket.fd.TAsyncFD, address, port, af)
|
||||
|
||||
proc readIntoBuf(socket: PAsyncSocket,
|
||||
flags: set[TSocketFlags]): PFuture[int] {.async.} =
|
||||
flags: set[TSocketFlags]): Future[int] {.async.} =
|
||||
var data = await recv(socket.fd.TAsyncFD, BufferSize, flags)
|
||||
if data.len != 0:
|
||||
copyMem(addr socket.buffer[0], addr data[0], data.len)
|
||||
@@ -92,7 +92,7 @@ proc readIntoBuf(socket: PAsyncSocket,
|
||||
result = data.len
|
||||
|
||||
proc recv*(socket: PAsyncSocket, size: int,
|
||||
flags = {TSocketFlags.SafeDisconn}): PFuture[string] {.async.} =
|
||||
flags = {TSocketFlags.SafeDisconn}): Future[string] {.async.} =
|
||||
## Reads ``size`` bytes from ``socket``. Returned future will complete once
|
||||
## all of the requested data is read. If socket is disconnected during the
|
||||
## recv operation then the future may complete with only a part of the
|
||||
@@ -131,21 +131,21 @@ proc recv*(socket: PAsyncSocket, size: int,
|
||||
result = await recv(socket.fd.TAsyncFD, size, flags)
|
||||
|
||||
proc send*(socket: PAsyncSocket, data: string,
|
||||
flags = {TSocketFlags.SafeDisconn}): PFuture[void] =
|
||||
flags = {TSocketFlags.SafeDisconn}): Future[void] =
|
||||
## Sends ``data`` to ``socket``. The returned future will complete once all
|
||||
## data has been sent.
|
||||
assert socket != nil
|
||||
result = send(socket.fd.TAsyncFD, data, flags)
|
||||
|
||||
proc acceptAddr*(socket: PAsyncSocket, flags = {TSocketFlags.SafeDisconn}):
|
||||
PFuture[tuple[address: string, client: PAsyncSocket]] =
|
||||
Future[tuple[address: string, client: PAsyncSocket]] =
|
||||
## Accepts a new connection. Returns a future containing the client socket
|
||||
## corresponding to that connection and the remote address of the client.
|
||||
## The future will complete when the connection is successfully accepted.
|
||||
var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]("asyncnet.acceptAddr")
|
||||
var fut = acceptAddr(socket.fd.TAsyncFD, flags)
|
||||
fut.callback =
|
||||
proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
|
||||
proc (future: Future[tuple[address: string, client: TAsyncFD]]) =
|
||||
assert future.finished
|
||||
if future.failed:
|
||||
retFuture.fail(future.readError)
|
||||
@@ -156,14 +156,14 @@ proc acceptAddr*(socket: PAsyncSocket, flags = {TSocketFlags.SafeDisconn}):
|
||||
return retFuture
|
||||
|
||||
proc accept*(socket: PAsyncSocket,
|
||||
flags = {TSocketFlags.SafeDisconn}): PFuture[PAsyncSocket] =
|
||||
flags = {TSocketFlags.SafeDisconn}): Future[PAsyncSocket] =
|
||||
## Accepts a new connection. Returns a future containing the client socket
|
||||
## corresponding to that connection.
|
||||
## The future will complete when the connection is successfully accepted.
|
||||
var retFut = newFuture[PAsyncSocket]("asyncnet.accept")
|
||||
var fut = acceptAddr(socket, flags)
|
||||
fut.callback =
|
||||
proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) =
|
||||
proc (future: Future[tuple[address: string, client: PAsyncSocket]]) =
|
||||
assert future.finished
|
||||
if future.failed:
|
||||
retFut.fail(future.readError)
|
||||
@@ -172,7 +172,7 @@ proc accept*(socket: PAsyncSocket,
|
||||
return retFut
|
||||
|
||||
proc recvLine*(socket: PAsyncSocket,
|
||||
flags = {TSocketFlags.SafeDisconn}): PFuture[string] {.async.} =
|
||||
flags = {TSocketFlags.SafeDisconn}): Future[string] {.async.} =
|
||||
## Reads a line of data from ``socket``. Returned future will complete once
|
||||
## a full line is read or an error occurs.
|
||||
##
|
||||
@@ -282,23 +282,23 @@ when isMainModule:
|
||||
var sock = newAsyncSocket()
|
||||
var f = connect(sock, "irc.freenode.net", TPort(6667))
|
||||
f.callback =
|
||||
proc (future: PFuture[void]) =
|
||||
proc (future: Future[void]) =
|
||||
echo("Connected in future!")
|
||||
for i in 0 .. 50:
|
||||
var recvF = recv(sock, 10)
|
||||
recvF.callback =
|
||||
proc (future: PFuture[string]) =
|
||||
proc (future: Future[string]) =
|
||||
echo("Read ", future.read.len, ": ", future.read.repr)
|
||||
elif test == LowServer:
|
||||
var sock = newAsyncSocket()
|
||||
sock.bindAddr(TPort(6667))
|
||||
sock.listen()
|
||||
proc onAccept(future: PFuture[PAsyncSocket]) =
|
||||
proc onAccept(future: Future[PAsyncSocket]) =
|
||||
let client = future.read
|
||||
echo "Accepted ", client.fd.cint
|
||||
var t = send(client, "test\c\L")
|
||||
t.callback =
|
||||
proc (future: PFuture[void]) =
|
||||
proc (future: Future[void]) =
|
||||
echo("Send")
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -10,6 +10,10 @@ include "system/inclrtl"
|
||||
|
||||
import sockets, strutils, parseutils, times, os, asyncio
|
||||
|
||||
from asyncnet import nil
|
||||
from rawsockets import nil
|
||||
from asyncdispatch import PFuture
|
||||
|
||||
## This module **partially** implements an FTP client as specified
|
||||
## by `RFC 959 <http://tools.ietf.org/html/rfc959>`_.
|
||||
##
|
||||
@@ -33,34 +37,32 @@ import sockets, strutils, parseutils, times, os, asyncio
|
||||
## to change.
|
||||
|
||||
type
|
||||
FTPClientObj* = object of RootObj
|
||||
case isAsync: bool
|
||||
of false:
|
||||
csock: Socket # Command connection socket
|
||||
dsock: Socket # Data connection socket
|
||||
else:
|
||||
dummyA, dummyB: pointer # workaround a Nim API issue
|
||||
asyncCSock: AsyncSocket
|
||||
asyncDSock: AsyncSocket
|
||||
FtpBase*[SockType] = ref FtpBaseObj[SockType]
|
||||
FtpBaseObj*[SockType] = object
|
||||
csock*: SockType
|
||||
dsock*: SockType
|
||||
when SockType is asyncio.AsyncSocket:
|
||||
handleEvent*: proc (ftp: AsyncFTPClient, ev: FTPEvent){.closure,gcsafe.}
|
||||
disp: Dispatcher
|
||||
asyncDSockID: Delegate
|
||||
user, pass: string
|
||||
address: string
|
||||
port: Port
|
||||
user*, pass*: string
|
||||
address*: string
|
||||
when SockType is asyncnet.AsyncSocket:
|
||||
port*: rawsockets.Port
|
||||
else:
|
||||
port*: Port
|
||||
|
||||
jobInProgress: bool
|
||||
job: ref FTPJob
|
||||
jobInProgress*: bool
|
||||
job*: FTPJob[SockType]
|
||||
|
||||
dsockConnected: bool
|
||||
|
||||
FTPClient* = ref FTPClientObj
|
||||
dsockConnected*: bool
|
||||
|
||||
FTPJobType* = enum
|
||||
JRetrText, JRetr, JStore
|
||||
|
||||
FTPJob = object
|
||||
prc: proc (ftp: PFTPClient, async: bool): bool {.nimcall, gcsafe.}
|
||||
FtpJob[T] = ref FtpJobObj[T]
|
||||
FTPJobObj[T] = object
|
||||
prc: proc (ftp: FTPBase[T], async: bool): bool {.nimcall, gcsafe.}
|
||||
case typ*: FTPJobType
|
||||
of JRetrText:
|
||||
lines: string
|
||||
@@ -74,8 +76,11 @@ type
|
||||
toStore: string # Data left to upload (Only used with async)
|
||||
else: nil
|
||||
|
||||
AsyncFTPClient* = ref AsyncFTPClientObj ## Async alternative to TFTPClient.
|
||||
AsyncFTPClientObj* = object of FTPClientObj
|
||||
FtpClientObj* = FtpBaseObj[Socket]
|
||||
FtpClient* = ref FtpClientObj
|
||||
|
||||
AsyncFtpClient* = ref AsyncFtpClientObj ## Async alternative to TFTPClient.
|
||||
AsyncFtpClientObj* = FtpBaseObj[asyncio.AsyncSocket]
|
||||
|
||||
FTPEventType* = enum
|
||||
EvTransferProgress, EvLines, EvRetr, EvStore
|
||||
@@ -103,42 +108,39 @@ type
|
||||
].}
|
||||
|
||||
proc ftpClient*(address: string, port = TPort(21),
|
||||
user, pass = ""): PFTPClient =
|
||||
## Create a ``PFTPClient`` object.
|
||||
user, pass = ""): FtpClient =
|
||||
## Create a ``FtpClient`` object.
|
||||
new(result)
|
||||
result.user = user
|
||||
result.pass = pass
|
||||
result.address = address
|
||||
result.port = port
|
||||
|
||||
result.isAsync = false
|
||||
result.dsockConnected = false
|
||||
result.csock = socket()
|
||||
if result.csock == InvalidSocket: osError(osLastError())
|
||||
|
||||
proc getDSock(ftp: PFTPClient): TSocket =
|
||||
if ftp.isAsync: return ftp.asyncDSock else: return ftp.dsock
|
||||
|
||||
proc getCSock(ftp: PFTPClient): TSocket =
|
||||
if ftp.isAsync: return ftp.asyncCSock else: return ftp.csock
|
||||
if result.csock == invalidSocket: raiseOSError(osLastError())
|
||||
|
||||
template blockingOperation(sock: TSocket, body: stmt) {.immediate.} =
|
||||
if ftp.isAsync:
|
||||
sock.setBlocking(true)
|
||||
body
|
||||
if ftp.isAsync:
|
||||
sock.setBlocking(false)
|
||||
|
||||
proc expectReply(ftp: PFTPClient): TaintedString =
|
||||
template blockingOperation(sock: asyncio.PAsyncSocket, body: stmt) {.immediate.} =
|
||||
sock.setBlocking(true)
|
||||
body
|
||||
sock.setBlocking(false)
|
||||
|
||||
proc expectReply[T](ftp: FtpBase[T]): TaintedString =
|
||||
result = TaintedString""
|
||||
blockingOperation(ftp.getCSock()):
|
||||
ftp.getCSock().readLine(result)
|
||||
blockingOperation(ftp.csock):
|
||||
when T is Socket:
|
||||
ftp.csock.readLine(result)
|
||||
else:
|
||||
discard ftp.csock.readLine(result)
|
||||
|
||||
proc send*(ftp: PFTPClient, m: string): TaintedString =
|
||||
proc send*[T](ftp: FtpBase[T], m: string): TaintedString =
|
||||
## Send a message to the server, and wait for a primary reply.
|
||||
## ``\c\L`` is added for you.
|
||||
blockingOperation(ftp.getCSock()):
|
||||
ftp.getCSock().send(m & "\c\L")
|
||||
blockingOperation(ftp.csock):
|
||||
ftp.csock.send(m & "\c\L")
|
||||
return ftp.expectReply()
|
||||
|
||||
proc assertReply(received: TaintedString, expected: string) =
|
||||
@@ -154,8 +156,8 @@ proc assertReply(received: TaintedString, expected: varargs[string]) =
|
||||
"Expected reply '$1' got: $2" %
|
||||
[expected.join("' or '"), received.string])
|
||||
|
||||
proc createJob(ftp: PFTPClient,
|
||||
prc: proc (ftp: PFTPClient, async: bool): bool {.
|
||||
proc createJob[T](ftp: FtpBase[T],
|
||||
prc: proc (ftp: FtpBase[T], async: bool): bool {.
|
||||
nimcall,gcsafe.},
|
||||
cmd: FTPJobType) =
|
||||
if ftp.jobInProgress:
|
||||
@@ -170,7 +172,7 @@ proc createJob(ftp: PFTPClient,
|
||||
of JRetr, JStore:
|
||||
ftp.job.toStore = ""
|
||||
|
||||
proc deleteJob(ftp: PFTPClient) =
|
||||
proc deleteJob[T](ftp: FtpBase[T]) =
|
||||
assert ftp.jobInProgress
|
||||
ftp.jobInProgress = false
|
||||
case ftp.job.typ
|
||||
@@ -178,12 +180,9 @@ proc deleteJob(ftp: PFTPClient) =
|
||||
ftp.job.lines = ""
|
||||
of JRetr, JStore:
|
||||
ftp.job.file.close()
|
||||
if ftp.isAsync:
|
||||
ftp.asyncDSock.close()
|
||||
else:
|
||||
ftp.dsock.close()
|
||||
ftp.dsock.close()
|
||||
|
||||
proc handleTask(s: PAsyncSocket, ftp: PFTPClient) =
|
||||
proc handleTask(s: PAsyncSocket, ftp: PAsyncFTPClient) =
|
||||
if ftp.jobInProgress:
|
||||
if ftp.job.typ in {JRetr, JStore}:
|
||||
if epochTime() - ftp.job.lastProgressReport >= 1.0:
|
||||
@@ -198,12 +197,12 @@ proc handleTask(s: PAsyncSocket, ftp: PFTPClient) =
|
||||
ftp.job.oneSecond = 0
|
||||
ftp.handleEvent(PAsyncFTPClient(ftp), r)
|
||||
|
||||
proc handleWrite(s: PAsyncSocket, ftp: PFTPClient) =
|
||||
proc handleWrite(s: PAsyncSocket, ftp: PAsyncFTPClient) =
|
||||
if ftp.jobInProgress:
|
||||
if ftp.job.typ == JStore:
|
||||
assert (not ftp.job.prc(ftp, true))
|
||||
|
||||
proc handleConnect(s: PAsyncSocket, ftp: PFTPClient) =
|
||||
proc handleConnect(s: PAsyncSocket, ftp: PAsyncFTPClient) =
|
||||
ftp.dsockConnected = true
|
||||
assert(ftp.jobInProgress)
|
||||
if ftp.job.typ == JStore:
|
||||
@@ -211,30 +210,32 @@ proc handleConnect(s: PAsyncSocket, ftp: PFTPClient) =
|
||||
else:
|
||||
s.delHandleWrite()
|
||||
|
||||
proc handleRead(s: PAsyncSocket, ftp: PFTPClient) =
|
||||
proc handleRead(s: PAsyncSocket, ftp: PAsyncFTPClient) =
|
||||
assert ftp.jobInProgress
|
||||
assert ftp.job.typ != JStore
|
||||
# This can never return true, because it shouldn't check for code
|
||||
# 226 from csock.
|
||||
assert(not ftp.job.prc(ftp, true))
|
||||
|
||||
proc pasv(ftp: PFTPClient) =
|
||||
proc pasv[T](ftp: FtpBase[T]) =
|
||||
## Negotiate a data connection.
|
||||
if not ftp.isAsync:
|
||||
when T is TSocket:
|
||||
ftp.dsock = socket()
|
||||
if ftp.dsock == InvalidSocket: osError(osLastError())
|
||||
else:
|
||||
ftp.asyncDSock = AsyncSocket()
|
||||
ftp.asyncDSock.handleRead =
|
||||
if ftp.dsock == invalidSocket: raiseOSError(osLastError())
|
||||
elif T is PAsyncSocket:
|
||||
ftp.dsock = asyncSocket()
|
||||
ftp.dsock.handleRead =
|
||||
proc (s: PAsyncSocket) =
|
||||
handleRead(s, ftp)
|
||||
ftp.asyncDSock.handleConnect =
|
||||
ftp.dsock.handleConnect =
|
||||
proc (s: PAsyncSocket) =
|
||||
handleConnect(s, ftp)
|
||||
ftp.asyncDSock.handleTask =
|
||||
ftp.dsock.handleTask =
|
||||
proc (s: PAsyncSocket) =
|
||||
handleTask(s, ftp)
|
||||
ftp.disp.register(ftp.asyncDSock)
|
||||
ftp.disp.register(ftp.dsock)
|
||||
else:
|
||||
{.fatal: "Incorrect socket instantiation".}
|
||||
|
||||
var pasvMsg = ftp.send("PASV").string.strip.TaintedString
|
||||
assertReply(pasvMsg, "227")
|
||||
@@ -243,23 +244,24 @@ proc pasv(ftp: PFTPClient) =
|
||||
var ip = nums[0.. -3]
|
||||
var port = nums[-2.. -1]
|
||||
var properPort = port[0].parseInt()*256+port[1].parseInt()
|
||||
if ftp.isAsync:
|
||||
ftp.asyncDSock.connect(ip.join("."), TPort(properPort.toU16))
|
||||
ftp.dsockConnected = False
|
||||
ftp.dsock.connect(ip.join("."), TPort(properPort.toU16))
|
||||
when T is PAsyncSocket:
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
ftp.dsock.connect(ip.join("."), TPort(properPort.toU16))
|
||||
ftp.dsockConnected = True
|
||||
ftp.dsockConnected = true
|
||||
|
||||
proc normalizePathSep(path: string): string =
|
||||
return replace(path, '\\', '/')
|
||||
|
||||
proc connect*(ftp: PFTPClient) =
|
||||
proc connect*[T](ftp: FtpBase[T]) =
|
||||
## Connect to the FTP server specified by ``ftp``.
|
||||
if ftp.isAsync:
|
||||
blockingOperation(ftp.asyncCSock):
|
||||
ftp.asyncCSock.connect(ftp.address, ftp.port)
|
||||
else:
|
||||
when T is PAsyncSocket:
|
||||
blockingOperation(ftp.csock):
|
||||
ftp.csock.connect(ftp.address, ftp.port)
|
||||
elif T is TSocket:
|
||||
ftp.csock.connect(ftp.address, ftp.port)
|
||||
else:
|
||||
{.fatal: "Incorrect socket instantiation".}
|
||||
|
||||
# TODO: Handle 120? or let user handle it.
|
||||
assertReply ftp.expectReply(), "220"
|
||||
@@ -270,56 +272,58 @@ proc connect*(ftp: PFTPClient) =
|
||||
if ftp.pass != "":
|
||||
assertReply ftp.send("PASS " & ftp.pass), "230"
|
||||
|
||||
proc pwd*(ftp: PFTPClient): string =
|
||||
proc pwd*[T](ftp: FtpBase[T]): string =
|
||||
## Returns the current working directory.
|
||||
var wd = ftp.send("PWD")
|
||||
assertReply wd, "257"
|
||||
return wd.string.captureBetween('"') # "
|
||||
|
||||
proc cd*(ftp: PFTPClient, dir: string) =
|
||||
proc cd*[T](ftp: FtpBase[T], dir: string) =
|
||||
## Changes the current directory on the remote FTP server to ``dir``.
|
||||
assertReply ftp.send("CWD " & dir.normalizePathSep), "250"
|
||||
|
||||
proc cdup*(ftp: PFTPClient) =
|
||||
proc cdup*[T](ftp: FtpBase[T]) =
|
||||
## Changes the current directory to the parent of the current directory.
|
||||
assertReply ftp.send("CDUP"), "200"
|
||||
|
||||
proc getLines(ftp: PFTPClient, async: bool = false): bool =
|
||||
proc getLines[T](ftp: FtpBase[T], async: bool = false): bool =
|
||||
## Downloads text data in ASCII mode
|
||||
## Returns true if the download is complete.
|
||||
## It doesn't if `async` is true, because it doesn't check for 226 then.
|
||||
if ftp.dsockConnected:
|
||||
var r = TaintedString""
|
||||
if ftp.isAsync:
|
||||
when T is PAsyncSocket:
|
||||
if ftp.asyncDSock.readLine(r):
|
||||
if r.string == "":
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
ftp.job.lines.add(r.string & "\n")
|
||||
else:
|
||||
elif T is TSocket:
|
||||
assert(not async)
|
||||
ftp.dsock.readLine(r)
|
||||
if r.string == "":
|
||||
ftp.dsockConnected = false
|
||||
else:
|
||||
ftp.job.lines.add(r.string & "\n")
|
||||
else:
|
||||
{.fatal: "Incorrect socket instantiation".}
|
||||
|
||||
if not async:
|
||||
var readSocks: seq[TSocket] = @[ftp.getCSock()]
|
||||
var readSocks: seq[TSocket] = @[ftp.csock]
|
||||
# This is only needed here. Asyncio gets this socket...
|
||||
blockingOperation(ftp.getCSock()):
|
||||
if readSocks.select(1) != 0 and ftp.getCSock() in readSocks:
|
||||
blockingOperation(ftp.csock):
|
||||
if readSocks.select(1) != 0 and ftp.csock in readSocks:
|
||||
assertReply ftp.expectReply(), "226"
|
||||
return true
|
||||
|
||||
proc listDirs*(ftp: PFTPClient, dir: string = "",
|
||||
proc listDirs*[T](ftp: FtpBase[T], dir: string = "",
|
||||
async = false): seq[string] =
|
||||
## Returns a list of filenames in the given directory. If ``dir`` is "",
|
||||
## the current directory is used. If ``async`` is true, this
|
||||
## function will return immediately and it will be your job to
|
||||
## use asyncio's ``poll`` to progress this operation.
|
||||
|
||||
ftp.createJob(getLines, JRetrText)
|
||||
ftp.createJob(getLines[T], JRetrText)
|
||||
ftp.pasv()
|
||||
|
||||
assertReply ftp.send("NLST " & dir.normalizePathSep), ["125", "150"]
|
||||
@@ -330,7 +334,7 @@ proc listDirs*(ftp: PFTPClient, dir: string = "",
|
||||
ftp.deleteJob()
|
||||
else: return @[]
|
||||
|
||||
proc fileExists*(ftp: PFTPClient, file: string): bool {.deprecated.} =
|
||||
proc fileExists*(ftp: FtpClient, file: string): bool {.deprecated.} =
|
||||
## **Deprecated since version 0.9.0:** Please use ``existsFile``.
|
||||
##
|
||||
## Determines whether ``file`` exists.
|
||||
@@ -341,7 +345,7 @@ proc fileExists*(ftp: PFTPClient, file: string): bool {.deprecated.} =
|
||||
for f in items(files):
|
||||
if f.normalizePathSep == file.normalizePathSep: return true
|
||||
|
||||
proc existsFile*(ftp: PFTPClient, file: string): bool =
|
||||
proc existsFile*(ftp: FtpClient, file: string): bool =
|
||||
## Determines whether ``file`` exists.
|
||||
##
|
||||
## Warning: This function may block. Especially on directories with many
|
||||
@@ -350,7 +354,7 @@ proc existsFile*(ftp: PFTPClient, file: string): bool =
|
||||
for f in items(files):
|
||||
if f.normalizePathSep == file.normalizePathSep: return true
|
||||
|
||||
proc createDir*(ftp: PFTPClient, dir: string, recursive: bool = false) =
|
||||
proc createDir*[T](ftp: FtpBase[T], dir: string, recursive: bool = false) =
|
||||
## Creates a directory ``dir``. If ``recursive`` is true, the topmost
|
||||
## subdirectory of ``dir`` will be created first, following the secondmost...
|
||||
## etc. this allows you to give a full path as the ``dir`` without worrying
|
||||
@@ -360,14 +364,14 @@ proc createDir*(ftp: PFTPClient, dir: string, recursive: bool = false) =
|
||||
else:
|
||||
var reply = TaintedString""
|
||||
var previousDirs = ""
|
||||
for p in split(dir, {os.dirSep, os.altSep}):
|
||||
for p in split(dir, {os.DirSep, os.AltSep}):
|
||||
if p != "":
|
||||
previousDirs.add(p)
|
||||
reply = ftp.send("MKD " & previousDirs)
|
||||
previousDirs.add('/')
|
||||
assertReply reply, "257"
|
||||
|
||||
proc chmod*(ftp: PFTPClient, path: string,
|
||||
proc chmod*[T](ftp: FtpBase[T], path: string,
|
||||
permissions: set[TFilePermission]) =
|
||||
## Changes permission of ``path`` to ``permissions``.
|
||||
var userOctal = 0
|
||||
@@ -389,12 +393,12 @@ proc chmod*(ftp: PFTPClient, path: string,
|
||||
assertReply ftp.send("SITE CHMOD " & perm &
|
||||
" " & path.normalizePathSep), "200"
|
||||
|
||||
proc list*(ftp: PFTPClient, dir: string = "", async = false): string =
|
||||
proc list*[T](ftp: FtpBase[T], dir: string = "", async = false): string =
|
||||
## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current
|
||||
## working directory. If ``async`` is true, this function will return
|
||||
## immediately and it will be your job to call asyncio's
|
||||
## ``poll`` to progress this operation.
|
||||
ftp.createJob(getLines, JRetrText)
|
||||
ftp.createJob(getLines[T], JRetrText)
|
||||
ftp.pasv()
|
||||
|
||||
assertReply(ftp.send("LIST" & " " & dir.normalizePathSep), ["125", "150"])
|
||||
@@ -406,11 +410,11 @@ proc list*(ftp: PFTPClient, dir: string = "", async = false): string =
|
||||
else:
|
||||
return ""
|
||||
|
||||
proc retrText*(ftp: PFTPClient, file: string, async = false): string =
|
||||
proc retrText*[T](ftp: FtpBase[T], file: string, async = false): string =
|
||||
## Retrieves ``file``. File must be ASCII text.
|
||||
## If ``async`` is true, this function will return immediately and
|
||||
## it will be your job to call asyncio's ``poll`` to progress this operation.
|
||||
ftp.createJob(getLines, JRetrText)
|
||||
ftp.createJob(getLines[T], JRetrText)
|
||||
ftp.pasv()
|
||||
assertReply ftp.send("RETR " & file.normalizePathSep), ["125", "150"]
|
||||
|
||||
@@ -421,17 +425,19 @@ proc retrText*(ftp: PFTPClient, file: string, async = false): string =
|
||||
else:
|
||||
return ""
|
||||
|
||||
proc getFile(ftp: PFTPClient, async = false): bool =
|
||||
proc getFile[T](ftp: FtpBase[T], async = false): bool =
|
||||
if ftp.dsockConnected:
|
||||
var r = "".TaintedString
|
||||
var bytesRead = 0
|
||||
var returned = false
|
||||
if async:
|
||||
if not ftp.isAsync: raise newException(EFTP, "FTPClient must be async.")
|
||||
bytesRead = ftp.AsyncDSock.recvAsync(r, BufferSize)
|
||||
returned = bytesRead != -1
|
||||
when T is TSocket:
|
||||
raise newException(EFTP, "FTPClient must be async.")
|
||||
else:
|
||||
bytesRead = ftp.dsock.recvAsync(r, BufferSize)
|
||||
returned = bytesRead != -1
|
||||
else:
|
||||
bytesRead = getDSock(ftp).recv(r, BufferSize)
|
||||
bytesRead = ftp.dsock.recv(r, BufferSize)
|
||||
returned = true
|
||||
let r2 = r.string
|
||||
if r2 != "":
|
||||
@@ -439,29 +445,29 @@ proc getFile(ftp: PFTPClient, async = false): bool =
|
||||
ftp.job.oneSecond.inc(r2.len)
|
||||
ftp.job.file.write(r2)
|
||||
elif returned and r2 == "":
|
||||
ftp.dsockConnected = False
|
||||
ftp.dsockConnected = false
|
||||
|
||||
if not async:
|
||||
var readSocks: seq[TSocket] = @[ftp.getCSock()]
|
||||
blockingOperation(ftp.getCSock()):
|
||||
if readSocks.select(1) != 0 and ftp.getCSock() in readSocks:
|
||||
var readSocks: seq[TSocket] = @[ftp.csock]
|
||||
blockingOperation(ftp.csock):
|
||||
if readSocks.select(1) != 0 and ftp.csock in readSocks:
|
||||
assertReply ftp.expectReply(), "226"
|
||||
return true
|
||||
|
||||
proc retrFile*(ftp: PFTPClient, file, dest: string, async = false) =
|
||||
proc retrFile*[T](ftp: FtpBase[T], file, dest: string, async = false) =
|
||||
## Downloads ``file`` and saves it to ``dest``. Usage of this function
|
||||
## asynchronously is recommended to view the progress of the download.
|
||||
## The ``EvRetr`` event is passed to the specified ``handleEvent`` function
|
||||
## when the download is finished, and the ``filename`` field will be equal
|
||||
## to ``file``.
|
||||
ftp.createJob(getFile, JRetr)
|
||||
ftp.createJob(getFile[T], JRetr)
|
||||
ftp.job.file = open(dest, mode = fmWrite)
|
||||
ftp.pasv()
|
||||
var reply = ftp.send("RETR " & file.normalizePathSep)
|
||||
assertReply reply, ["125", "150"]
|
||||
if {'(', ')'} notin reply.string:
|
||||
raise newException(EInvalidReply, "Reply has no file size.")
|
||||
var fileSize: biggestInt
|
||||
var fileSize: BiggestInt
|
||||
if reply.string.captureBetween('(', ')').parseBiggestInt(fileSize) == 0:
|
||||
raise newException(EInvalidReply, "Reply has no file size.")
|
||||
|
||||
@@ -473,11 +479,11 @@ proc retrFile*(ftp: PFTPClient, file, dest: string, async = false) =
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
ftp.deleteJob()
|
||||
|
||||
proc doUpload(ftp: PFTPClient, async = false): bool =
|
||||
proc doUpload[T](ftp: FtpBase[T], async = false): bool =
|
||||
if ftp.dsockConnected:
|
||||
if ftp.job.toStore.len() > 0:
|
||||
assert(async)
|
||||
let bytesSent = ftp.asyncDSock.sendAsync(ftp.job.toStore)
|
||||
let bytesSent = ftp.dsock.sendAsync(ftp.job.toStore)
|
||||
if bytesSent == ftp.job.toStore.len:
|
||||
ftp.job.toStore = ""
|
||||
elif bytesSent != ftp.job.toStore.len and bytesSent != 0:
|
||||
@@ -490,7 +496,7 @@ proc doUpload(ftp: PFTPClient, async = false): bool =
|
||||
setLen(s, len)
|
||||
if len == 0:
|
||||
# File finished uploading.
|
||||
if ftp.isAsync: ftp.asyncDSock.close() else: ftp.dsock.close()
|
||||
ftp.dsock.close()
|
||||
ftp.dsockConnected = false
|
||||
|
||||
if not async:
|
||||
@@ -499,9 +505,9 @@ proc doUpload(ftp: PFTPClient, async = false): bool =
|
||||
return false
|
||||
|
||||
if not async:
|
||||
getDSock(ftp).send(s)
|
||||
ftp.dsock.send(s)
|
||||
else:
|
||||
let bytesSent = ftp.asyncDSock.sendAsync(s)
|
||||
let bytesSent = ftp.dsock.sendAsync(s)
|
||||
if bytesSent == 0:
|
||||
ftp.job.toStore.add(s)
|
||||
elif bytesSent != s.len:
|
||||
@@ -511,14 +517,14 @@ proc doUpload(ftp: PFTPClient, async = false): bool =
|
||||
ftp.job.progress.inc(len)
|
||||
ftp.job.oneSecond.inc(len)
|
||||
|
||||
proc store*(ftp: PFTPClient, file, dest: string, async = false) =
|
||||
proc store*[T](ftp: FtpBase[T], file, dest: string, async = false) =
|
||||
## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this
|
||||
## function asynchronously is recommended to view the progress of
|
||||
## the download.
|
||||
## The ``EvStore`` event is passed to the specified ``handleEvent`` function
|
||||
## when the upload is finished, and the ``filename`` field will be
|
||||
## equal to ``file``.
|
||||
ftp.createJob(doUpload, JStore)
|
||||
ftp.createJob(doUpload[T], JStore)
|
||||
ftp.job.file = open(file)
|
||||
ftp.job.total = ftp.job.file.getFileSize()
|
||||
ftp.job.lastProgressReport = epochTime()
|
||||
@@ -531,16 +537,12 @@ proc store*(ftp: PFTPClient, file, dest: string, async = false) =
|
||||
while not ftp.job.prc(ftp, false): discard
|
||||
ftp.deleteJob()
|
||||
|
||||
proc close*(ftp: PFTPClient) =
|
||||
proc close*[T](ftp: FtpBase[T]) =
|
||||
## Terminates the connection to the server.
|
||||
assertReply ftp.send("QUIT"), "221"
|
||||
if ftp.jobInProgress: ftp.deleteJob()
|
||||
if ftp.isAsync:
|
||||
ftp.asyncCSock.close()
|
||||
ftp.asyncDSock.close()
|
||||
else:
|
||||
ftp.csock.close()
|
||||
ftp.dsock.close()
|
||||
ftp.csock.close()
|
||||
ftp.dsock.close()
|
||||
|
||||
proc csockHandleRead(s: PAsyncSocket, ftp: PAsyncFTPClient) =
|
||||
if ftp.jobInProgress:
|
||||
@@ -571,36 +573,34 @@ proc asyncFTPClient*(address: string, port = TPort(21),
|
||||
## Create a ``PAsyncFTPClient`` object.
|
||||
##
|
||||
## Use this if you want to use asyncio's dispatcher.
|
||||
var dres: PAsyncFTPClient
|
||||
var dres: AsyncFtpClient
|
||||
new(dres)
|
||||
dres.user = user
|
||||
dres.pass = pass
|
||||
dres.address = address
|
||||
dres.port = port
|
||||
dres.isAsync = true
|
||||
dres.dsockConnected = false
|
||||
dres.handleEvent = handleEvent
|
||||
dres.asyncCSock = AsyncSocket()
|
||||
dres.asyncCSock.handleRead =
|
||||
proc (s: PAsyncSocket) =
|
||||
dres.csock = asyncSocket()
|
||||
dres.csock.handleRead =
|
||||
proc (s: AsyncSocket) =
|
||||
csockHandleRead(s, dres)
|
||||
result = dres
|
||||
|
||||
proc register*(d: PDispatcher, ftp: PAsyncFTPClient): PDelegate {.discardable.} =
|
||||
## Registers ``ftp`` with dispatcher ``d``.
|
||||
assert ftp.isAsync
|
||||
ftp.disp = d
|
||||
return ftp.disp.register(ftp.asyncCSock)
|
||||
return ftp.disp.register(ftp.csock)
|
||||
|
||||
when isMainModule:
|
||||
proc main =
|
||||
var d = newDispatcher()
|
||||
let hev =
|
||||
proc (ftp: PAsyncFTPClient, event: TFTPEvent) =
|
||||
proc (ftp: AsyncFTPClient, event: FTPEvent) =
|
||||
case event.typ
|
||||
of EvStore:
|
||||
echo("Upload finished!")
|
||||
ftp.retrFile("payload.JPG", "payload2.JPG", async = true)
|
||||
ftp.retrFile("payload.jpg", "payload2.jpg", async = true)
|
||||
of EvTransferProgress:
|
||||
var time: int64 = -1
|
||||
if event.speed != 0:
|
||||
@@ -615,13 +615,13 @@ when isMainModule:
|
||||
ftp.close()
|
||||
echo d.len
|
||||
else: assert(false)
|
||||
var ftp = asyncFTPClient("picheta.me", user = "test", pass = "asf", handleEvent = hev)
|
||||
var ftp = asyncFTPClient("example.com", user = "foo", pass = "bar", handleEvent = hev)
|
||||
|
||||
d.register(ftp)
|
||||
d.len.echo()
|
||||
ftp.connect()
|
||||
echo "connected"
|
||||
ftp.store("payload.JPG", "payload.JPG", async = true)
|
||||
ftp.store("payload.jpg", "payload.jpg", async = true)
|
||||
d.len.echo()
|
||||
echo "uploading..."
|
||||
while true:
|
||||
@@ -629,15 +629,15 @@ when isMainModule:
|
||||
main()
|
||||
|
||||
when isMainModule and false:
|
||||
var ftp = ftpClient("picheta.me", user = "asdasd", pass = "asfwq")
|
||||
var ftp = ftpClient("example.com", user = "foo", pass = "bar")
|
||||
ftp.connect()
|
||||
echo ftp.pwd()
|
||||
echo ftp.list()
|
||||
echo("uploading")
|
||||
ftp.store("payload.JPG", "payload.JPG", async = false)
|
||||
ftp.store("payload.jpg", "payload.jpg", async = false)
|
||||
|
||||
echo("Upload complete")
|
||||
ftp.retrFile("payload.JPG", "payload2.JPG", async = false)
|
||||
ftp.retrFile("payload.jpg", "payload2.jpg", async = false)
|
||||
|
||||
echo("Download complete")
|
||||
sleep(5000)
|
||||
|
||||
@@ -662,8 +662,7 @@ when isMainModule:
|
||||
resp = await client.request("http://nim-lang.org/download.html")
|
||||
echo("Got response: ", resp.status)
|
||||
|
||||
asyncCheck main()
|
||||
runForever()
|
||||
waitFor main()
|
||||
|
||||
else:
|
||||
#downloadFile("http://force7.de/nim/index.html", "nimindex.html")
|
||||
|
||||
@@ -35,6 +35,10 @@ when defined(ssl):
|
||||
SSLAcceptResult* = enum
|
||||
AcceptNoClient = 0, AcceptNoHandshake, AcceptSuccess
|
||||
|
||||
{.deprecated: [ESSL: SSLError, TSSLCVerifyMode: SSLCVerifyMode,
|
||||
TSSLProtVersion: SSLProtVersion, PSSLContext: SSLContext,
|
||||
TSSLAcceptResult: SSLAcceptResult].}
|
||||
|
||||
const
|
||||
BufferSize*: int = 4000 ## size of a buffered socket's buffer
|
||||
|
||||
@@ -74,9 +78,7 @@ type
|
||||
|
||||
{.deprecated: [TSocketFlags: SocketFlag, ETimeout: TimeoutError,
|
||||
TReadLineResult: ReadLineResult, TSOBool: SOBool, PSocket: Socket,
|
||||
TSocketImpl: SocketImpl, ESSL: SSLError, TSSLCVerifyMode: SSLCVerifyMode,
|
||||
TSSLProtVersion: SSLProtVersion, PSSLContext: SSLContext,
|
||||
TSSLAcceptResult: SSLAcceptResult].}
|
||||
TSocketImpl: SocketImpl].}
|
||||
|
||||
proc isDisconnectionError*(flags: set[SocketFlag],
|
||||
lastError: OSErrorCode): bool =
|
||||
@@ -98,7 +100,7 @@ proc toOSFlags*(socketFlags: set[SocketFlag]): cint =
|
||||
result = result or MSG_PEEK
|
||||
of SocketFlag.SafeDisconn: continue
|
||||
|
||||
proc createSocket(fd: TSocketHandle, isBuff: bool): Socket =
|
||||
proc createSocket(fd: SocketHandle, isBuff: bool): Socket =
|
||||
assert fd != osInvalidSocket
|
||||
new(result)
|
||||
result.fd = fd
|
||||
@@ -276,19 +278,19 @@ proc bindAddr*(socket: Socket, port = Port(0), address = "") {.
|
||||
## If ``address`` is "" then ADDR_ANY will be bound.
|
||||
|
||||
if address == "":
|
||||
var name: TSockaddr_in
|
||||
var name: Sockaddr_in
|
||||
when useWinVersion:
|
||||
name.sin_family = toInt(AF_INET).int16
|
||||
else:
|
||||
name.sin_family = toInt(AF_INET)
|
||||
name.sin_port = htons(int16(port))
|
||||
name.sin_addr.s_addr = htonl(INADDR_ANY)
|
||||
if bindAddr(socket.fd, cast[ptr TSockAddr](addr(name)),
|
||||
sizeof(name).TSocklen) < 0'i32:
|
||||
if bindAddr(socket.fd, cast[ptr SockAddr](addr(name)),
|
||||
sizeof(name).Socklen) < 0'i32:
|
||||
raiseOSError(osLastError())
|
||||
else:
|
||||
var aiList = getAddrInfo(address, port, AF_INET)
|
||||
if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.TSocklen) < 0'i32:
|
||||
if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
|
||||
dealloc(aiList)
|
||||
raiseOSError(osLastError())
|
||||
dealloc(aiList)
|
||||
@@ -311,9 +313,9 @@ proc acceptAddr*(server: Socket, client: var Socket, address: var string,
|
||||
## flag is specified then this error will not be raised and instead
|
||||
## accept will be called again.
|
||||
assert(client != nil)
|
||||
var sockAddress: TSockaddr_in
|
||||
var addrLen = sizeof(sockAddress).TSocklen
|
||||
var sock = accept(server.fd, cast[ptr TSockAddr](addr(sockAddress)),
|
||||
var sockAddress: Sockaddr_in
|
||||
var addrLen = sizeof(sockAddress).Socklen
|
||||
var sock = accept(server.fd, cast[ptr SockAddr](addr(sockAddress)),
|
||||
addr(addrLen))
|
||||
|
||||
if sock == osInvalidSocket:
|
||||
@@ -452,7 +454,7 @@ proc connect*(socket: Socket, address: string, port = Port(0),
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
if connect(socket.fd, it.ai_addr, it.ai_addrlen.TSocklen) == 0'i32:
|
||||
if connect(socket.fd, it.ai_addr, it.ai_addrlen.SockLen) == 0'i32:
|
||||
success = true
|
||||
break
|
||||
else: lastError = osLastError()
|
||||
@@ -751,10 +753,10 @@ proc recvFrom*(socket: Socket, data: var string, length: int,
|
||||
|
||||
# TODO: Buffered sockets
|
||||
data.setLen(length)
|
||||
var sockAddress: TSockaddr_in
|
||||
var addrLen = sizeof(sockAddress).TSocklen
|
||||
var sockAddress: SockAddrIn
|
||||
var addrLen = sizeof(sockAddress).SockLen
|
||||
result = recvfrom(socket.fd, cstring(data), length.cint, flags.cint,
|
||||
cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen))
|
||||
cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
|
||||
|
||||
if result != -1:
|
||||
data.setLen(result)
|
||||
@@ -831,7 +833,7 @@ proc sendTo*(socket: Socket, address: string, port: Port, data: pointer,
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
result = sendto(socket.fd, data, size.cint, flags.cint, it.ai_addr,
|
||||
it.ai_addrlen.TSocklen)
|
||||
it.ai_addrlen.SockLen)
|
||||
if result != -1'i32:
|
||||
success = true
|
||||
break
|
||||
@@ -864,7 +866,7 @@ proc connectAsync(socket: Socket, name: string, port = Port(0),
|
||||
var lastError: OSErrorCode
|
||||
var it = aiList
|
||||
while it != nil:
|
||||
var ret = connect(socket.fd, it.ai_addr, it.ai_addrlen.TSocklen)
|
||||
var ret = connect(socket.fd, it.ai_addr, it.ai_addrlen.SockLen)
|
||||
if ret == 0'i32:
|
||||
success = true
|
||||
break
|
||||
@@ -907,7 +909,7 @@ proc connect*(socket: Socket, address: string, port = Port(0), timeout: int,
|
||||
proc isSSL*(socket: Socket): bool = return socket.isSSL
|
||||
## Determines whether ``socket`` is a SSL socket.
|
||||
|
||||
proc getFD*(socket: Socket): TSocketHandle = return socket.fd
|
||||
proc getFD*(socket: Socket): SocketHandle = return socket.fd
|
||||
## Returns the socket's file descriptor
|
||||
|
||||
type
|
||||
|
||||
@@ -2013,7 +2013,10 @@ proc isHidden*(path: string): bool =
|
||||
## On Unix-like systems, a file is hidden if it starts with a '.' (period)
|
||||
## and is not *just* '.' or '..' ' ."
|
||||
when defined(Windows):
|
||||
wrapUnary(attributes, getFileAttributesW, path)
|
||||
when useWinUnicode:
|
||||
wrapUnary(attributes, getFileAttributesW, path)
|
||||
else:
|
||||
var attributes = getFileAttributesA(path)
|
||||
if attributes != -1'i32:
|
||||
result = (attributes and FILE_ATTRIBUTE_HIDDEN) != 0'i32
|
||||
else:
|
||||
|
||||
@@ -13,9 +13,9 @@
|
||||
##
|
||||
## **Note:** The current implementation of message passing is slow and does
|
||||
## not work with cyclic data structures.
|
||||
|
||||
when not declared(NimString):
|
||||
{.error: "You must not import this module explicitly".}
|
||||
|
||||
when not declared(NimString):
|
||||
{.error: "You must not import this module explicitly".}
|
||||
|
||||
type
|
||||
pbytes = ptr array[0.. 0xffff, byte]
|
||||
@@ -73,11 +73,11 @@ proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel,
|
||||
d = cast[TAddress](dest)
|
||||
s = cast[TAddress](src)
|
||||
sysAssert(mt != nil, "mt == nil")
|
||||
case mt.Kind
|
||||
case mt.kind
|
||||
of tyString:
|
||||
if mode == mStore:
|
||||
var x = cast[ppointer](dest)
|
||||
var s2 = cast[ppointer](s)[]
|
||||
var x = cast[PPointer](dest)
|
||||
var s2 = cast[PPointer](s)[]
|
||||
if s2 == nil:
|
||||
x[] = nil
|
||||
else:
|
||||
@@ -86,17 +86,17 @@ proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel,
|
||||
copyMem(ns, ss, ss.len+1 + GenericSeqSize)
|
||||
x[] = ns
|
||||
else:
|
||||
var x = cast[ppointer](dest)
|
||||
var s2 = cast[ppointer](s)[]
|
||||
var x = cast[PPointer](dest)
|
||||
var s2 = cast[PPointer](s)[]
|
||||
if s2 == nil:
|
||||
unsureAsgnRef(x, s2)
|
||||
else:
|
||||
unsureAsgnRef(x, copyString(cast[NimString](s2)))
|
||||
dealloc(t.region, s2)
|
||||
of tySequence:
|
||||
var s2 = cast[ppointer](src)[]
|
||||
var s2 = cast[PPointer](src)[]
|
||||
var seq = cast[PGenericSeq](s2)
|
||||
var x = cast[ppointer](dest)
|
||||
var x = cast[PPointer](dest)
|
||||
if s2 == nil:
|
||||
if mode == mStore:
|
||||
x[] = nil
|
||||
@@ -108,13 +108,13 @@ proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel,
|
||||
x[] = alloc(t.region, seq.len *% mt.base.size +% GenericSeqSize)
|
||||
else:
|
||||
unsureAsgnRef(x, newObj(mt, seq.len * mt.base.size + GenericSeqSize))
|
||||
var dst = cast[taddress](cast[ppointer](dest)[])
|
||||
var dst = cast[TAddress](cast[PPointer](dest)[])
|
||||
for i in 0..seq.len-1:
|
||||
storeAux(
|
||||
cast[pointer](dst +% i*% mt.base.size +% GenericSeqSize),
|
||||
cast[pointer](cast[TAddress](s2) +% i *% mt.base.size +%
|
||||
GenericSeqSize),
|
||||
mt.Base, t, mode)
|
||||
mt.base, t, mode)
|
||||
var dstseq = cast[PGenericSeq](dst)
|
||||
dstseq.len = seq.len
|
||||
dstseq.reserved = seq.len
|
||||
@@ -123,8 +123,8 @@ proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel,
|
||||
# copy type field:
|
||||
var pint = cast[ptr PNimType](dest)
|
||||
# XXX use dynamic type here!
|
||||
pint[] = mt
|
||||
if mt.base != nil:
|
||||
pint[] = mt
|
||||
if mt.base != nil:
|
||||
storeAux(dest, src, mt.base, t, mode)
|
||||
storeAux(dest, src, mt.node, t, mode)
|
||||
of tyTuple:
|
||||
@@ -134,8 +134,8 @@ proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel,
|
||||
storeAux(cast[pointer](d +% i*% mt.base.size),
|
||||
cast[pointer](s +% i*% mt.base.size), mt.base, t, mode)
|
||||
of tyRef:
|
||||
var s = cast[ppointer](src)[]
|
||||
var x = cast[ppointer](dest)
|
||||
var s = cast[PPointer](src)[]
|
||||
var x = cast[PPointer](dest)
|
||||
if s == nil:
|
||||
if mode == mStore:
|
||||
x[] = nil
|
||||
@@ -225,20 +225,20 @@ proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =
|
||||
acquireSys(q.lock)
|
||||
llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))
|
||||
releaseSys(q.lock)
|
||||
|
||||
proc tryRecv*[TMsg](c: var TChannel[TMsg]): tuple[dataAvaliable: bool,
|
||||
msg: TMsg] =
|
||||
## try to receives a message from the channel `c` if available. Otherwise
|
||||
## it returns ``(false, default(msg))``.
|
||||
|
||||
proc tryRecv*[TMsg](c: var TChannel[TMsg]): tuple[dataAvaliable: bool,
|
||||
msg: TMsg] =
|
||||
## try to receives a message from the channel `c` if available. Otherwise
|
||||
## it returns ``(false, default(msg))``.
|
||||
var q = cast[PRawChannel](addr(c))
|
||||
if q.mask != ChannelDeadMask:
|
||||
lockChannel(q):
|
||||
llRecv(q, addr(result.msg), cast[PNimType](getTypeInfo(result.msg)))
|
||||
llRecv(q, addr(result.msg), cast[PNimType](getTypeInfo(result.msg)))
|
||||
result.dataAvaliable = true
|
||||
|
||||
proc peek*[TMsg](c: var TChannel[TMsg]): int =
|
||||
## returns the current number of messages in the channel `c`. Returns -1
|
||||
## if the channel has been closed. **Note**: This is dangerous to use
|
||||
## if the channel has been closed. **Note**: This is dangerous to use
|
||||
## as it encourages races. It's much better to use ``tryRecv`` instead.
|
||||
var q = cast[PRawChannel](addr(c))
|
||||
if q.mask != ChannelDeadMask:
|
||||
|
||||
@@ -31,7 +31,7 @@ when defined(Windows):
|
||||
## Tries to acquire the lock `L`.
|
||||
|
||||
proc tryAcquireSys(L: var TSysLock): bool {.inline.} =
|
||||
result = TryAcquireSysAux(L) != 0'i32
|
||||
result = tryAcquireSysAux(L) != 0'i32
|
||||
|
||||
proc acquireSys(L: var TSysLock) {.stdcall, noSideEffect,
|
||||
dynlib: "kernel32", importc: "EnterCriticalSection".}
|
||||
|
||||
@@ -190,7 +190,7 @@ var globalsSlot = threadVarAlloc()
|
||||
|
||||
when emulatedThreadVars:
|
||||
proc GetThreadLocalVars(): pointer {.compilerRtl, inl.} =
|
||||
result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).tls)
|
||||
result = addr(cast[PGcThread](threadVarGetValue(globalsSlot)).tls)
|
||||
|
||||
when useStackMaskHack:
|
||||
proc maskStackPointer(offset: int): pointer {.compilerRtl, inl.} =
|
||||
@@ -210,7 +210,7 @@ when not defined(useNimRtl):
|
||||
initGC()
|
||||
|
||||
when emulatedThreadVars:
|
||||
if NimThreadVarsSize() > sizeof(TThreadLocalStorage):
|
||||
if nimThreadVarsSize() > sizeof(TThreadLocalStorage):
|
||||
echo "too large thread local storage size requested"
|
||||
quit 1
|
||||
|
||||
@@ -267,7 +267,7 @@ when not defined(boehmgc) and not hasSharedHeap:
|
||||
proc deallocOsPages()
|
||||
|
||||
template threadProcWrapperBody(closure: expr) {.immediate.} =
|
||||
when declared(globalsSlot): ThreadVarSetValue(globalsSlot, closure)
|
||||
when declared(globalsSlot): threadVarSetValue(globalsSlot, closure)
|
||||
var t = cast[ptr TThread[TArg]](closure)
|
||||
when useStackMaskHack:
|
||||
var tls: TThreadLocalStorage
|
||||
@@ -364,7 +364,7 @@ proc threadId*[TArg](t: var TThread[TArg]): TThreadId[TArg] {.inline.} =
|
||||
proc myThreadId*[TArg](): TThreadId[TArg] =
|
||||
## returns the thread ID of the thread that calls this proc. This is unsafe
|
||||
## because the type ``TArg`` is not checked for consistency!
|
||||
result = cast[TThreadId[TArg]](ThreadVarGetValue(globalsSlot))
|
||||
result = cast[TThreadId[TArg]](threadVarGetValue(globalsSlot))
|
||||
|
||||
when false:
|
||||
proc mainThreadId*[TArg](): TThreadId[TArg] =
|
||||
|
||||
@@ -199,14 +199,14 @@ else:
|
||||
importc: "GetCurrentDirectoryA", dynlib: "kernel32", stdcall.}
|
||||
proc setCurrentDirectoryA*(lpPathName: cstring): int32 {.
|
||||
importc: "SetCurrentDirectoryA", dynlib: "kernel32", stdcall.}
|
||||
proc createDirectoryA*(pathName: cstring, security: Pointer=nil): int32 {.
|
||||
proc createDirectoryA*(pathName: cstring, security: pointer=nil): int32 {.
|
||||
importc: "CreateDirectoryA", dynlib: "kernel32", stdcall.}
|
||||
proc removeDirectoryA*(lpPathName: cstring): int32 {.
|
||||
importc: "RemoveDirectoryA", dynlib: "kernel32", stdcall.}
|
||||
proc setEnvironmentVariableA*(lpName, lpValue: cstring): int32 {.
|
||||
stdcall, dynlib: "kernel32", importc: "SetEnvironmentVariableA".}
|
||||
|
||||
proc getModuleFileNameA*(handle: THandle, buf: CString, size: int32): int32 {.
|
||||
proc getModuleFileNameA*(handle: THandle, buf: cstring, size: int32): int32 {.
|
||||
importc: "GetModuleFileNameA", dynlib: "kernel32", stdcall.}
|
||||
|
||||
when useWinUnicode:
|
||||
@@ -304,7 +304,7 @@ else:
|
||||
dwFileAttributes: int32): WINBOOL {.
|
||||
stdcall, dynlib: "kernel32", importc: "SetFileAttributesA".}
|
||||
|
||||
proc copyFileA*(lpExistingFileName, lpNewFileName: CString,
|
||||
proc copyFileA*(lpExistingFileName, lpNewFileName: cstring,
|
||||
bFailIfExists: cint): cint {.
|
||||
importc: "CopyFileA", stdcall, dynlib: "kernel32".}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user