Added asyncio module; irc, scgi and the ftpclient modules work with it. Added (de)allocCStringArray. Many async sockets fixes.

This commit is contained in:
dom96
2012-01-22 16:21:05 +00:00
parent e92693ec8d
commit b298b07567
8 changed files with 812 additions and 192 deletions

325
lib/pure/asyncio.nim Normal file
View File

@@ -0,0 +1,325 @@
#
#
# Nimrod's Runtime Library
# (c) Copyright 2012 Andreas Rumpf, Dominik Picheta
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
import sockets, os
## This module implements an asynchronous event loop for sockets.
## It is akin to Python's asyncore module. Many modules that use sockets
## have an implementation for this module, those modules should all have a
## ``register`` function which you should use to add it to a dispatcher so
## that you can receive the events associated with that module.
##
## Once everything is registered in a dispatcher, you need to call the ``poll``
## function in a while loop.
##
## **Note:** Most modules have tasks which need to be ran regularly, this is
## why you should not call ``poll`` with a infinite timeout, or even a
## very long one. In most cases the default timeout is fine.
##
## **Note:** This module currently only supports select(), this is limited by
## FD_SETSIZE, which is usually 1024. So you may only be able to use 1024
## sockets at a time.
##
## Most (if not all) modules that use asyncio provide a userArg which is passed
## on with the events. The type that you set userArg to must be inheriting from
## TObject!
type
TDelegate = object
deleVal*: PObject
handleRead*: proc (h: PObject)
handleWrite*: proc (h: PObject)
handleConnect*: proc (h: PObject)
handleAccept*: proc (h: PObject)
getSocket*: proc (h: PObject): tuple[info: TInfo, sock: TSocket]
task*: proc (h: PObject)
mode*: TMode
PDelegate* = ref TDelegate
PDispatcher* = ref TDispatcher
TDispatcher = object
delegates: seq[PDelegate]
PAsyncSocket* = ref TAsyncSocket
TAsyncSocket = object of TObject
socket: TSocket
info: TInfo
userArg: PObject
handleRead*: proc (s: PAsyncSocket, arg: PObject)
handleConnect*: proc (s: PAsyncSocket, arg: PObject)
handleAccept*: proc (s: PAsyncSocket, arg: PObject)
TInfo* = enum
SockIdle, SockConnecting, SockConnected, SockListening, SockClosed
TMode* = enum
MReadable, MWriteable, MReadWrite
proc newDelegate*(): PDelegate =
## Creates a new delegate.
new(result)
result.handleRead = (proc (h: PObject) = nil)
result.handleWrite = (proc (h: PObject) = nil)
result.handleConnect = (proc (h: PObject) = nil)
result.handleAccept = (proc (h: PObject) = nil)
result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
doAssert(false))
result.task = (proc (h: PObject) = nil)
result.mode = MReadable
proc newAsyncSocket(userArg: PObject = nil): PAsyncSocket =
new(result)
result.info = SockIdle
result.userArg = userArg
result.handleRead = (proc (s: PAsyncSocket, arg: PObject) = nil)
result.handleConnect = (proc (s: PAsyncSocket, arg: PObject) = nil)
result.handleAccept = (proc (s: PAsyncSocket, arg: PObject) = nil)
proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP,
userArg: PObject = nil): PAsyncSocket =
result = newAsyncSocket(userArg)
result.socket = socket(domain, typ, protocol)
if result.socket == InvalidSocket: OSError()
result.socket.setBlocking(false)
proc toDelegate(sock: PAsyncSocket): PDelegate =
result = newDelegate()
result.deleVal = sock
result.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
return (PAsyncSocket(h).info, PAsyncSocket(h).socket))
result.handleConnect = (proc (h: PObject) =
PAsyncSocket(h).info = SockConnected
PAsyncSocket(h).handleConnect(PAsyncSocket(h),
PAsyncSocket(h).userArg))
result.handleRead = (proc (h: PObject) =
PAsyncSocket(h).handleRead(PAsyncSocket(h),
PAsyncSocket(h).userArg))
result.handleAccept = (proc (h: PObject) =
PAsyncSocket(h).handleAccept(PAsyncSocket(h),
PAsyncSocket(h).userArg))
proc connect*(sock: PAsyncSocket, name: string, port = TPort(0),
af: TDomain = AF_INET) =
## Begins connecting ``sock`` to ``name``:``port``.
sock.socket.connectAsync(name, port, af)
sock.info = SockConnecting
proc close*(sock: PAsyncSocket) =
## Closes ``sock``. Terminates any current connections.
sock.info = SockClosed
sock.socket.close()
proc bindAddr*(sock: PAsyncSocket, port = TPort(0), address = "") =
## Equivalent to ``sockets.bindAddr``.
sock.socket.bindAddr(port, address)
proc listen*(sock: PAsyncSocket) =
## Equivalent to ``sockets.listen``.
sock.socket.listen()
sock.info = SockListening
proc acceptAddr*(server: PAsyncSocket): tuple[sock: PAsyncSocket,
address: string] =
## Equivalent to ``sockets.acceptAddr``.
var (client, a) = server.socket.acceptAddr()
if client == InvalidSocket: OSError()
client.setBlocking(false) # TODO: Needs to be tested.
var aSock: PAsyncSocket = newAsyncSocket()
aSock.socket = client
aSock.info = SockConnected
return (aSock, a)
proc accept*(server: PAsyncSocket): PAsyncSocket =
## Equivalent to ``sockets.accept``.
var (client, a) = server.acceptAddr()
return client
proc newDispatcher*(): PDispatcher =
new(result)
result.delegates = @[]
proc register*(d: PDispatcher, deleg: PDelegate) =
## Registers delegate ``deleg`` with dispatcher ``d``.
d.delegates.add(deleg)
proc register*(d: PDispatcher, sock: PAsyncSocket): PDelegate {.discardable.} =
## Registers async socket ``sock`` with dispatcher ``d``.
result = sock.toDelegate()
d.register(result)
proc unregister*(d: PDispatcher, deleg: PDelegate) =
## Unregisters deleg ``deleg`` from dispatcher ``d``.
for i in 0..len(d.delegates)-1:
if d.delegates[i] == deleg:
d.delegates.del(i)
return
raise newException(EInvalidIndex, "Could not find delegate.")
proc isWriteable*(s: PAsyncSocket): bool =
## Determines whether socket ``s`` is ready to be written to.
var writeSock = @[s.socket]
return selectWrite(writeSock, 1) != 0 and s.socket notin writeSock
proc `userArg=`*(s: PAsyncSocket, val: PObject) =
s.userArg = val
converter getSocket*(s: PAsyncSocket): TSocket =
return s.socket
proc isConnected*(s: PAsyncSocket): bool =
## Determines whether ``s`` is connected.
return s.info == SockConnected
proc isListening*(s: PAsyncSocket): bool =
## Determines whether ``s`` is listening for incoming connections.
return s.info == SockListening
proc isConnecting*(s: PAsyncSocket): bool =
## Determines whether ``s`` is connecting.
return s.info == SockConnecting
proc poll*(d: PDispatcher, timeout: int = 500): bool =
## This function checks for events on all the sockets in the `PDispatcher`.
## It then proceeds to call the correct event handler.
##
## **Note:** There is no event which signifes when you have been disconnected,
## it is your job to check whether what you get from ``recv`` is ``""``.
## If you have been disconnected, `d`'s ``getSocket`` function should report
## this appropriately.
##
## This function returns ``True`` if there are sockets that are still
## connected (or connecting), otherwise ``False``. Sockets that have been
## closed are immediately removed from the dispatcher automatically.
##
## **Note:** Each delegate has a task associated with it. This gets called
## after each select() call, if you make timeout ``-1`` the tasks will
## only be executed after one or more sockets becomes readable or writeable.
result = true
var readSocks, writeSocks: seq[TSocket] = @[]
var L = d.delegates.len
var dc = 0
while dc < L:
template deleg: expr = d.delegates[dc]
let aSock = deleg.getSocket(deleg.deleVal)
if (deleg.mode != MWriteable and aSock.info == SockConnected) or
aSock.info == SockListening:
readSocks.add(aSock.sock)
if aSock.info == SockConnecting or
(aSock.info == SockConnected and deleg.mode != MReadable):
writeSocks.add(aSock.sock)
if aSock.info == SockClosed:
# Socket has been closed remove it from the dispatcher.
d.delegates[dc] = d.delegates[L-1]
dec L
else: inc dc
d.delegates.setLen(L)
if readSocks.len() == 0 and writeSocks.len() == 0:
return False
if select(readSocks, writeSocks, timeout) != 0:
for i in 0..len(d.delegates)-1:
if i > len(d.delegates)-1: break # One delegate might've been removed.
let deleg = d.delegates[i]
let sock = deleg.getSocket(deleg.deleVal)
if sock.info == SockConnected:
if deleg.mode != MWriteable and sock.sock notin readSocks:
if not (sock.info == SockConnecting):
assert(not (sock.info == SockListening))
deleg.handleRead(deleg.deleVal)
else:
assert(false)
if deleg.mode != MReadable and sock.sock notin writeSocks:
deleg.handleWrite(deleg.deleVal)
if sock.info == SockListening:
if sock.sock notin readSocks:
# This is a server socket, that had listen() called on it.
# This socket should have a client waiting now.
deleg.handleAccept(deleg.deleVal)
if sock.info == SockConnecting:
# Checking whether the socket has connected this way should work on
# Windows and Posix. I've checked.
if sock.sock notin writeSocks:
deleg.handleConnect(deleg.deleVal)
# Execute tasks
for i in items(d.delegates):
i.task(i.deleVal)
when isMainModule:
type
PIntType = ref TIntType
TIntType = object of TObject
val: int
PMyArg = ref TMyArg
TMyArg = object of TObject
dispatcher: PDispatcher
val: int
proc testConnect(s: PAsyncSocket, arg: PObject) =
echo("Connected! " & $PIntType(arg).val)
proc testRead(s: PAsyncSocket, arg: PObject) =
echo("Reading! " & $PIntType(arg).val)
var data = s.getSocket.recv()
if data == "":
echo("Closing connection. " & $PIntType(arg).val)
s.close()
echo(data)
echo("Finished reading! " & $PIntType(arg).val)
proc testAccept(s: PAsyncSocket, arg: PObject) =
echo("Accepting client! " & $PMyArg(arg).val)
var (client, address) = s.acceptAddr()
echo("Accepted ", address)
client.handleRead = testRead
var userArg: PIntType
new(userArg)
userArg.val = 78
client.userArg = userArg
PMyArg(arg).dispatcher.register(client)
var d = newDispatcher()
var userArg: PIntType
new(userArg)
userArg.val = 0
var s = AsyncSocket(userArg = userArg)
s.connect("amber.tenthbit.net", TPort(6667))
s.handleConnect = testConnect
s.handleRead = testRead
d.register(s)
var userArg1: PMyArg
new(userArg1)
userArg1.val = 1
userArg1.dispatcher = d
var server = AsyncSocket(userArg = userArg1)
server.handleAccept = testAccept
server.bindAddr(TPort(5555))
server.listen()
d.register(server)
while d.poll(-1): nil

View File

@@ -6,34 +6,26 @@
# distribution, for details about the copyright.
#
import sockets, strutils, parseutils, times, os
import sockets, strutils, parseutils, times, os, asyncio
## This module **partially** implements an FTP client as specified
## by `RFC 959 <http://tools.ietf.org/html/rfc959>`_.
## Functions which require file transfers have an ``async`` parameter, when
## this parameter is set to ``true``, it is your job to call the ``poll``
## function periodically to progress the transfer.
##
## This module provides both a synchronous and asynchronous implementation.
## The asynchronous implementation requires you to use the ``AsyncFTPClient``
## function. You are then required to register the ``PAsyncFTPClient`` with a
## asyncio dispatcher using the ``register`` function. Take a look at the
## asyncio module documentation for more information.
##
## Here is some example usage of this module:
##
## .. code-block:: Nimrod
## var ftp = FTPClient("example.org", user = "user", pass = "pass")
## ftp.connect()
## ftp.retrFile("file.ext", "file.ext", async = true)
## while True:
## var event: TFTPEvent
## if ftp.poll(event):
## case event.typ
## of EvRetr:
## echo("Download finished!")
## break
## of EvTransferProgress:
## echo(event.speed div 1000, " kb/s")
## else: assert(false)
## ftp.retrFile("file.ext", "file.ext")
type
TFTPClient* = object
TFTPClient* = object of TObject
csock: TSocket # Command connection socket
dsock: TSocket # Data connection socket
user, pass: string
@@ -43,24 +35,35 @@ type
jobInProgress: bool
job: ref TFTPJob
isAsync: bool
dsockStatus: TInfo
FTPJobType = enum
JListCmd, JRetrText, JRetr, JStore
JRetrText, JRetr, JStore
TFTPJob = object
prc: proc (ftp: var TFTPClient, timeout: int): bool
prc: proc (ftp: var TFTPClient, async: bool): bool
case typ*: FTPJobType
of JListCmd, JRetrText:
of JRetrText:
lines: string
of JRetr, JStore:
dsockClosed: bool
file: TFile
filename: string
total: biggestInt # In bytes.
progress: biggestInt # In bytes.
oneSecond: biggestInt # Bytes transferred in one second.
lastProgressReport: float # Time
toStore: string # Data left to upload (Only used with async)
else: nil
PAsyncFTPClient* = ref TAsyncFTPClient ## Async alternative to TFTPClient.
TAsyncFTPClient* = object of TFTPClient
handleEvent*: proc (ftp: var TAsyncFTPClient, ev: TFTPEvent,
userArg: PObject)
dele: PDelegate
userArg: PObject
FTPEventType* = enum
EvTransferProgress, EvLines, EvRetr, EvStore
@@ -86,6 +89,9 @@ proc FTPClient*(address: string, port = TPort(21),
result.address = address
result.port = port
result.isAsync = false
result.dsockStatus = SockIdle
proc expectReply(ftp: var TFTPClient): TaintedString =
result = TaintedString""
if not ftp.csock.recvLine(result): setLen(result.string, 0)
@@ -110,7 +116,7 @@ proc assertReply(received: TaintedString, expected: openarray[string]) =
[expected.join("' or '"), received.string])
proc createJob(ftp: var TFTPClient,
prc: proc (ftp: var TFTPClient, timeout: int): bool,
prc: proc (ftp: var TFTPClient, async: bool): bool,
cmd: FTPJobType) =
if ftp.jobInProgress:
raise newException(EFTP, "Unable to do two jobs at once.")
@@ -119,22 +125,24 @@ proc createJob(ftp: var TFTPClient,
ftp.job.prc = prc
ftp.job.typ = cmd
case cmd
of JListCmd, JRetrText:
of JRetrText:
ftp.job.lines = ""
of JRetr, JStore:
ftp.job.dsockClosed = false
ftp.job.toStore = ""
proc deleteJob(ftp: var TFTPClient) =
assert ftp.jobInProgress
ftp.jobInProgress = false
case ftp.job.typ
of JListCmd, JRetrText:
of JRetrText:
ftp.job.lines = ""
of JRetr, JStore:
ftp.job.file.close()
proc pasv(ftp: var TFTPClient) =
## Negotiate a data connection.
ftp.dsock = socket()
if ftp.isAsync: ftp.dsock.setBlocking(false)
var pasvMsg = ftp.send("PASV").string.strip.TaintedString
assertReply(pasvMsg, "227")
var betweenParens = captureBetween(pasvMsg.string, '(', ')')
@@ -142,8 +150,14 @@ proc pasv(ftp: var TFTPClient) =
var ip = nums[0.. -3]
var port = nums[-2.. -1]
var properPort = port[0].parseInt()*256+port[1].parseInt()
ftp.dsock = socket()
ftp.dsock.connect(ip.join("."), TPort(properPort.toU16))
if ftp.isAsync:
# connectAsync should work well even if socket is blocking. But we need
# isAsync anyway... :\
ftp.dsock.connectAsync(ip.join("."), TPort(properPort.toU16))
ftp.dsockStatus = SockConnecting
else:
ftp.dsock.connect(ip.join("."), TPort(properPort.toU16))
ftp.dsockStatus = SockConnected
proc connect*(ftp: var TFTPClient) =
## Connect to the FTP server specified by ``ftp``.
@@ -173,16 +187,22 @@ proc cdup*(ftp: var TFTPClient) =
## Changes the current directory to the parent of the current directory.
assertReply ftp.send("CDUP"), "200"
proc asyncLines(ftp: var TFTPClient, timeout: int): bool =
## Downloads text data in ASCII mode, Asynchronously.
proc getLines(ftp: var TFTPClient, async: bool = false): bool =
## Downloads text data in ASCII mode
## Returns true if the download is complete.
var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock]
if readSocks.select(timeout) != 0:
if ftp.dsock notin readSocks:
var r = TaintedString""
if ftp.dsock.recvLine(r):
## It doesn't if `async` is true, because it doesn't check for 226 then.
if ftp.dsockStatus == SockConnected:
var r = TaintedString""
if ftp.dsock.recvLine(r):
if r.string != "":
ftp.job.lines.add(r.string & "\n")
if ftp.csock notin readSocks:
else:
ftp.dsockStatus = SockClosed
if not async:
var readSocks: seq[TSocket] = @[ftp.csock]
# This is only needed here. Asyncio gets this socket...
if readSocks.select(1) != 0 and ftp.csock notin readSocks:
assertReply ftp.expectReply(), "226"
return true
@@ -191,15 +211,15 @@ proc listDirs*(ftp: var TFTPClient, dir: string = "",
## Returns a list of filenames in the given directory. If ``dir`` is "",
## the current directory is used. If ``async`` is true, this
## function will return immediately and it will be your job to
## call ``poll`` to progress this operation.
## use asyncio's ``poll`` to progress this operation.
ftp.createJob(asyncLines, JRetrText)
ftp.createJob(getLines, JRetrText)
ftp.pasv()
assertReply ftp.send("NLST " & dir), ["125", "150"]
if not async:
while not ftp.job.prc(ftp, 500): nil
while not ftp.job.prc(ftp, false): nil
result = splitLines(ftp.job.lines)
ftp.deleteJob()
else: return @[]
@@ -254,15 +274,15 @@ proc chmod*(ftp: var TFTPClient, path: string,
proc list*(ftp: var TFTPClient, dir: string = "", async = false): string =
## Lists all files in ``dir``. If ``dir`` is ``""``, uses the current
## working directory. If ``async`` is true, this function will return
## immediately and it will be your job to call ``poll`` to progress this
## operation.
ftp.createJob(asyncLines, JRetrText)
## immediately and it will be your job to call asyncio's
## ``poll`` to progress this operation.
ftp.createJob(getLines, JRetrText)
ftp.pasv()
assertReply(ftp.send("LIST" & " " & dir), ["125", "150"])
if not async:
while not ftp.job.prc(ftp, 500): nil
while not ftp.job.prc(ftp, false): nil
result = ftp.job.lines
ftp.deleteJob()
else:
@@ -272,28 +292,36 @@ proc retrText*(ftp: var TFTPClient, file: string, async = false): string =
## Retrieves ``file``. File must be ASCII text.
## If ``async`` is true, this function will return immediately and
## it will be your job to call ``poll`` to progress this operation.
ftp.createJob(asyncLines, JRetrText)
ftp.createJob(getLines, JRetrText)
ftp.pasv()
assertReply ftp.send("RETR " & file), ["125", "150"]
if not async:
while not ftp.job.prc(ftp, 500): nil
while not ftp.job.prc(ftp, false): nil
result = ftp.job.lines
ftp.deleteJob()
else:
return ""
proc asyncFile(ftp: var TFTPClient, timeout: int): bool =
var readSocks: seq[TSocket] = @[ftp.dsock, ftp.csock]
if readSocks.select(timeout) != 0:
if ftp.dsock notin readSocks:
var r = ftp.dsock.recv().string
if r != "":
ftp.job.progress.inc(r.len)
ftp.job.oneSecond.inc(r.len)
ftp.job.file.write(r)
if ftp.csock notin readSocks:
proc getFile(ftp: var TFTPClient, async = false): bool =
if ftp.dsockStatus == SockConnected:
var r = "".TaintedString
var returned = false
if async: returned = ftp.dsock.recvAsync(r)
else:
r = ftp.dsock.recv()
returned = true
let r2 = r.string
if r2 != "":
ftp.job.progress.inc(r2.len)
ftp.job.oneSecond.inc(r2.len)
ftp.job.file.write(r2)
elif returned and r2 == "":
ftp.dsockStatus = SockClosed
if not async:
var readSocks: seq[TSocket] = @[ftp.csock]
if readSocks.select(1) != 0 and ftp.csock notin readSocks:
assertReply ftp.expectReply(), "226"
return true
@@ -302,7 +330,7 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) =
## asynchronously is recommended to view the progress of the download.
## The ``EvRetr`` event is given by ``poll`` when the download is finished,
## and the ``filename`` field will be equal to ``file``.
ftp.createJob(asyncFile, JRetr)
ftp.createJob(getFile, JRetr)
ftp.job.file = open(dest, mode = fmWrite)
ftp.pasv()
var reply = ftp.send("RETR " & file)
@@ -318,34 +346,40 @@ proc retrFile*(ftp: var TFTPClient, file, dest: string, async = false) =
ftp.job.filename = file
if not async:
while not ftp.job.prc(ftp, 500): nil
while not ftp.job.prc(ftp, false): nil
ftp.deleteJob()
proc asyncUpload(ftp: var TFTPClient, timeout: int): bool =
var writeSocks: seq[TSocket] = @[ftp.dsock]
var readSocks: seq[TSocket] = @[ftp.csock]
if select(readSocks, writeSocks, timeout) != 0:
if ftp.dsock notin writeSocks and not ftp.job.dsockClosed:
var buffer: array[0..1023, byte]
var len = ftp.job.file.readBytes(buffer, 0, 1024)
proc doUpload(ftp: var TFTPClient, async = false): bool =
if ftp.dsockStatus == SockConnected:
if ftp.job.toStore.len() > 0:
assert(async)
if ftp.dsock.sendAsync(ftp.job.toStore):
ftp.job.toStore = ""
ftp.job.progress.inc(ftp.job.toStore.len)
ftp.job.oneSecond.inc(ftp.job.toStore.len)
else:
var s = newStringOfCap(4000)
var len = ftp.job.file.readBuffer(addr(s[0]), 4000)
setLen(s, len)
if len == 0:
# File finished uploading.
ftp.dsock.close()
ftp.job.dsockClosed = true
return
if ftp.dsock.send(addr(buffer), len) != len:
raise newException(EIO, "could not 'send' all data.")
ftp.dsockStatus = SockClosed
if not async:
assertReply ftp.expectReply(), "226"
return true
return false
if not async:
ftp.dsock.send(s)
else:
if not ftp.dsock.sendAsync(s):
ftp.job.toStore = s
ftp.job.progress.inc(len)
ftp.job.oneSecond.inc(len)
if ftp.csock notin readSocks:
# TODO: Why does this block? Why does select
# think that the socket is readable?
assertReply ftp.expectReply(), "226"
return true
proc store*(ftp: var TFTPClient, file, dest: string, async = false) =
## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this
@@ -353,7 +387,7 @@ proc store*(ftp: var TFTPClient, file, dest: string, async = false) =
## the download.
## The ``EvStore`` event is given by ``poll`` when the upload is finished,
## and the ``filename`` field will be equal to ``file``.
ftp.createJob(asyncUpload, JStore)
ftp.createJob(doUpload, JStore)
ftp.job.file = open(file)
ftp.job.total = ftp.job.file.getFileSize()
ftp.job.lastProgressReport = epochTime()
@@ -363,42 +397,9 @@ proc store*(ftp: var TFTPClient, file, dest: string, async = false) =
assertReply ftp.send("STOR " & dest), ["125", "150"]
if not async:
while not ftp.job.prc(ftp, 500): nil
while not ftp.job.prc(ftp, false): nil
ftp.deleteJob()
proc poll*(ftp: var TFTPClient, r: var TFTPEvent, timeout = 500): bool =
## Progresses an async job(if available). Returns true if ``r`` has been set.
if ftp.jobInProgress:
if ftp.job.prc(ftp, timeout):
result = true
case ftp.job.typ
of JListCmd, JRetrText:
r.typ = EvLines
r.lines = ftp.job.lines
of JRetr:
r.typ = EvRetr
r.filename = ftp.job.filename
if ftp.job.progress != ftp.job.total:
raise newException(EFTP, "Didn't download full file.")
of JStore:
r.typ = EvStore
r.filename = ftp.job.filename
if ftp.job.progress != ftp.job.total:
raise newException(EFTP, "Didn't upload full file.")
ftp.deleteJob()
return
if ftp.job.typ in {JRetr, JStore}:
if epochTime() - ftp.job.lastProgressReport >= 1.0:
result = true
ftp.job.lastProgressReport = epochTime()
r.typ = EvTransferProgress
r.bytesTotal = ftp.job.total
r.bytesFinished = ftp.job.progress
r.speed = ftp.job.oneSecond
r.filename = ftp.job.filename
ftp.job.oneSecond = 0
proc close*(ftp: var TFTPClient) =
## Terminates the connection to the server.
assertReply ftp.send("QUIT"), "221"
@@ -406,13 +407,126 @@ proc close*(ftp: var TFTPClient) =
ftp.csock.close()
ftp.dsock.close()
proc handleTask(h: PObject) =
var ftp = PAsyncFTPClient(h)
if ftp.jobInProgress:
if ftp.job.typ in {JRetr, JStore}:
if epochTime() - ftp.job.lastProgressReport >= 1.0:
var r: TFTPEvent
ftp.job.lastProgressReport = epochTime()
r.typ = EvTransferProgress
r.bytesTotal = ftp.job.total
r.bytesFinished = ftp.job.progress
r.speed = ftp.job.oneSecond
r.filename = ftp.job.filename
ftp.job.oneSecond = 0
ftp.handleEvent(ftp[], r, ftp.userArg)
proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
result = (SockIdle, InvalidSocket)
var ftp = PAsyncFTPClient(h)
if ftp.jobInProgress:
case ftp.job.typ
of JRetrText, JRetr, JStore:
if ftp.dsockStatus == SockConnecting or ftp.dsockStatus == SockConnected:
result = (ftp.dsockStatus, ftp.dsock)
else: result = (SockIdle, ftp.dsock)
proc handleConnect(h: PObject) =
var ftp = PAsyncFTPClient(h)
ftp.dsockStatus = SockConnected
assert(ftp.jobInProgress)
if ftp.job.typ == JStore:
ftp.dele.mode = MWriteable
else:
ftp.dele.mode = MReadable
proc handleRead(h: PObject) =
var ftp = PAsyncFTPClient(h)
assert(ftp.jobInProgress)
assert(ftp.job.typ != JStore)
# This can never return true, because it shouldn't check for code
# 226 from csock.
assert(not ftp.job.prc(ftp[], true))
proc handleWrite(h: PObject) =
var ftp = PAsyncFTPClient(h)
if ftp.jobInProgress:
if ftp.job.typ == JStore:
assert (not ftp.job.prc(ftp[], true))
proc csockGetSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
# This only returns the csock if a job is in progress. Otherwise handle read
# would capture data which is not for it to capture.
result = (SockIdle, InvalidSocket)
var ftp = PAsyncFTPClient(h)
if ftp.jobInProgress:
result = (SockConnected, ftp.csock)
proc csockHandleRead(h: PObject) =
var ftp = PAsyncFTPClient(h)
assert(ftp.jobInProgress)
assertReply ftp[].expectReply(), "226" # Make sure the transfer completed.
var r: TFTPEvent
case ftp.job.typ
of JRetrText:
r.typ = EvLines
r.lines = ftp.job.lines
of JRetr:
r.typ = EvRetr
r.filename = ftp.job.filename
if ftp.job.progress != ftp.job.total:
raise newException(EFTP, "Didn't download full file.")
of JStore:
r.typ = EvStore
r.filename = ftp.job.filename
if ftp.job.progress != ftp.job.total:
raise newException(EFTP, "Didn't upload full file.")
ftp[].deleteJob()
ftp.handleEvent(ftp[], r, ftp.userArg)
proc AsyncFTPClient*(address: string, port = TPort(21),
user, pass = "", userArg: PObject = nil): PAsyncFTPClient =
## Create a ``PAsyncFTPClient`` object.
##
## Use this if you want to use asyncio's dispatcher.
new(result)
result.user = user
result.pass = pass
result.address = address
result.port = port
result.isAsync = true
result.dsockStatus = SockIdle
result.userArg = userArg
result.handleEvent = (proc (ftp: var TAsyncFTPClient, ev: TFTPEvent,
userArg: PObject) = nil)
proc register*(d: PDispatcher, ftp: PAsyncFTPClient) =
## Registers ``ftp`` with dispatcher ``d``.
ftp.dele = newDelegate()
ftp.dele.deleVal = ftp
ftp.dele.getSocket = getSocket
ftp.dele.task = handleTask
ftp.dele.handleConnect = handleConnect
ftp.dele.handleRead = handleRead
ftp.dele.handleWrite = handleWrite
d.register(ftp.dele)
# Add csock into the dispatcher (to check for 226).
var cDele = newDelegate()
cDele.deleVal = ftp
cDele.getSocket = csockGetSocket
cDele.handleRead = csockHandleRead
d.register(cDele)
when isMainModule:
var ftp = FTPClient("ex.org", user = "user", pass = "p")
var ftp = FTPClient("picheta.me", user = "blah", pass = "sd")
ftp.connect()
echo ftp.pwd()
echo ftp.list()
ftp.store("payload.avi", "payload.avi", async = true)
echo("uploading")
ftp.store("payload.avi", "payload.avi", async = false)
discard """
while True:
var event: TFTPEvent
if ftp.poll(event):
@@ -429,8 +543,10 @@ when isMainModule:
" - ", time, " seconds")
else: assert(false)
ftp.retrFile("payload.avi", "payload2.avi", async = true)
"""
echo("Upload complete")
ftp.retrFile("payload.avi", "payload2.avi", async = false)
discard """
while True:
var event: TFTPEvent
if ftp.poll(event):
@@ -441,7 +557,8 @@ when isMainModule:
of EvTransferProgress:
echo(event.speed div 1000, " kb/s")
else: assert(false)
"""
echo("Download complete")
sleep(5000)
ftp.close()
sleep(200)

View File

@@ -26,15 +26,15 @@
## of EvMsg:
## # Where all the magic happens.
import sockets, strutils, parseutils, times
import sockets, strutils, parseutils, times, asyncio
type
TIRC* = object
TIRC* = object of TObject
address: string
port: TPort
nick, user, realname, serverPass: string
sock: TSocket
connected: bool
status: TInfo
lastPing: float
lastPong: float
lag: float
@@ -42,6 +42,11 @@ type
msgLimit: bool
messageBuffer: seq[tuple[timeToSend: float, m: string]]
PAsyncIRC* = ref TAsyncIRC
TAsyncIRC* = object of TIRC
userArg: PObject
handleEvent: proc (irc: var TAsyncIRC, ev: TIRCEvent, userArg: PObject)
TIRCMType* = enum
MUnknown,
MNumeric,
@@ -89,7 +94,7 @@ proc send*(irc: var TIRC, message: string, sendImmediately = false) =
except EOS:
# Assuming disconnection of every EOS could be bad,
# but I can't exactly check for EBrokenPipe.
irc.connected = false
irc.status = SockClosed
proc privmsg*(irc: var TIRC, target, message: string) =
## Sends ``message`` to ``target``. ``Target`` can be a channel, or a user.
@@ -188,7 +193,7 @@ proc connect*(irc: var TIRC) =
irc.sock = socket()
irc.sock.connect(irc.address, irc.port)
irc.connected = true
irc.status = SockConnected
# Greet the server :)
if irc.serverPass != "": irc.send("PASS " & irc.serverPass, true)
@@ -201,7 +206,7 @@ proc irc*(address: string, port: TPort = 6667.TPort,
realname = "NimrodBot", serverPass = "",
joinChans: seq[string] = @[],
msgLimit: bool = true): TIRC =
## This function calls `connect`, so you don't need to.
## Creates a ``TIRC`` object.
result.address = address
result.port = port
result.nick = nick
@@ -214,45 +219,33 @@ proc irc*(address: string, port: TPort = 6667.TPort,
result.channelsToJoin = joinChans
result.msgLimit = msgLimit
result.messageBuffer = @[]
result.status = SockIdle
result.connect()
proc poll*(irc: var TIRC, ev: var TIRCEvent,
timeout: int = 500): bool =
## This function parses a single message from the IRC server and returns
## a TIRCEvent.
##
## This function should be called often as it also handles pinging
## the server.
if not irc.connected: ev.typ = EvDisconnected
var line = TaintedString""
var socks = @[irc.sock]
var ret = socks.select(timeout)
if socks.len() == 0 and ret == 1:
if irc.sock.recvLine(line):
if line.string.len == 0:
ev.typ = EvDisconnected
else:
ev = parseMessage(line.string)
# Get the origin
ev.origin = ev.params[0]
if ev.origin == irc.nick: ev.origin = ev.nick
proc processLine(irc: var TIRC, line: string): TIRCEvent =
if line.len == 0:
result.typ = EvDisconnected
else:
result = parseMessage(line)
# Get the origin
result.origin = result.params[0]
if result.origin == irc.nick: result.origin = result.nick
if ev.cmd == MError:
ev.typ = EvDisconnected
return
if result.cmd == MError:
result.typ = EvDisconnected
return
if ev.cmd == MPing:
irc.send("PONG " & ev.params[0])
if ev.cmd == MPong:
irc.lag = epochTime() - parseFloat(ev.params[ev.params.high])
irc.lastPong = epochTime()
if ev.cmd == MNumeric:
if ev.numeric == "001":
for chan in items(irc.channelsToJoin):
irc.join(chan)
result = true
if result.cmd == MPing:
irc.send("PONG " & result.params[0])
if result.cmd == MPong:
irc.lag = epochTime() - parseFloat(result.params[result.params.high])
irc.lastPong = epochTime()
if result.cmd == MNumeric:
if result.numeric == "001":
for chan in items(irc.channelsToJoin):
irc.join(chan)
proc processOther(irc: var TIRC, ev: var TIRCEvent): bool =
result = false
if epochTime() - irc.lastPing >= 20.0:
irc.lastPing = epochTime()
irc.send("PING :" & formatFloat(irc.lastPing), true)
@@ -260,7 +253,6 @@ proc poll*(irc: var TIRC, ev: var TIRCEvent,
if epochTime() - irc.lastPong >= 120.0 and irc.lastPong != -1.0:
ev.typ = EvDisconnected # TODO: EvTimeout?
return true
for i in 0..irc.messageBuffer.len-1:
if epochTime() >= irc.messageBuffer[0][0]:
@@ -270,6 +262,30 @@ proc poll*(irc: var TIRC, ev: var TIRCEvent,
break # messageBuffer is guaranteed to be from the quickest to the
# later-est.
proc poll*(irc: var TIRC, ev: var TIRCEvent,
timeout: int = 500): bool =
## This function parses a single message from the IRC server and returns
## a TIRCEvent.
##
## This function should be called often as it also handles pinging
## the server.
##
## This function provides a somewhat asynchronous IRC implementation, although
## it should only be used for simple things for example an IRC bot which does
## not need to be running many time critical tasks in the background. If you
## require this, use the asyncio implementation.
if not (irc.status == SockConnected): ev.typ = EvDisconnected
var line = TaintedString""
var socks = @[irc.sock]
var ret = socks.select(timeout)
if socks.len() == 0 and ret != 0:
if irc.sock.recvLine(line):
ev = irc.processLine(line)
result = true
if processOther(irc, ev): result = true
proc getLag*(irc: var TIRC): float =
## Returns the latency between this client and the IRC server in seconds.
##
@@ -278,8 +294,88 @@ proc getLag*(irc: var TIRC): float =
proc isConnected*(irc: var TIRC): bool =
## Returns whether this IRC client is connected to an IRC server.
return irc.connected
return irc.status == SockConnected
# -- Asyncio dispatcher
proc connect*(irc: PAsyncIRC) =
## Equivalent of connect for ``TIRC`` but specifically created for asyncio.
assert(irc.address != "")
assert(irc.port != TPort(0))
irc.sock = socket()
irc.sock.setBlocking(false)
irc.sock.connectAsync(irc.address, irc.port)
irc.status = SockConnecting
proc handleConnect(h: PObject) =
var irc = PAsyncIRC(h)
# Greet the server :)
if irc.serverPass != "": irc[].send("PASS " & irc.serverPass, true)
irc[].send("NICK " & irc.nick, true)
irc[].send("USER $1 * 0 :$2" % [irc.user, irc.realname], true)
irc.status = SockConnected
proc handleRead(h: PObject) =
var irc = PAsyncIRC(h)
var line = ""
if irc.sock.recvLine(line):
var ev = irc[].processLine(line)
irc.handleEvent(irc[], ev, irc.userArg)
proc handleTask(h: PObject) =
var irc = PAsyncIRC(h)
var ev: TIRCEvent
if PAsyncIRC(h)[].processOther(ev):
irc.handleEvent(irc[], ev, irc.userArg)
proc asyncIRC*(address: string, port: TPort = 6667.TPort,
nick = "NimrodBot",
user = "NimrodBot",
realname = "NimrodBot", serverPass = "",
joinChans: seq[string] = @[],
msgLimit: bool = true,
ircEvent: proc (irc: var TAsyncIRC, ev: TIRCEvent,
userArg: PObject),
userArg: PObject = nil): PAsyncIRC =
## Use this function if you want to use asyncio's dispatcher.
##
## **Note:** Do **NOT** use this if you're writing a simple IRC bot which only
## requires one task to be run, i.e. this should not be used if you want a
## synchronous IRC client implementation, use ``irc`` for that.
new(result)
result.address = address
result.port = port
result.nick = nick
result.user = user
result.realname = realname
result.serverPass = serverPass
result.lastPing = epochTime()
result.lastPong = -1.0
result.lag = -1.0
result.channelsToJoin = joinChans
result.msgLimit = msgLimit
result.messageBuffer = @[]
result.handleEvent = ircEvent
result.userArg = userArg
proc register*(d: PDispatcher, irc: PAsyncIRC) =
## Registers ``irc`` with dispatcher ``d``.
var dele = newDelegate()
dele.deleVal = irc
dele.getSocket = (proc (h: PObject): tuple[info: TInfo, sock: TSocket] =
if PAsyncIRC(h).status == SockConnecting or
PAsyncIRC(h).status == SockConnected:
return (PAsyncIRC(h).status, PAsyncIRC(h).sock)
else: return (SockIdle, PAsyncIRC(h).sock))
dele.handleConnect = handleConnect
dele.handleRead = handleRead
dele.task = handleTask
d.register(dele)
when isMainModule:
#var m = parseMessage("ERROR :Closing Link: dom96.co.cc (Ping timeout: 252 seconds)")
#echo(repr(m))
@@ -288,6 +384,7 @@ when isMainModule:
var client = irc("amber.tenthbit.net", nick="TestBot1234",
joinChans = @["#flood"])
client.connect()
while True:
var event: TIRCEvent
if client.poll(event):
@@ -305,3 +402,4 @@ when isMainModule:
#echo( repr(event) )
#echo("Lag: ", formatFloat(client.getLag()))
#"""

View File

@@ -487,13 +487,6 @@ elif not defined(useNimRtl):
result[i] = cast[cstring](alloc(x.len+1))
copyMem(result[i], addr(x[0]), x.len+1)
inc(i)
proc deallocCStringArray(a: cstringArray) =
var i = 0
while a[i] != nil:
dealloc(a[i])
inc(i)
dealloc(a)
proc startProcess(command: string,
workingDir: string = "",

View File

@@ -7,7 +7,7 @@
# distribution, for details about the copyright.
#
## This module implements helper procs for SCGI applictions. Example:
## This module implements helper procs for SCGI applications. Example:
##
## .. code-block:: Nimrod
##
@@ -24,7 +24,7 @@
## run(handleRequest)
##
import sockets, strutils, os, strtabs
import sockets, strutils, os, strtabs, asyncio
type
EScgi* = object of EIO ## the exception that is raised, if a SCGI error occurs
@@ -58,12 +58,18 @@ proc recvChar(s: TSocket): char =
result = c
type
TScgiState* {.final.} = object ## SCGI state object
TScgiState* = object of TObject ## SCGI state object
server: TSocket
bufLen: int
client*: TSocket ## the client socket to send data to
headers*: PStringTable ## the parsed headers
input*: string ## the input buffer
TAsyncScgiState* = object of TScgiState
handleRequest: proc (server: var TAsyncScgiState, client: TSocket,
input: string, headers: PStringTable,userArg: PObject)
userArg: PObject
PAsyncScgiState* = ref TAsyncScgiState
proc recvBuffer(s: var TScgiState, L: int) =
if L > s.bufLen:
@@ -131,6 +137,48 @@ proc run*(handleRequest: proc (client: TSocket, input: string,
s.client.close()
s.close()
proc open*(handleRequest: proc (server: var TAsyncScgiState, client: TSocket,
input: string, headers: PStringTable,
userArg: PObject),
port = TPort(4000), address = "127.0.0.1",
userArg: PObject = nil): PAsyncScgiState =
## Alternative of ``open`` for asyncio compatible SCGI.
new(result)
open(result[], port, address)
result.handleRequest = handleRequest
result.userArg = userArg
proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
var s = PAsyncScgiState(h)
return (SockListening, s.server)
proc handleAccept(h: PObject) =
var s = PAsyncScgiState(h)
s.client = accept(s.server)
var L = 0
while true:
var d = s.client.recvChar()
if d notin strutils.digits:
if d != ':': scgiError("':' after length expected")
break
L = L * 10 + ord(d) - ord('0')
recvBuffer(s[], L+1)
s.headers = parseHeaders(s.input, L)
if s.headers["SCGI"] != "1": scgiError("SCGI Version 1 expected")
L = parseInt(s.headers["CONTENT_LENGTH"])
recvBuffer(s[], L)
s.handleRequest(s[], s.client, s.input, s.headers, s.userArg)
proc register*(d: PDispatcher, s: PAsyncScgiState) =
## Registers ``s`` with dispatcher ``d``.
var dele = newDelegate()
dele.deleVal = s
dele.getSocket = getSocket
dele.handleAccept = handleAccept
d.register(dele)
when false:
var counter = 0
proc handleRequest(client: TSocket, input: string,

View File

@@ -142,9 +142,11 @@ proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
else:
result = TSocket(posix.socket(ToInt(domain), ToInt(typ), ToInt(protocol)))
proc listen*(socket: TSocket, attempts = 5) =
## listens to socket.
if listen(cint(socket), cint(attempts)) < 0'i32: OSError()
proc listen*(socket: TSocket, backlog = SOMAXCONN) =
## Marks ``socket`` as accepting connections.
## ``Backlog`` specifies the maximum length of the
## queue of pending connections.
if listen(cint(socket), cint(backlog)) < 0'i32: OSError()
proc invalidIp4(s: string) {.noreturn, noinline.} =
raise newException(EInvalidValue, "invalid ip4 address: " & s)
@@ -239,22 +241,35 @@ proc getSockName*(socket: TSocket): TPort =
OSError()
result = TPort(sockets.ntohs(name.sin_port))
proc accept*(server: TSocket): TSocket =
## waits for a client and returns its socket. ``InvalidSocket`` is returned
## if an error occurs, or if ``server`` is non-blocking and there are no
## clients connecting.
var client: Tsockaddr_in
var clientLen: cint = sizeof(client)
result = TSocket(accept(cint(server), cast[ptr TSockAddr](addr(client)),
addr(clientLen)))
proc acceptAddr*(server: TSocket): tuple[sock: TSocket, address: string] =
## waits for a client and returns its socket and IP address
## Blocks until a connection is being made from a client. When a connection
## is made returns the client socket and address of the connecting client.
## If ``server`` is non-blocking then this function returns immediately, and
## if there are no connections queued the returned socket will be
## ``InvalidSocket``.
## This function will raise EOS if an error occurs.
var address: Tsockaddr_in
var addrLen: cint = sizeof(address)
var sock = TSocket(accept(cint(server), cast[ptr TSockAddr](addr(address)),
addr(addrLen)))
return (sock, $inet_ntoa(address.sin_addr))
var sock = accept(cint(server), cast[ptr TSockAddr](addr(address)),
addr(addrLen))
if sock < 0:
# TODO: Test on Windows.
when defined(windows):
var err = WSAGetLastError()
if err == WSAEINPROGRESS:
return (InvalidSocket, "")
else: OSError()
else:
if errno == EAGAIN or errno == EWOULDBLOCK:
return (InvalidSocket, "")
else: OSError()
else: return (TSocket(sock), $inet_ntoa(address.sin_addr))
proc accept*(server: TSocket): TSocket =
## Equivalent to ``acceptAddr`` but doesn't return the address, only the
## socket.
var (client, a) = acceptAddr(server)
return client
proc close*(socket: TSocket) =
## closes a socket.
@@ -537,11 +552,14 @@ proc recvLine*(socket: TSocket, line: var TaintedString): bool =
## returns false if no further data is available. `Line` must be initialized
## and not nil! This does not throw an EOS exception, therefore
## it can be used in both blocking and non-blocking sockets.
## If ``socket`` is disconnected, ``true`` will be returned and line will be
## set to ``""``.
setLen(line.string, 0)
while true:
var c: char
var n = recv(cint(socket), addr(c), 1, 0'i32)
if n <= 0: return
if n < 0: return
elif n == 0: return true
if c == '\r':
n = recv(cint(socket), addr(c), 1, MSG_PEEK)
if n > 0 and c == '\L':
@@ -634,19 +652,20 @@ proc send*(socket: TSocket, data: string) =
## sends data to a socket.
if send(socket, cstring(data), data.len) != data.len: OSError()
proc sendAsync*(socket: TSocket, data: string) =
## sends data to a non-blocking socket.
proc sendAsync*(socket: TSocket, data: string): bool =
## sends data to a non-blocking socket. Returns whether ``data`` was sent.
result = true
var bytesSent = send(socket, cstring(data), data.len)
if bytesSent == -1:
when defined(windows):
var err = WSAGetLastError()
# TODO: Test on windows.
if err == WSAEINPROGRESS:
return
return false
else: OSError()
else:
if errno == EAGAIN or errno == EWOULDBLOCK:
return
return false
else: OSError()
when defined(Windows):

View File

@@ -1783,6 +1783,24 @@ when not defined(EcmaScript) and not defined(NimrodVM):
while a[L] != nil: inc(L)
result = cstringArrayToSeq(a, L)
proc allocCStringArray*(a: openArray[string]): cstringArray =
## creates a NULL terminated cstringArray from `a`. The result has to
## be freed with `deallocCStringArray` after it's not needed anymore.
result = cast[cstringArray](alloc0((a.len+1) * sizeof(cstring)))
for i in 0 .. a.high:
# XXX get rid of this string copy here:
var x = a[i]
result[i] = cast[cstring](alloc0(x.len+1))
copyMem(result[i], addr(x[0]), x.len)
proc deallocCStringArray*(a: cstringArray) =
## frees a NULL terminated cstringArray.
var i = 0
while a[i] != nil:
dealloc(a[i])
inc(i)
dealloc(a)
# ----------------------------------------------------------------------------
proc atomicInc*(memLoc: var int, x: int = 1): int {.inline, discardable.}

View File

@@ -321,6 +321,9 @@ type
Tsocklen* = cint
var
SOMAXCONN* {.importc, header: "Winsock2.h".}: cint
proc getservbyname*(name, proto: cstring): ptr TServent {.
stdcall, importc: "getservbyname", dynlib: ws2dll.}
@@ -474,4 +477,3 @@ proc CreateFileMapping*(hFile: THANDLE,
proc UnmapViewOfFile*(lpBaseAddress: pointer): WINBOOL {.stdcall,
dynlib: "kernel32", importc: "UnmapViewOfFile".}