Add async IO operations with buffers on files and sockets

This commit is contained in:
Anatoly Galiulin
2016-08-31 11:18:36 +07:00
parent 41f6c08f92
commit e4c46e6fba
4 changed files with 373 additions and 23 deletions

View File

@@ -749,7 +749,7 @@ when defined(windows) or defined(nimdoc):
retFuture.complete("")
return retFuture
proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
## at least be of that size. Returned future will complete once all the
@@ -769,11 +769,11 @@ when defined(windows) or defined(nimdoc):
verifyPresence(socket)
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
var retFuture = newFuture[int]("recvInto")
var retFuture = newFuture[int]("recvBuffer")
#buf[] = '\0'
var dataBuf: TWSABuf
dataBuf.buf = buf
dataBuf.buf = cast[cstring](buf)
dataBuf.len = size.ULONG
var bytesReceived: Dword
@@ -784,10 +784,7 @@ when defined(windows) or defined(nimdoc):
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete(0)
else:
retFuture.complete(bytesCount)
retFuture.complete(bytesCount)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete(0)
@@ -819,6 +816,51 @@ when defined(windows) or defined(nimdoc):
retFuture.complete(bytesReceived)
return retFuture
proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
## data has been sent.
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
## to avoid early freeing of the buffer
verifyPresence(socket)
var retFuture = newFuture[void]("send")
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](buf)
dataBuf.len = size.ULONG
var bytesReceived, lowFlags: Dword
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
if flags.isDisconnectionError(errcode):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
lowFlags, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``data`` to ``socket``. The returned future will complete once all
@@ -828,9 +870,9 @@ when defined(windows) or defined(nimdoc):
var dataBuf: TWSABuf
dataBuf.buf = data
dataBuf.len = data.len.ULONG
GC_ref(data) # we need to protect data until send operation is completed
# or failed.
dataBuf.len = data.len.ULONG
var bytesReceived, lowFlags: Dword
var ol = PCustomOverlapped()
@@ -1403,9 +1445,9 @@ else:
addRead(socket, cb)
return retFuture
proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
var retFuture = newFuture[int]("recvInto")
var retFuture = newFuture[int]("recvBuffer")
proc cb(sock: AsyncFD): bool =
result = true
@@ -1427,6 +1469,38 @@ else:
addRead(socket, cb)
return retFuture
proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[void] =
var retFuture = newFuture[void]("send")
var written = 0
proc cb(sock: AsyncFD): bool =
result = true
let netSize = size-written
var d = cast[cstring](buf)
let res = send(sock.SocketHandle, addr d[written], netSize.cint,
MSG_NOSIGNAL)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
if flags.isDisconnectionError(lastError):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
written.inc(res)
if res != netSize:
result = false # We still have data to send.
else:
retFuture.complete()
# TODO: The following causes crashes.
#if not cb(socket):
addWrite(socket, cb)
return retFuture
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): Future[void] =
var retFuture = newFuture[void]("send")

View File

@@ -112,6 +112,80 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile =
register(result.fd)
proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
## Read ``size`` bytes from the specified file asynchronously starting at
## the current position of the file pointer.
##
## If the file pointer is past the end of the file then an empty string is
## returned.
var retFuture = newFuture[int]("asyncfile.readBuffer")
when defined(windows) or defined(nimdoc):
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: f.fd, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
assert bytesCount > 0
assert bytesCount <= size
f.offset.inc bytesCount
retFuture.complete(bytesCount)
else:
if errcode.int32 == ERROR_HANDLE_EOF:
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
ol.offset = DWord(f.offset and 0xffffffff)
ol.offsetHigh = DWord(f.offset shr 32)
# According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
let ret = readFile(f.fd.Handle, buf, size.int32, nil,
cast[POVERLAPPED](ol))
if not ret.bool:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
# Request completed immediately.
var bytesRead: DWord
let overlappedRes = getOverlappedResult(f.fd.Handle,
cast[POverlapped](ol), bytesRead, false.WinBool)
if not overlappedRes.bool:
let err = osLastError()
if err.int32 == ERROR_HANDLE_EOF:
retFuture.complete(0)
else:
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
else:
assert bytesRead > 0
assert bytesRead <= size
f.offset.inc bytesRead
retFuture.complete(bytesRead)
else:
proc cb(fd: AsyncFD): bool =
result = true
let res = read(fd.cint, cast[cstring](buf), size.cint)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EAGAIN:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
elif res == 0:
# EOF
retFuture.complete(0)
else:
f.offset.inc(res)
retFuture.complete(res)
if not cb(f.fd):
addRead(f.fd, cb)
return retFuture
proc read*(f: AsyncFile, size: int): Future[string] =
## Read ``size`` bytes from the specified file asynchronously starting at
## the current position of the file pointer.
@@ -238,6 +312,73 @@ proc readAll*(f: AsyncFile): Future[string] {.async.} =
return
result.add data
proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
## Writes ``size`` bytes from ``buf`` to the file specified asynchronously.
##
## The returned Future will complete once all data has been written to the
## specified file.
var retFuture = newFuture[void]("asyncfile.writeBuffer")
when defined(windows) or defined(nimdoc):
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: f.fd, cb:
proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
assert bytesCount == size.int32
f.offset.inc(size)
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
ol.offset = DWord(f.offset and 0xffffffff)
ol.offsetHigh = DWord(f.offset shr 32)
# According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
cast[POVERLAPPED](ol))
if not ret.bool:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
# Request completed immediately.
var bytesWritten: DWord
let overlappedRes = getOverlappedResult(f.fd.Handle,
cast[POverlapped](ol), bytesWritten, false.WinBool)
if not overlappedRes.bool:
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
else:
assert bytesWritten == size.int32
f.offset.inc(size)
retFuture.complete()
else:
var written = 0
proc cb(fd: AsyncFD): bool =
result = true
let remainderSize = size-written
var cbuf = cast[cstring](buf)
let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
if res < 0:
let lastError = osLastError()
if lastError.int32 != EAGAIN:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
written.inc res
f.offset.inc res
if res != remainderSize:
result = false # We still have data to write.
else:
retFuture.complete()
if not cb(f.fd):
addWrite(f.fd, cb)
return retFuture
proc write*(f: AsyncFile, data: string): Future[void] =
## Writes ``data`` to the file specified asynchronously.
##

View File

@@ -193,7 +193,7 @@ proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
sslSetConnectState(socket.sslHandle)
sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
template readInto(buf: cstring, size: int, socket: AsyncSocket,
template readInto(buf: pointer, size: int, socket: AsyncSocket,
flags: set[SocketFlag]): int =
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that
## this is a template and not a proc.
@@ -202,10 +202,10 @@ template readInto(buf: cstring, size: int, socket: AsyncSocket,
when defineSsl:
# SSL mode.
sslLoop(socket, flags,
sslRead(socket.sslHandle, buf, size.cint))
sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
res = opResult
else:
var recvIntoFut = recvInto(socket.fd.AsyncFD, buf, size, flags)
var recvIntoFut = recvBuffer(socket.fd.AsyncFD, buf, size, flags)
yield recvIntoFut
# Not in SSL mode.
res = recvIntoFut.read()
@@ -218,6 +218,54 @@ template readIntoBuf(socket: AsyncSocket,
socket.bufLen = size
size
proc recvBuffer*(socket: AsyncSocket, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] {.async.} =
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``.
##
## For buffered sockets this function will attempt to read all the requested
## data. It will read this data in ``BufferSize`` chunks.
##
## For unbuffered sockets this function makes no effort to read
## all the data requested. It will return as much data as the operating system
## gives it.
##
## If socket is disconnected during the
## recv operation then the future may complete with only a part of the
## requested data.
##
## If socket is disconnected and no data is available
## to be read then the future will complete with a value of ``0``.
if socket.isBuffered:
let originalBufPos = socket.currPos
if socket.bufLen == 0:
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
if res == 0:
return 0
var read = 0
var cbuf = cast[cstring](buf)
while read < size:
if socket.currPos >= socket.bufLen:
if SocketFlag.Peek in flags:
# We don't want to get another buffer if we're peeking.
break
let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
if res == 0:
break
let chunk = min(socket.bufLen-socket.currPos, size-read)
copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
read.inc(chunk)
socket.currPos.inc(chunk)
if SocketFlag.Peek in flags:
# Restore old buffer cursor position.
socket.currPos = originalBufPos
result = read
else:
result = readInto(buf, size, socket, flags)
proc recv*(socket: AsyncSocket, size: int,
flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
## Reads **up to** ``size`` bytes from ``socket``.
@@ -270,6 +318,19 @@ proc recv*(socket: AsyncSocket, size: int,
let read = readInto(addr result[0], size, socket, flags)
result.setLen(read)
proc sendBuffer*(socket: AsyncSocket, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}) {.async.} =
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
## data has been sent.
assert socket != nil
if socket.isSsl:
when defineSsl:
sslLoop(socket, flags,
sslWrite(socket.sslHandle, cast[pointer](buf), size.cint))
await sendPendingSslData(socket, flags)
else:
await sendBuffer(socket.fd.AsyncFD, buf, size, flags)
proc send*(socket: AsyncSocket, data: string,
flags = {SocketFlag.SafeDisconn}) {.async.} =
## Sends ``data`` to ``socket``. The returned future will complete once all

View File

@@ -718,7 +718,7 @@ when defined(windows) or defined(nimdoc):
retFuture.complete("")
return retFuture
proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
## at least be of that size. Returned future will complete once all the
@@ -738,11 +738,11 @@ when defined(windows) or defined(nimdoc):
verifyPresence(socket)
assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
var retFuture = newFuture[int]("recvInto")
var retFuture = newFuture[int]("recvBuffer")
#buf[] = '\0'
var dataBuf: TWSABuf
dataBuf.buf = buf
dataBuf.buf = cast[cstring](buf)
dataBuf.len = size.ULONG
var bytesReceived: Dword
@@ -753,10 +753,7 @@ when defined(windows) or defined(nimdoc):
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
if bytesCount == 0 and dataBuf.buf[0] == '\0':
retFuture.complete(0)
else:
retFuture.complete(bytesCount)
retFuture.complete(bytesCount)
else:
if flags.isDisconnectionError(errcode):
retFuture.complete(0)
@@ -788,6 +785,51 @@ when defined(windows) or defined(nimdoc):
retFuture.complete(bytesReceived)
return retFuture
proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
## data has been sent.
## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
## to avoid early freeing of the buffer
verifyPresence(socket)
var retFuture = newFuture[void]("send")
var dataBuf: TWSABuf
dataBuf.buf = cast[cstring](buf)
dataBuf.len = size.ULONG
var bytesReceived, lowFlags: Dword
var ol = PCustomOverlapped()
GC_ref(ol)
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
if not retFuture.finished:
if errcode == OSErrorCode(-1):
retFuture.complete()
else:
if flags.isDisconnectionError(errcode):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(errcode)))
)
let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
lowFlags, cast[POVERLAPPED](ol), nil)
if ret == -1:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
GC_unref(ol)
if flags.isDisconnectionError(err):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
else:
retFuture.complete()
# We don't deallocate ``ol`` here because even though this completed
# immediately poll will still be notified about its completion and it will
# free ``ol``.
return retFuture
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): Future[void] =
## Sends ``data`` to ``socket``. The returned future will complete once all
@@ -797,9 +839,9 @@ when defined(windows) or defined(nimdoc):
var dataBuf: TWSABuf
dataBuf.buf = data
dataBuf.len = data.len.ULONG
GC_ref(data) # we need to protect data until send operation is completed
# or failed.
dataBuf.len = data.len.ULONG
var bytesReceived, lowFlags: Dword
var ol = PCustomOverlapped()
@@ -1528,9 +1570,9 @@ else:
addRead(socket, cb)
return retFuture
proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[int] =
var retFuture = newFuture[int]("recvInto")
var retFuture = newFuture[int]("recvBuffer")
proc cb(sock: AsyncFD): bool =
result = true
@@ -1552,6 +1594,38 @@ else:
addRead(socket, cb)
return retFuture
proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
flags = {SocketFlag.SafeDisconn}): Future[void] =
var retFuture = newFuture[void]("send")
var written = 0
proc cb(sock: AsyncFD): bool =
result = true
let netSize = size-written
var d = cast[cstring](buf)
let res = send(sock.SocketHandle, addr d[written], netSize.cint,
MSG_NOSIGNAL)
if res < 0:
let lastError = osLastError()
if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
if flags.isDisconnectionError(lastError):
retFuture.complete()
else:
retFuture.fail(newException(OSError, osErrorMsg(lastError)))
else:
result = false # We still want this callback to be called.
else:
written.inc(res)
if res != netSize:
result = false # We still have data to send.
else:
retFuture.complete()
# TODO: The following causes crashes.
#if not cb(socket):
addWrite(socket, cb)
return retFuture
proc send*(socket: AsyncFD, data: string,
flags = {SocketFlag.SafeDisconn}): Future[void] =
var retFuture = newFuture[void]("send")