Merge branch 'newasync' into devel

This commit is contained in:
Dominik Picheta
2014-02-16 20:03:58 +00:00
3 changed files with 399 additions and 83 deletions

View File

@@ -7,7 +7,7 @@
# distribution, for details about the copyright.
#
import os, oids, tables, strutils
import os, oids, tables, strutils, macros
import winlean
@@ -23,14 +23,13 @@ import sockets2, net
# -- Futures
type
PFutureVoid* = ref object of PObject
cbVoid: proc () {.closure.}
PFutureBase* = ref object of PObject
cb: proc () {.closure.}
finished: bool
PFuture*[T] = ref object of PFutureVoid
PFuture*[T] = ref object of PFutureBase
value: T
error: ref EBase
cb: proc (future: PFuture[T]) {.closure.}
proc newFuture*[T](): PFuture[T] =
## Creates a new future.
@@ -39,42 +38,38 @@ proc newFuture*[T](): PFuture[T] =
proc complete*[T](future: PFuture[T], val: T) =
## Completes ``future`` with value ``val``.
assert(not future.finished)
assert(not future.finished, "Future already finished, cannot finish twice.")
assert(future.error == nil)
future.value = val
future.finished = true
if future.cb != nil:
future.cb(future)
if future.cbVoid != nil:
future.cbVoid()
future.cb()
proc fail*[T](future: PFuture[T], error: ref EBase) =
## Completes ``future`` with ``error``.
assert(not future.finished)
assert(not future.finished, "Future already finished, cannot finish twice.")
future.finished = true
future.error = error
if future.cb != nil:
future.cb(future)
future.cb()
proc `callback=`*(future: PFutureBase, cb: proc () {.closure.}) =
## Sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
##
## **Note**: You most likely want the other ``callback`` setter which
## passes ``future`` as a param to the callback.
future.cb = cb
if future.finished:
future.cb()
proc `callback=`*[T](future: PFuture[T],
cb: proc (future: PFuture[T]) {.closure.}) =
## Sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
future.cb = cb
if future.finished:
future.cb(future)
proc `callbackVoid=`*(future: PFutureVoid, cb: proc () {.closure.}) =
## Sets the **void** callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
##
## **Note**: This is used for the ``await`` functionality, you most likely
## want to use ``callback``.
future.cbVoid = cb
if future.finished:
future.cbVoid()
future.callback = proc () = cb(future)
proc read*[T](future: PFuture[T]): T =
## Retrieves the value of ``future``. Future must be finished otherwise
@@ -104,10 +99,12 @@ when defined(windows):
TCompletionData* = object
sock: TSocketHandle
cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) {.closure.}
cb: proc (sock: TSocketHandle, bytesTransferred: DWORD,
errcode: TOSErrorCode) {.closure.}
PDispatcher* = ref object
ioPort: THandle
hasHandles: bool
TCustomOverlapped = object
Internal*: DWORD
@@ -129,9 +126,13 @@ when defined(windows):
if CreateIOCompletionPort(sock.THandle, p.ioPort,
cast[TCompletionKey](sock), 1) == 0:
OSError(OSLastError())
p.hasHandles = true
proc poll*(p: PDispatcher, timeout = 500) =
## Waits for completion events and processes them.
if not p.hasHandles:
raise newException(EInvalidValue, "No handles registered in dispatcher.")
let llTimeout =
if timeout == -1: winlean.INFINITE
else: timeout.int32
@@ -145,16 +146,19 @@ when defined(windows):
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
if res:
# This is useful for ensuring the reliability of the overlapped struct.
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
customOverlapped.data.cb(customOverlapped.data.sock, TOSErrorCode(-1))
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, TOSErrorCode(-1))
dealloc(customOverlapped)
else:
let errCode = OSLastError()
if lpOverlapped != nil:
assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, errCode)
dealloc(customOverlapped)
customOverlapped.data.cb(customOverlapped.data.sock, errCode)
else:
if errCode.int32 == WAIT_TIMEOUT:
# Timed out
@@ -252,11 +256,12 @@ when defined(windows):
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
@@ -265,7 +270,9 @@ when defined(windows):
# Request to connect completed immediately.
success = true
retFuture.complete(0)
dealloc(ol)
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
break
else:
lastError = OSLastError()
@@ -283,7 +290,8 @@ when defined(windows):
retFuture.fail(newException(EOS, osErrorMsg(lastError)))
return retFuture
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int): PFuture[string] =
proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
flags: int = 0): PFuture[string] =
## Reads ``size`` bytes from ``socket``. Returned future will complete once
## all of the requested data is read.
@@ -293,31 +301,50 @@ when defined(windows):
dataBuf.buf = newString(size)
dataBuf.len = size
var bytesReceived, flags: DWord
var bytesReceived: DWord
var flagsio = flags.dword
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete("")
else:
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
addr flags, cast[POverlapped](ol), nil)
addr flagsio, cast[POverlapped](ol), nil)
if ret == -1:
let err = OSLastError()
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
# We have to ensure that the buffer is empty because WSARecv will tell
# us immediatelly when it was disconnected, even when there is still
# data in the buffer.
# We want to give the user as much data as we can. So we only return
# the empty string (which signals a disconnection) when there is
# nothing left to read.
retFuture.complete("")
# TODO: "For message-oriented sockets, where a zero byte message is often
# allowable, a failure with an error code of WSAEDISCON is used to
# indicate graceful closure."
# ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
else:
# Request to read completed immediately.
var data = newString(size)
copyMem(addr data[0], addr dataBuf.buf[0], size)
retFuture.complete($data)
dealloc(ol)
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] =
@@ -332,11 +359,12 @@ when defined(windows):
var bytesReceived, flags: DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
retFuture.complete(0)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived,
@@ -348,7 +376,9 @@ when defined(windows):
dealloc(ol)
else:
retFuture.complete(0)
dealloc(ol)
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc acceptAddr*(p: PDispatcher, socket: TSocketHandle):
@@ -390,11 +420,12 @@ when defined(windows):
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TSocketHandle, errcode: TOSErrorCode) =
if errcode == TOSErrorCode(-1):
completeAccept()
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
if errcode == TOSErrorCode(-1):
completeAccept()
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
)
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
@@ -411,7 +442,9 @@ when defined(windows):
dealloc(ol)
else:
completeAccept()
dealloc(ol)
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
@@ -434,6 +467,199 @@ when defined(windows):
else:
# TODO: Selectors.
# -- Await Macro
template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} =
proc cbName {.closure.} =
if not varNameIterSym.finished:
var next = varNameIterSym()
if next == nil:
assert retFutureSym.finished, "Async procedure's return Future was not finished."
else:
next.callback = cbName
template createVar(futSymName: string, asyncProc: PNimrodNode,
valueReceiver: expr) {.immediate, dirty.} =
# TODO: Used template here due to bug #926
result = newNimNode(nnkStmtList)
var futSym = newIdentNode(futSymName) #genSym(nskVar, "future")
result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x>
valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read
proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} =
result = node
case node.kind
of nnkReturnStmt:
result = newNimNode(nnkStmtList)
result.add newCall(newIdentNode("complete"), retFutureSym,
if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0])
result.add newNimNode(nnkYieldStmt).add(newNilLit())
of nnkCommand:
if node[0].ident == !"await":
case node[1].kind
of nnkIdent:
# await x
result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x
of nnkCall:
# await foo(p, x)
var futureValue: PNimrodNode
createVar("future" & $node[1][0].toStrLit, node[1], futureValue)
result.add futureValue
else:
error("Invalid node kind in 'await', got: " & $node[1].kind)
elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and
node[1][0].ident == !"await":
# foo await x
var newCommand = node
createVar("future" & $node[0].ident, node[1][0], newCommand[1])
result.add newCommand
of nnkVarSection, nnkLetSection:
case node[0][2].kind
of nnkCommand:
if node[0][2][0].ident == !"await":
# var x = await y
var newVarSection = node # TODO: Should this use copyNimNode?
createVar("future" & $node[0][0].ident, node[0][2][1],
newVarSection[0][2])
result.add newVarSection
else: discard
of nnkAsgn:
case node[1].kind
of nnkCommand:
if node[1][0].ident == !"await":
# x = await y
var newAsgn = node
createVar("future" & $node[0].ident, node[1][1], newAsgn[1])
result.add newAsgn
else: discard
of nnkDiscardStmt:
# discard await x
if node[0][0].ident == !"await":
var dummy = newNimNode(nnkStmtList)
createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy)
else: discard
for i in 0 .. <result.len:
result[i] = processBody(result[i], retFutureSym)
#echo(treeRepr(result))
proc getName(node: PNimrodNode): string {.compileTime.} =
case node.kind
of nnkPostfix:
return $node[1].ident
of nnkIdent:
return $node.ident
else:
assert false
macro async*(prc: stmt): stmt {.immediate.} =
expectKind(prc, nnkProcDef)
hint("Processing " & prc[0].getName & " as an async proc.")
# Verify that the return type is a PFuture[T]
if prc[3][0].kind == nnkIdent:
error("Expected return type of 'PFuture' got '" & $prc[3][0] & "'")
elif prc[3][0].kind == nnkBracketExpr:
if $prc[3][0][0] != "PFuture":
error("Expected return type of 'PFuture' got '" & $prc[3][0][0] & "'")
# TODO: Why can't I use genSym? I get illegal capture errors for Syms.
# TODO: It seems genSym is broken. Change all usages back to genSym when fixed
var outerProcBody = newNimNode(nnkStmtList)
# -> var retFuture = newFuture[T]()
var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture")
outerProcBody.add(
newVarStmt(retFutureSym,
newCall(
newNimNode(nnkBracketExpr).add(
newIdentNode("newFuture"),
prc[3][0][1])))) # Get type from return type of this proc.
# -> iterator nameIter(): PFutureBase {.closure.} =
# -> var result: T
# -> <proc_body>
# -> complete(retFuture, result)
var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter")
var procBody = prc[6].processBody(retFutureSym)
procBody.insert(0, newNimNode(nnkVarSection).add(
newIdentDefs(newIdentNode("result"), prc[3][0][1]))) # -> var result: T
procBody.add(
newCall(newIdentNode("complete"),
retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result)
var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")],
procBody, nnkIteratorDef)
closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure"))
outerProcBody.add(closureIterator)
# -> var nameIterVar = nameIter
# -> var first = nameIterVar()
var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar")
var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym)
outerProcBody.add varNameIter
var varFirstSym = genSym(nskVar, "first")
var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym))
outerProcBody.add varFirst
# -> createCb(cb, nameIter, retFuture)
var cbName = newIdentNode("cb")
var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym)
outerProcBody.add procCb
# -> first.callback = cb
outerProcBody.add newAssignment(
newDotExpr(varFirstSym, newIdentNode("callback")),
cbName)
# -> return retFuture
outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym)
result = prc
# Remove the 'async' pragma.
for i in 0 .. <result[4].len:
if result[4][i].ident == !"async":
result[4].del(i)
result[6] = outerProcBody
echo(toStrLit(result))
proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} =
## Reads a line of data from ``socket``. Returned future will complete once
## a full line is read or an error occurs.
##
## If a full line is read ``\r\L`` is not
## added to ``line``, however if solely ``\r\L`` is read then ``line``
## will be set to it.
##
## If the socket is disconnected, ``line`` will be set to ``""``.
template addNLIfEmpty(): stmt =
if result.len == 0:
result.add("\c\L")
result = ""
var c = ""
while true:
c = await p.recv(socket, 1)
if c.len == 0:
return
if c == "\r":
c = await p.recv(socket, 1, MSG_PEEK)
if c.len > 0 and c == "\L":
discard await p.recv(socket, 1)
addNLIfEmpty()
return
elif c == "\L":
addNLIfEmpty()
return
add(result.string, c)
when isMainModule:
@@ -442,39 +668,65 @@ when isMainModule:
#sock.setBlocking false
p.register(sock)
when true:
var f = p.connect(sock, "irc.freenode.org", TPort(6667))
f.callback =
proc (future: PFuture[int]) =
echo("Connected in future!")
echo(future.read)
for i in 0 .. 50:
var recvF = p.recv(sock, 10)
recvF.callback =
proc (future: PFuture[string]) =
echo("Read: ", future.read)
when false:
# Await tests
proc main(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "irc.freenode.net", TPort(6667))
while true:
var line = await p.recvLine(sock)
echo("Line is: ", line.repr)
if line == "":
echo "Disconnected"
break
proc peekTest(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "localhost", TPort(6667))
while true:
var line = await p.recv(sock, 1, MSG_PEEK)
var line2 = await p.recv(sock, 1)
echo(line.repr)
echo(line2.repr)
echo("---")
if line2 == "": break
sleep(500)
var f = main(p)
else:
when false:
sock.bindAddr(TPort(6667))
sock.listen()
proc onAccept(future: PFuture[TSocketHandle]) =
echo "Accepted"
var t = p.send(future.read, "test\c\L")
t.callback =
var f = p.connect(sock, "irc.freenode.org", TPort(6667))
f.callback =
proc (future: PFuture[int]) =
echo("Connected in future!")
echo(future.read)
for i in 0 .. 50:
var recvF = p.recv(sock, 10)
recvF.callback =
proc (future: PFuture[string]) =
echo("Read: ", future.read)
else:
sock.bindAddr(TPort(6667))
sock.listen()
proc onAccept(future: PFuture[TSocketHandle]) =
echo "Accepted"
var t = p.send(future.read, "test\c\L")
t.callback =
proc (future: PFuture[int]) =
echo(future.read)
var f = p.accept(sock)
f.callback = onAccept
var f = p.accept(sock)
f.callback = onAccept
var f = p.accept(sock)
f.callback = onAccept
while true:
p.poll()
echo "polled"

View File

@@ -199,14 +199,14 @@ else:
importc: "GetCurrentDirectoryA", dynlib: "kernel32", stdcall.}
proc setCurrentDirectoryA*(lpPathName: cstring): int32 {.
importc: "SetCurrentDirectoryA", dynlib: "kernel32", stdcall.}
proc createDirectoryA*(pathName: cstring, security: pointer=nil): int32 {.
proc createDirectoryA*(pathName: cstring, security: Pointer=nil): int32 {.
importc: "CreateDirectoryA", dynlib: "kernel32", stdcall.}
proc removeDirectoryA*(lpPathName: cstring): int32 {.
importc: "RemoveDirectoryA", dynlib: "kernel32", stdcall.}
proc setEnvironmentVariableA*(lpName, lpValue: cstring): int32 {.
stdcall, dynlib: "kernel32", importc: "SetEnvironmentVariableA".}
proc getModuleFileNameA*(handle: THandle, buf: cstring, size: int32): int32 {.
proc getModuleFileNameA*(handle: THandle, buf: CString, size: int32): int32 {.
importc: "GetModuleFileNameA", dynlib: "kernel32", stdcall.}
when useWinUnicode:
@@ -304,7 +304,7 @@ else:
dwFileAttributes: int32): WINBOOL {.
stdcall, dynlib: "kernel32", importc: "SetFileAttributesA".}
proc copyFileA*(lpExistingFileName, lpNewFileName: cstring,
proc copyFileA*(lpExistingFileName, lpNewFileName: CString,
bFailIfExists: cint): cint {.
importc: "CopyFileA", stdcall, dynlib: "kernel32".}

View File

@@ -0,0 +1,64 @@
discard """
file: "tasyncawait.nim"
cmd: "nimrod cc --hints:on $# $#"
output: "5000"
"""
import asyncio2, sockets2, net, strutils
var disp = newDispatcher()
var msgCount = 0
const
swarmSize = 50
messagesToSend = 100
var clientCount = 0
proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} =
for i in 0 .. <messagesToSend:
discard await disp.send(client, "Message " & $i & "\c\L")
proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
for i in 0 .. <swarmSize:
var sock = socket()
disp.register(sock)
discard await disp.connect(sock, "localhost", port)
when true:
discard await sendMessages(disp, sock)
sock.close()
else:
# Issue #932: https://github.com/Araq/Nimrod/issues/932
var msgFut = sendMessages(disp, sock)
msgFut.callback =
proc () =
sock.close()
proc readMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} =
while true:
var line = await disp.recvLine(client)
if line == "":
client.close()
clientCount.inc
break
else:
if line.startswith("Message "):
msgCount.inc
else:
doAssert false
proc createServer(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
var server = socket()
disp.register(server)
server.bindAddr(port)
server.listen()
while true:
discard readMessages(disp, await disp.accept(server))
discard disp.createServer(TPort(10335))
discard disp.launchSwarm(TPort(10335))
while true:
disp.poll()
if clientCount == swarmSize: break
assert msgCount == swarmSize * messagesToSend
echo msgCount