Async readLine now works. Fixes recv issues.

When using MSG_PEEK and data is retrieved ``lpNumberOfBytesRecvd`` will
not be set to the number of bytes read by WSARecv. The buffer must
therefore be checked to ensure it's empty when determining whether
``recv`` shall return "" to signal disconnection as we want to read as
much data as has been received by the system.
This commit is contained in:
Dominik Picheta
2014-02-15 21:13:23 +00:00
parent de8d473301
commit fb7598d25a

View File

@@ -99,7 +99,8 @@ when defined(windows):
TCompletionData* = object
sock: TSocketHandle
cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) {.closure.}
cb: proc (sock: TSocketHandle, bytesTransferred: DWORD,
errcode: TOSErrorCode) {.closure.}
PDispatcher* = ref object
ioPort: THandle
@@ -143,13 +144,15 @@ when defined(windows):
# This is useful for ensuring the reliability of the overlapped struct.
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
customOverlapped.data.cb(customOverlapped.data.sock, TOSErrorCode(-1))
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, TOSErrorCode(-1))
dealloc(customOverlapped)
else:
let errCode = OSLastError()
if lpOverlapped != nil:
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
customOverlapped.data.cb(customOverlapped.data.sock, errCode)
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, errCode)
dealloc(customOverlapped)
else:
if errCode.int32 == WAIT_TIMEOUT:
@@ -248,7 +251,7 @@ when defined(windows):
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
@@ -297,16 +300,19 @@ when defined(windows):
var flagsio = flags.dword
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete("")
else:
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
addr flagsio, cast[POverlapped](ol), nil)
if ret == -1:
@@ -314,8 +320,13 @@ when defined(windows):
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
elif ret == 0 and bytesReceived == 0:
# Disconnected
elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
# We have to ensure that the buffer is empty because WSARecv will tell
# us immediatelly when it was disconnected, even when there is still
# data in the buffer.
# We want to give the user as much data as we can. So we only return
# the empty string (which signals a disconnection) when there is
# nothing left to read.
retFuture.complete("")
# TODO: "For message-oriented sockets, where a zero byte message is often
# allowable, a failure with an error code of WSAEDISCON is used to
@@ -343,7 +354,7 @@ when defined(windows):
var bytesReceived, flags: DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
@@ -404,7 +415,7 @@ when defined(windows):
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
completeAccept()
@@ -525,6 +536,15 @@ proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} =
for i in 0 .. <node.len:
result[i] = processBody(node[i], retFutureSym)
proc getName(node: PNimrodNode): string {.compileTime.} =
case node.kind
of nnkPostfix:
return $node[1].ident
of nnkIdent:
return $node.ident
else:
assert false
macro async*(prc: stmt): stmt {.immediate.} =
expectKind(prc, nnkProcDef)
@@ -553,7 +573,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
# -> var result: T
# -> <proc_body>
# -> complete(retFuture, result)
var iteratorNameSym = newIdentNode($prc[0].ident & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter")
var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter")
var procBody = prc[6].processBody(retFutureSym)
procBody.insert(0, newNimNode(nnkVarSection).add(
newIdentDefs(newIdentNode("result"), prc[3][0][1]))) # -> var result: T
@@ -568,7 +588,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
# -> var nameIterVar = nameIter
# -> var first = nameIterVar()
var varNameIterSym = newIdentNode($prc[0].ident & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar")
var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar")
var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym)
outerProcBody.add varNameIter
var varFirstSym = genSym(nskVar, "first")
@@ -600,18 +620,14 @@ macro async*(prc: stmt): stmt {.immediate.} =
echo(toStrLit(result))
proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} =
## Reads a line of data from ``socket``.
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.
##
## If a full line is read ``\r\L`` is not
## added to ``line``, however if solely ``\r\L`` is read then ``line``
## will be set to it.
##
## If the socket is disconnected, ``line`` will be set to ``""``.
##
## An EOS exception will be raised in the case of a socket error.
##
## A timeout can be specified in miliseconds, if data is not received within
## the specified time an ETimeout exception will be raised.
template addNLIfEmpty(): stmt =
if result.len == 0:
@@ -629,7 +645,7 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
discard await p.recv(socket, 1)
addNLIfEmpty()
return
elif c == "\L":
elif c == "\L":
addNLIfEmpty()
return
add(result.string, c)
@@ -645,14 +661,24 @@ when isMainModule:
when true:
# Await tests
proc main(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "localhost", TPort(6667))
discard await p.connect(sock, "irc.freenode.net", TPort(6667))
while true:
var line = await p.recvLine(sock)
echo("Line is: ", line.repr)
if line == "":
echo "Disconnected"
break
proc peekTest(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "localhost", TPort(6667))
while true:
var line = await p.recv(sock, 1, MSG_PEEK)
var line2 = await p.recv(sock, 1)
echo(line.repr)
echo(line2.repr)
echo("---")
if line2 == "": break
sleep(500)
var f = main(p)