Fixed tasyncawait on Windows.

Implicit registration of an fd now only occurs when a new socket is
created (in socket() or accept()). This makes the implementation much
simpler, changes to the linux version will follow.
This commit is contained in:
Dominik Picheta
2014-03-12 20:42:36 +00:00
parent 15919b7c98
commit d97a397139
3 changed files with 34 additions and 11 deletions

View File

@@ -94,7 +94,8 @@ proc failed*[T](future: PFuture[T]): bool =
# TODO: Get rid of register. Do it implicitly.
when defined(windows) or defined(nimdoc):
import winlean
import winlean, sets, hashes
#from hashes import THash
type
TCompletionKey = dword
@@ -105,7 +106,7 @@ when defined(windows) or defined(nimdoc):
PDispatcher* = ref object
ioPort: THandle
hasHandles: bool
handles: TSet[TSocketHandle]
TCustomOverlapped = object
Internal*: DWORD
@@ -117,21 +118,31 @@ when defined(windows) or defined(nimdoc):
PCustomOverlapped = ptr TCustomOverlapped
proc hash(x: TSocketHandle): THash {.borrow.}
proc newDispatcher*(): PDispatcher =
## Creates a new Dispatcher instance.
new result
result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
result.handles = initSet[TSocketHandle]()
proc register*(p: PDispatcher, sock: TSocketHandle) =
## Registers ``sock`` with the dispatcher ``p``.
if CreateIOCompletionPort(sock.THandle, p.ioPort,
cast[TCompletionKey](sock), 1) == 0:
OSError(OSLastError())
p.hasHandles = true
p.handles.incl(sock)
proc verifyPresence(p: PDispatcher, sock: TSocketHandle) =
## Ensures that socket has been registered with the dispatcher.
if sock notin p.handles:
raise newException(EInvalidValue,
"Operation performed on a socket which has not been registered with" &
" the dispatcher yet.")
proc poll*(p: PDispatcher, timeout = 500) =
## Waits for completion events and processes them.
if not p.hasHandles:
if p.handles.len == 0:
raise newException(EInvalidValue, "No handles registered in dispatcher.")
let llTimeout =
@@ -237,7 +248,7 @@ when defined(windows) or defined(nimdoc):
##
## Returns a ``PFuture`` which will complete when the connection succeeds
## or an error occurs.
verifyPresence(p, socket)
var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed.
# Apparently ``ConnectEx`` expects the socket to be initially bound:
var saddr: Tsockaddr_in
@@ -298,7 +309,7 @@ when defined(windows) or defined(nimdoc):
## recv operation then the future may complete with only a part of the
## requested data read. If socket is disconnected and no data is available
## to be read then the future will complete with a value of ``""``.
verifyPresence(p, socket)
var retFuture = newFuture[string]()
var dataBuf: TWSABuf
@@ -354,6 +365,7 @@ when defined(windows) or defined(nimdoc):
proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] =
## Sends ``data`` to ``socket``. The returned future will complete once all
## data has been sent.
verifyPresence(p, socket)
var retFuture = newFuture[int]()
var dataBuf: TWSABuf
@@ -390,7 +402,9 @@ when defined(windows) or defined(nimdoc):
## Accepts a new connection. Returns a future containing the client socket
## corresponding to that connection and the remote address of the client.
## The future will complete when the connection is successfully accepted.
##
## The resulting client socket is automatically registered to dispatcher.
verifyPresence(p, socket)
var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
var clientSock = socket()
@@ -416,6 +430,7 @@ when defined(windows) or defined(nimdoc):
dwLocalAddressLength, dwRemoteAddressLength,
addr LocalSockaddr, addr localLen,
addr RemoteSockaddr, addr remoteLen)
p.register(clientSock)
# TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
retFuture.complete(
(address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr),
@@ -452,6 +467,13 @@ when defined(windows) or defined(nimdoc):
return retFuture
proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
typ: TType = SOCK_STREAM,
protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
## Creates a new socket and registers it with the dispatcher implicitly.
result = socket()
disp.register(result)
initAll()
else:
import selectors
@@ -871,7 +893,7 @@ when isMainModule:
sock.setBlocking false
when true:
when false:
# Await tests
proc main(p: PDispatcher): PFuture[int] {.async.} =
discard await p.connect(sock, "irc.freenode.net", TPort(6667))
@@ -898,7 +920,7 @@ when isMainModule:
else:
when true:
when false:
var f = p.connect(sock, "irc.poop.nl", TPort(6667))
f.callback =

View File

@@ -456,6 +456,7 @@ var
SO_DONTLINGER* {.importc, header: "Winsock2.h".}: cint
SO_EXCLUSIVEADDRUSE* {.importc, header: "Winsock2.h".}: cint # disallow local address reuse
SO_ERROR* {.importc, header: "Winsock2.h".}: cint
proc `==`*(x, y: TSocketHandle): bool {.borrow.}

View File

@@ -20,7 +20,7 @@ proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn
proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
for i in 0 .. <swarmSize:
var sock = socket()
var sock = disp.socket()
#disp.register(sock)
discard await disp.connect(sock, "localhost", port)
@@ -48,7 +48,7 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn
doAssert false
proc createServer(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
var server = socket()
var server = disp.socket()
#disp.register(server)
server.bindAddr(port)
server.listen()