async might work now reliably

This commit is contained in:
Araq
2014-04-30 00:52:58 +02:00
parent 6619381768
commit d438ecc246
3 changed files with 32 additions and 27 deletions

View File

@@ -14,6 +14,8 @@ import os, oids, tables, strutils, macros
import rawsockets
export TPort
#{.injectStmt: newGcInvariant().}
## AsyncDispatch
## -------------
##
@@ -130,15 +132,10 @@ when defined(windows) or defined(nimdoc):
ioPort: THandle
handles: TSet[TAsyncFD]
TCustomOverlapped = object
Internal*: DWORD
InternalHigh*: DWORD
Offset*: DWORD
OffsetHigh*: DWORD
hEvent*: THANDLE
TCustomOverlapped = object of TOVERLAPPED
data*: TCompletionData
PCustomOverlapped = ptr TCustomOverlapped
PCustomOverlapped = ref TCustomOverlapped
TAsyncFD* = distinct int
@@ -184,27 +181,26 @@ when defined(windows) or defined(nimdoc):
else: timeout.int32
var lpNumberOfBytesTransferred: DWORD
var lpCompletionKey: ULONG
var lpOverlapped: POverlapped
var customOverlapped: PCustomOverlapped
let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred,
addr lpCompletionKey, addr lpOverlapped, llTimeout).bool
addr lpCompletionKey, addr customOverlapped, llTimeout).bool
# http://stackoverflow.com/a/12277264/492186
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
if res:
# This is useful for ensuring the reliability of the overlapped struct.
assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, TOSErrorCode(-1))
dealloc(customOverlapped)
GC_unref(customOverlapped)
else:
let errCode = osLastError()
if lpOverlapped != nil:
if customOverlapped != nil:
assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD
customOverlapped.data.cb(customOverlapped.data.sock,
lpNumberOfBytesTransferred, errCode)
dealloc(customOverlapped)
GC_unref(customOverlapped)
else:
if errCode.int32 == WAIT_TIMEOUT:
# Timed out
@@ -300,7 +296,8 @@ when defined(windows) or defined(nimdoc):
while it != nil:
# "the OVERLAPPED structure must remain valid until the I/O completes"
# http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
@@ -328,7 +325,7 @@ when defined(windows) or defined(nimdoc):
success = true
break
else:
dealloc(ol)
GC_unref(ol)
success = false
it = it.ai_next
@@ -352,15 +349,15 @@ when defined(windows) or defined(nimdoc):
# '\0' in the message currently signifies a socket disconnect. Who
# knows what will happen when someone sends that to our socket.
verifyPresence(socket)
var retFuture = newFuture[string]()
var retFuture = newFuture[string]()
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](alloc0(size))
dataBuf.len = size
var bytesReceived: DWord
var flagsio = flags.DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
@@ -374,7 +371,9 @@ when defined(windows) or defined(nimdoc):
retFuture.complete($data)
else:
retFuture.fail(newException(EOS, osErrorMsg(errcode)))
dealloc dataBuf.buf
if dataBuf.buf != nil:
dealloc dataBuf.buf
dataBuf.buf = nil
)
let ret = WSARecv(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived,
@@ -382,8 +381,10 @@ when defined(windows) or defined(nimdoc):
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
dealloc dataBuf.buf
dealloc(ol)
if dataBuf.buf != nil:
dealloc dataBuf.buf
dataBuf.buf = nil
GC_unref(ol)
retFuture.fail(newException(EOS, osErrorMsg(err)))
elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
# We have to ensure that the buffer is empty because WSARecv will tell
@@ -426,7 +427,8 @@ when defined(windows) or defined(nimdoc):
dataBuf.len = data.len
var bytesReceived, flags: DWord
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
@@ -442,7 +444,7 @@ when defined(windows) or defined(nimdoc):
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
GC_unref(ol)
else:
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
@@ -490,7 +492,8 @@ when defined(windows) or defined(nimdoc):
client: clientSock.TAsyncFD)
)
var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = TCompletionData(sock: socket, cb:
proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
if not retFuture.finished:
@@ -511,7 +514,7 @@ when defined(windows) or defined(nimdoc):
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
retFuture.fail(newException(EOS, osErrorMsg(err)))
dealloc(ol)
GC_unref(ol)
else:
completeAccept()
# We don't deallocate ``ol`` here because even though this completed

View File

@@ -805,6 +805,7 @@ proc recv*(socket: PSocket, data: pointer, size: int): int {.tags: [FReadIO].} =
let chunk = min(socket.bufLen-socket.currPos, size-read)
var d = cast[cstring](data)
assert size-read >= chunk
copyMem(addr(d[read]), addr(socket.buffer[socket.currPos]), chunk)
read.inc(chunk)
socket.currPos.inc(chunk)
@@ -871,6 +872,7 @@ proc recv*(socket: PSocket, data: pointer, size: int, timeout: int): int {.
while read < size:
let avail = waitFor(socket, waited, timeout, size-read, "recv")
var d = cast[cstring](data)
assert avail <= size-read
result = recv(socket, addr(d[read]), avail)
if result == 0: break
if result < 0:

View File

@@ -639,7 +639,7 @@ proc unmapViewOfFile*(lpBaseAddress: pointer): WINBOOL {.stdcall,
dynlib: "kernel32", importc: "UnmapViewOfFile".}
type
TOVERLAPPED* {.final, pure.} = object
TOVERLAPPED* {.pure, inheritable.} = object
Internal*: DWORD
InternalHigh*: DWORD
Offset*: DWORD
@@ -672,7 +672,7 @@ proc CreateIoCompletionPort*(FileHandle: THANDLE, ExistingCompletionPort: THANDL
proc GetQueuedCompletionStatus*(CompletionPort: THandle,
lpNumberOfBytesTransferred: PDWORD, lpCompletionKey: PULONG,
lpOverlapped: ptr POverlapped,
lpOverlapped: pointer,
dwMilliseconds: DWORD): WINBOOL{.stdcall,
dynlib: "kernel32", importc: "GetQueuedCompletionStatus".}