Fixed many deprecation warnings. asyncio.recvLine now throws an

exception when an error occurs. Added sockets.SocketError.
This commit is contained in:
Dominik Picheta
2012-12-23 14:05:16 +00:00
parent 3cbac13546
commit da609fc445
5 changed files with 160 additions and 82 deletions

View File

@@ -440,6 +440,9 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool =
## sockets properly. This function guarantees that ``line`` is a full line,
## if this function can only retrieve some data; it will save this data and
## add it to the result when a full line is retrieved.
##
## Unlike ``sockets.recvLine`` this function will raise an EOS or ESSL
## exception if an error occurs.
setLen(line.string, 0)
var dataReceived = "".TaintedString
var ret = s.socket.recvLineAsync(dataReceived)
@@ -458,6 +461,7 @@ proc recvLine*(s: PAsyncSocket, line: var TaintedString): bool =
of RecvDisconnected:
result = true
of RecvFail:
s.SocketError(async = true)
result = false
proc send*(sock: PAsyncSocket, data: string) =
@@ -594,7 +598,9 @@ when isMainModule:
proc testRead(s: PAsyncSocket, no: int) =
echo("Reading! " & $no)
var data = s.getSocket.recv()
var data = ""
if not s.recvLine(data):
OSError()
if data == "":
echo("Closing connection. " & $no)
s.close()

View File

@@ -96,8 +96,9 @@ type
EFTP* = object of ESynch
proc FTPClient*(address: string, port = TPort(21),
user, pass = ""): TFTPClient =
## Create a ``TFTPClient`` object.
user, pass = ""): PFTPClient =
## Create a ``PFTPClient`` object.
new(result)
result.user = user
result.pass = pass
result.address = address
@@ -278,11 +279,20 @@ proc getLines(ftp: PFTPClient, async: bool = false): bool =
## It doesn't if `async` is true, because it doesn't check for 226 then.
if ftp.dsockConnected:
var r = TaintedString""
if getDSock(ftp).recvAsync(r):
if r.string != "":
ftp.job.lines.add(r.string)
else:
ftp.dsockConnected = False
if ftp.isAsync:
if ftp.asyncDSock.recvLine(r):
if r.string == "":
ftp.dsockConnected = false
else:
ftp.job.lines.add(r.string & "\n")
else:
assert(not async)
if ftp.dsock.recvLine(r):
if r.string == "":
ftp.dsockConnected = false
else:
ftp.job.lines.add(r.string & "\n")
else: OSError()
if not async:
var readSocks: seq[TSocket] = @[ftp.getCSock()]
@@ -389,7 +399,7 @@ proc list*(ftp: PFTPClient, dir: string = "", async = false): string =
proc retrText*(ftp: PFTPClient, file: string, async = false): string =
## Retrieves ``file``. File must be ASCII text.
## If ``async`` is true, this function will return immediately and
## it will be your job to call ``poll`` to progress this operation.
## it will be your job to call asyncio's ``poll`` to progress this operation.
ftp.createJob(getLines, JRetrText)
ftp.pasv()
assertReply ftp.send("RETR " & file.normalizePathSep), ["125", "150"]
@@ -404,12 +414,14 @@ proc retrText*(ftp: PFTPClient, file: string, async = false): string =
proc getFile(ftp: PFTPClient, async = false): bool =
if ftp.dsockConnected:
var r = "".TaintedString
var bytesRead = 0
var returned = false
if async:
if not ftp.isAsync: raise newException(EFTP, "FTPClient must be async.")
returned = ftp.AsyncDSock.recvAsync(r)
bytesRead = ftp.AsyncDSock.recvAsync(r, BufferSize)
returned = bytesRead != -1
else:
r = getDSock(ftp).recv()
bytesRead = getDSock(ftp).recv(r, BufferSize)
returned = true
let r2 = r.string
if r2 != "":
@@ -429,8 +441,9 @@ proc getFile(ftp: PFTPClient, async = false): bool =
proc retrFile*(ftp: PFTPClient, file, dest: string, async = false) =
## Downloads ``file`` and saves it to ``dest``. Usage of this function
## asynchronously is recommended to view the progress of the download.
## The ``EvRetr`` event is given by ``poll`` when the download is finished,
## and the ``filename`` field will be equal to ``file``.
## The ``EvRetr`` event is passed to the specified ``handleEvent`` function
## when the download is finished, and the ``filename`` field will be equal
## to ``file``.
ftp.createJob(getFile, JRetr)
ftp.job.file = open(dest, mode = fmWrite)
ftp.pasv()
@@ -492,8 +505,9 @@ proc store*(ftp: PFTPClient, file, dest: string, async = false) =
## Uploads ``file`` to ``dest`` on the remote FTP server. Usage of this
## function asynchronously is recommended to view the progress of
## the download.
## The ``EvStore`` event is given by ``poll`` when the upload is finished,
## and the ``filename`` field will be equal to ``file``.
## The ``EvStore`` event is passed to the specified ``handleEvent`` function
## when the upload is finished, and the ``filename`` field will be
## equal to ``file``.
ftp.createJob(doUpload, JStore)
ftp.job.file = open(file)
ftp.job.total = ftp.job.file.getFileSize()
@@ -518,16 +532,6 @@ proc close*(ftp: PFTPClient) =
ftp.csock.close()
ftp.dsock.close()
discard """proc getSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
result = (SockIdle, InvalidSocket)
var ftp = PAsyncFTPClient(h)
if ftp.jobInProgress:
case ftp.job.typ
of JRetrText, JRetr, JStore:
if ftp.dsockStatus == SockConnecting or ftp.dsockStatus == SockConnected:
result = (ftp.dsockStatus, ftp.dsock)
else: result = (SockIdle, ftp.dsock)"""
proc csockHandleRead(s: PAsyncSocket, ftp: PAsyncFTPClient) =
if ftp.jobInProgress:
assertReply ftp.expectReply(), "226" # Make sure the transfer completed.
@@ -550,32 +554,6 @@ proc csockHandleRead(s: PAsyncSocket, ftp: PAsyncFTPClient) =
ftp.handleEvent(ftp, r)
discard """proc handleConnect(h: PObject) =
var ftp = PAsyncFTPClient(h)
ftp.dsockStatus = SockConnected
assert(ftp.jobInProgress)
if ftp.job.typ == JStore:
ftp.dele.mode = MWriteable
else:
ftp.dele.mode = MReadable"""
discard """proc handleRead(h: PObject) =
var ftp = PAsyncFTPClient(h)
assert(ftp.jobInProgress)
assert(ftp.job.typ != JStore)
# This can never return true, because it shouldn't check for code
# 226 from csock.
assert(not ftp.job.prc(ftp[], true))
"""
discard """proc csockGetSocket(h: PObject): tuple[info: TInfo, sock: TSocket] =
# This only returns the csock if a job is in progress. Otherwise handle read
# would capture data which is not for it to capture.
result = (SockIdle, InvalidSocket)
var ftp = PAsyncFTPClient(h)
if ftp.jobInProgress:
result = (SockConnected, ftp.csock)"""
proc AsyncFTPClient*(address: string, port = TPort(21),
user, pass = "",
handleEvent: proc (ftp: PAsyncFTPClient, ev: TFTPEvent) {.closure.} =

View File

@@ -49,26 +49,35 @@ proc raiseNoOK(status: string) =
raise newException(EInvalidReply, "Expected \"OK\" got \"$1\"" % status)
proc parseStatus(r: TRedis): TRedisStatus =
var line = r.socket.recv.string
var line = ""
if r.socket.recvLine(line):
if line == "":
raise newException(ERedis, "Server closed connection prematurely")
if line[0] == '-':
raise newException(ERedis, strip(line))
if line[0] != '+':
raiseInvalidReply('+', line[0])
return line.substr(1, line.len-3) # Strip '+' and \c\L.
if line[0] == '-':
raise newException(ERedis, strip(line))
if line[0] != '+':
raiseInvalidReply('+', line[0])
return line.substr(1) # Strip '+'
else:
OSError()
proc parseInteger(r: TRedis): TRedisInteger =
var line = r.socket.recv.string
var line = ""
if r.socket.recvLine(line):
if line == "":
raise newException(ERedis, "Server closed connection prematurely")
if line[0] == '-':
raise newException(ERedis, strip(line))
if line[0] != ':':
raiseInvalidReply(':', line[0])
# Strip ':' and \c\L.
if parseBiggestInt(line, result, 1) == 0:
raise newException(EInvalidReply, "Unable to parse integer.")
if line[0] == '-':
raise newException(ERedis, strip(line))
if line[0] != ':':
raiseInvalidReply(':', line[0])
# Strip ':'
if parseBiggestInt(line, result, 1) == 0:
raise newException(EInvalidReply, "Unable to parse integer.")
else: OSError()
proc recv(sock: TSocket, size: int): TaintedString =
result = newString(size).TaintedString
@@ -838,8 +847,11 @@ proc save*(r: TRedis) =
proc shutdown*(r: TRedis) =
## Synchronously save the dataset to disk and then shut down the server
r.sendCommand("SHUTDOWN")
var s = r.socket.recv()
if s.string.len != 0: raise newException(ERedis, s.string)
var s = "".TaintedString
if r.socket.recvLine(s):
if s.string.len != 0: raise newException(ERedis, s.string)
else:
OSError()
proc slaveof*(r: TRedis, host: string, port: string) =
## Make the server a slave of another instance, or promote it as master

View File

@@ -47,12 +47,15 @@ when defined(ssl):
TSSLAcceptResult* = enum
AcceptNoClient = 0, AcceptNoHandshake, AcceptSuccess
const
BufferSize*: int = 4000 ## size of a buffered socket's buffer
type
TSocketImpl = object ## socket type
fd: cint
case isBuffered: bool # determines whether this socket is buffered.
of true:
buffer: array[0..4000, char]
buffer: array[0..BufferSize, char]
currPos: int # current index in buffer
bufLen: int # current length of buffer
of false: nil
@@ -300,6 +303,48 @@ when defined(ssl):
if SSLSetFd(socket.sslHandle, socket.fd) != 1:
SSLError()
proc SocketError*(socket: TSocket, err: int = -1, async = false) =
## Raises proper errors based on return values of ``recv`` functions.
##
## If ``async`` is ``True`` no error will be thrown in the case when the
## error was caused by no data being available to be read.
##
## If ``err`` is not lower than 0 no exception will be raised.
when defined(ssl):
if socket.isSSL:
if err <= 0:
var ret = SSLGetError(socket.sslHandle, err.cint)
case ret
of SSL_ERROR_ZERO_RETURN:
SSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT:
if async:
return
else: SSLError("Not enough data on socket.")
of SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
if async:
return
else: SSLError("Not enough data on socket.")
of SSL_ERROR_WANT_X509_LOOKUP:
SSLError("Function for x509 lookup has been called.")
of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
SSLError()
else: SSLError("Unknown Error")
if err == -1 and not (when defined(ssl): socket.isSSL else: false):
if async:
when defined(windows):
# TODO: Test on Windows
var err = WSAGetLastError()
if err == WSAEWOULDBLOCK:
return
else: OSError()
else:
if errno == EAGAIN or errno == EWOULDBLOCK:
return
else: OSError()
else: OSError()
proc listen*(socket: TSocket, backlog = SOMAXCONN) {.tags: [FReadIO].} =
## Marks ``socket`` as accepting connections.
## ``Backlog`` specifies the maximum length of the
@@ -1012,14 +1057,39 @@ proc recv*(socket: TSocket, data: pointer, size: int): int {.tags: [FReadIO].} =
proc recv*(socket: TSocket, data: var string, size: int): int =
## higher-level version of the above
##
## When 0 is returned the socket's connection has been closed.
##
## This function will throw an EOS exception when an error occurs. A value
## lower than 0 is never returned.
##
## **Note**: ``data`` must be initialised.
data.setLen(size)
result = recv(socket, cstring(data), size)
if result < 0:
data.setLen(0)
socket.SocketError(result)
data.setLen(result)
proc recvAsync*(socket: TSocket, data: var string, size: int): int =
## Async version of the above.
##
## When socket is non-blocking and no data is available on the socket,
## ``-1`` will be returned and ``data`` will be ``""``.
##
## **Note**: ``data`` must be initialised.
data.setLen(size)
result = recv(socket, cstring(data), size)
if result < 0:
data.setLen(0)
socket.SocketError(async = true)
result = -1
data.setLen(result)
proc waitFor(socket: TSocket, waited: var float, timeout: int): int {.
tags: [FTime].} =
## returns the number of characters available to be read. In unbuffered
## sockets this is always 1, otherwise this may as big as the buffer, currently
## 4000.
## sockets this is always 1, otherwise this may as big as ``BufferSize``.
result = 1
if socket.isBuffered and socket.bufLen != 0 and socket.bufLen != socket.currPos:
result = socket.bufLen - socket.currPos
@@ -1050,9 +1120,16 @@ proc recv*(socket: TSocket, data: pointer, size: int, timeout: int): int {.
result = read
proc recv*(socket: TSocket, data: var string, size: int, timeout: int): int =
# higher-level version of the above
## higher-level version of the above.
##
## Similar to the non-timeout version this will throw an EOS exception
## when an error occurs.
data.setLen(size)
result = recv(socket, cstring(data), size, timeout)
if result < 0:
data.setLen(0)
socket.SocketError()
data.setLen(result)
proc peekChar(socket: TSocket, c: var char): int {.tags: [FReadIO].} =
if socket.isBuffered:
@@ -1080,7 +1157,7 @@ proc recvLine*(socket: TSocket, line: var TaintedString): bool {.
## added to ``line``, however if solely ``\r\L`` is received then ``line``
## will be set to it.
##
## ``True`` is returned if data is available. ``False`` usually suggests an
## ``True`` is returned if data is available. ``False`` suggests an
## error, EOS exceptions are not raised and ``False`` is simply returned
## instead.
##
@@ -1112,6 +1189,8 @@ proc recvLine*(socket: TSocket, line: var TaintedString, timeout: int): bool {.
tags: [FReadIO, FTime].} =
## variant with a ``timeout`` parameter, the timeout parameter specifies
## how many miliseconds to wait for data.
##
## ``ETimeout`` will be raised if ``timeout`` is exceeded.
template addNLIfEmpty(): stmt =
if line.len == 0:
line.add("\c\L")
@@ -1210,12 +1289,14 @@ proc recvTimeout*(socket: TSocket, timeout: int): TaintedString {.
return socket.recv
proc recvAsync*(socket: TSocket, s: var TaintedString): bool {.
tags: [FReadIO].} =
tags: [FReadIO], deprecated.} =
## receives all the data from a non-blocking socket. If socket is non-blocking
## and there are no messages available, `False` will be returned.
## Other socket errors will result in an ``EOS`` error.
## If socket is not a connectionless socket and socket is not connected
## ``s`` will be set to ``""``.
##
## **Deprecated since version 0.9.2**: This function is not safe for use.
const bufSize = 1000
# ensure bufSize capacity:
setLen(s.string, bufSize)

View File

@@ -272,7 +272,7 @@ proc handleWebMessage(state: PState, line: string) =
message.add(limitCommitMsg(commit["message"].str))
# Send message to #nimrod.
state.ircClient[].privmsg(joinChans[0], message)
state.ircClient.privmsg(joinChans[0], message)
elif json.existsKey("redisinfo"):
assert json["redisinfo"].existsKey("port")
#let redisPort = json["redisinfo"]["port"].num
@@ -336,10 +336,11 @@ proc hubConnect(state: PState) =
state.dispatcher.register(state.sock)
proc handleIrc(irc: var TAsyncIRC, event: TIRCEvent, state: PState) =
proc handleIrc(irc: PAsyncIRC, event: TIRCEvent, state: PState) =
case event.typ
of EvConnected: nil
of EvDisconnected:
while not state.ircClient[].isConnected:
while not state.ircClient.isConnected:
try:
state.ircClient.connect()
except:
@@ -355,12 +356,12 @@ proc handleIrc(irc: var TAsyncIRC, event: TIRCEvent, state: PState) =
let msg = event.params[event.params.len-1]
let words = msg.split(' ')
template pm(msg: string): stmt =
state.ircClient[].privmsg(event.origin, msg)
state.ircClient.privmsg(event.origin, msg)
case words[0]
of "!ping": pm("pong")
of "!lag":
if state.ircClient[].getLag != -1.0:
var lag = state.ircClient[].getLag
if state.ircClient.getLag != -1.0:
var lag = state.ircClient.getLag
lag = lag * 1000.0
pm($int(lag) & "ms between me and the server.")
else:
@@ -433,7 +434,7 @@ proc open(port: TPort = TPort(5123)): PState =
res.hubPort = port
res.hubConnect()
let hirc =
proc (a: var TAsyncIRC, ev: TIRCEvent) =
proc (a: PAsyncIRC, ev: TIRCEvent) =
handleIrc(a, ev, res)
# Connect to the irc server.
res.ircClient = AsyncIrc(ircServer, nick = botNickname, user = botNickname,