* Fix #14906 by wrapping outputStream with PipeOutStream

* Fix compile error when ./build_all.sh

* Use PipeOutStream on posix

* Fix compile error when build_all.sh

* Use ptr UncheckedArray

* Replace copyRefObj

* Remove tmp buffer from posPeekData

* Add more tests for outputStream

* Add comments about PipeOutStream.buffer

* Fix bug in posReadLine

* Move implementation of newPipeOutStream to streamwrapper module
This commit is contained in:
Tomohiro
2020-07-18 17:41:33 +09:00
committed by GitHub
parent edbbbdf1a8
commit c983466c15
3 changed files with 235 additions and 9 deletions

View File

@@ -18,7 +18,8 @@
include "system/inclrtl"
import
strutils, os, strtabs, streams, cpuinfo
strutils, os, strtabs, streams, cpuinfo, streamwrapper,
std/private/since
export quoteShell, quoteShellWindows, quoteShellPosix
@@ -237,6 +238,10 @@ proc inputStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [].}
proc outputStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [].}
## Returns ``p``'s output stream for reading from.
##
## You cannot perform peek/write/setOption operations to this stream.
## Use `peekableOutputStream proc <#peekableOutputStream,Process>`_
## if you need to peek stream.
##
## **WARNING**: The returned `Stream` should not be closed manually as it
## is closed when closing the Process ``p``.
##
@@ -247,6 +252,10 @@ proc outputStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [].}
proc errorStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [].}
## Returns ``p``'s error stream for reading from.
##
## You cannot perform peek/write/setOption operations to this stream.
## Use `peekableErrorStream proc <#peekableErrorStream,Process>`_
## if you need to peek stream.
##
## **WARNING**: The returned `Stream` should not be closed manually as it
## is closed when closing the Process ``p``.
##
@@ -254,6 +263,30 @@ proc errorStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [].}
## * `inputStream proc <#inputStream,Process>`_
## * `outputStream proc <#outputStream,Process>`_
proc peekableOutputStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [], since: (1, 3).}
## Returns ``p``'s output stream for reading from.
##
## You can peek returned stream.
##
## **WARNING**: The returned `Stream` should not be closed manually as it
## is closed when closing the Process ``p``.
##
## See also:
## * `outputStream proc <#outputStream,Process>`_
## * `peekableErrorStream proc <#peekableErrorStream,Process>`_
proc peekableErrorStream*(p: Process): Stream {.rtl, extern: "nosp$1", tags: [], since: (1, 3).}
## Returns ``p``'s error stream for reading from.
##
## You can run peek operation to returned stream.
##
## **WARNING**: The returned `Stream` should not be closed manually as it
## is closed when closing the Process ``p``.
##
## See also:
## * `errorStream proc <#errorStream,Process>`_
## * `peekableOutputStream proc <#peekableOutputStream,Process>`_
proc inputHandle*(p: Process): FileHandle {.rtl, extern: "nosp$1",
tags: [].} =
## Returns ``p``'s input file handle for writing to.
@@ -737,6 +770,18 @@ when defined(Windows) and not defined(useNimRtl):
p.errStream = newFileHandleStream(p.errHandle)
result = p.errStream
proc peekableOutputStream(p: Process): Stream =
streamAccess(p)
if p.outStream == nil:
p.outStream = newFileHandleStream(p.outHandle).newPipeOutStream
result = p.outStream
proc peekableErrorStream(p: Process): Stream =
streamAccess(p)
if p.errStream == nil:
p.errStream = newFileHandleStream(p.errHandle).newPipeOutStream
result = p.errStream
proc execCmd(command: string): int =
var
si: STARTUPINFO
@@ -1360,28 +1405,40 @@ elif not defined(useNimRtl):
p.exitStatus = status
result = exitStatusLikeShell(status)
proc createStream(stream: var owned(Stream), handle: var FileHandle,
fileMode: FileMode) =
proc createStream(handle: var FileHandle,
fileMode: FileMode): owned FileStream =
var f: File
if not open(f, handle, fileMode): raiseOSError(osLastError())
stream = newFileStream(f)
return newFileStream(f)
proc inputStream(p: Process): Stream =
streamAccess(p)
if p.inStream == nil:
createStream(p.inStream, p.inHandle, fmWrite)
p.inStream = createStream(p.inHandle, fmWrite)
return p.inStream
proc outputStream(p: Process): Stream =
streamAccess(p)
if p.outStream == nil:
createStream(p.outStream, p.outHandle, fmRead)
p.outStream = createStream(p.outHandle, fmRead)
return p.outStream
proc errorStream(p: Process): Stream =
streamAccess(p)
if p.errStream == nil:
createStream(p.errStream, p.errHandle, fmRead)
p.errStream = createStream(p.errHandle, fmRead)
return p.errStream
proc peekableOutputStream(p: Process): Stream =
streamAccess(p)
if p.outStream == nil:
p.outStream = createStream(p.outHandle, fmRead).newPipeOutStream
return p.outStream
proc peekableErrorStream(p: Process): Stream =
streamAccess(p)
if p.errStream == nil:
p.errStream = createStream(p.errHandle, fmRead).newPipeOutStream
return p.errStream
proc csystem(cmd: cstring): cint {.nodecl, importc: "system",

117
lib/pure/streamwrapper.nim Normal file
View File

@@ -0,0 +1,117 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2020 Andreas Rumpf
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements stream wrapper.
##
## **Since** version 1.2.
import deques, streams
type
PipeOutStream*[T] = ref object of T
# When stream peek operation is called, it reads from base stream
# type using `baseReadDataImpl` and stores the content to this buffer.
# Next stream read operation returns data in the buffer so that previus peek
# operation looks like didn't changed read positon.
# When stream read operation that returns N byte data is called and the size is smaller than buffer size,
# first N elements are removed from buffer.
# Deque type can do such operation more efficiently than seq type.
buffer: Deque[char]
baseReadLineImpl: typeof(StreamObj.readLineImpl)
baseReadDataImpl: typeof(StreamObj.readDataImpl)
proc posReadLine[T](s: Stream, line: var TaintedString): bool =
var s = PipeOutStream[T](s)
assert s.baseReadLineImpl != nil
let n = s.buffer.len
line.string.setLen(0)
for i in 0..<n:
var c = s.buffer.popFirst
if c == '\c':
c = readChar(s)
return true
elif c == '\L': return true
elif c == '\0':
return line.len > 0
line.string.add(c)
var line2: string
result = s.baseReadLineImpl(s, line2)
line.add line2
proc posReadData[T](s: Stream, buffer: pointer, bufLen: int): int =
var s = PipeOutStream[T](s)
assert s.baseReadDataImpl != nil
let
dest = cast[ptr UncheckedArray[char]](buffer)
n = min(s.buffer.len, bufLen)
result = n
for i in 0..<n:
dest[i] = s.buffer.popFirst
if bufLen > n:
result += s.baseReadDataImpl(s, addr dest[n], bufLen - n)
proc posReadDataStr[T](s: Stream, buffer: var string, slice: Slice[int]): int =
posReadData[T](s, addr buffer[slice.a], slice.len)
proc posPeekData[T](s: Stream, buffer: pointer, bufLen: int): int =
var s = PipeOutStream[T](s)
assert s.baseReadDataImpl != nil
let
dest = cast[ptr UncheckedArray[char]](buffer)
n = min(s.buffer.len, bufLen)
result = n
for i in 0..<n:
dest[i] = s.buffer[i]
if bufLen > n:
let
newDataNeeded = bufLen - n
numRead = s.baseReadDataImpl(s, addr dest[n], newDataNeeded)
result += numRead
for i in 0..<numRead:
s.buffer.addLast dest[n + i]
proc newPipeOutStream*[T](s: sink (ref T)): owned PipeOutStream[T] =
## Wrap pipe for reading with PipeOutStream so that you can use peek* procs and generate runtime error
## when setPosition/getPosition is called or write operation is performed.
##
## Example:
##
## .. code-block:: Nim
## import osproc, streamwrapper
## var
## p = startProcess(exePath)
## outStream = p.outputStream().newPipeOutStream()
## echo outStream.peekChar
## p.close()
assert s.readDataImpl != nil
new(result)
for dest, src in fields((ref T)(result)[], s[]):
dest = src
wasMoved(s[])
if result.readLineImpl != nil:
result.baseReadLineImpl = result.readLineImpl
result.readLineImpl = posReadLine[T]
result.baseReadDataImpl = result.readDataImpl
result.readDataImpl = posReadData[T]
result.readDataStrImpl = posReadDataStr[T]
result.peekDataImpl = posPeekData[T]
# Set nil to anything you may not call.
result.setPositionImpl = nil
result.getPositionImpl = nil
result.writeDataImpl = nil
result.flushImpl = nil

View File

@@ -115,6 +115,8 @@ else: # main driver
runTest("c_exit2_139", 139)
runTest("quit_139", 139)
import std/streams
block execProcessTest:
let dir = sourcePath.parentDir
let (_, err) = execCmdEx(nim & " c " & quoteShell(dir / "osproctest.nim"))
@@ -132,13 +134,63 @@ else: # main driver
doAssert outStr2 == absolutePath(testDir) & "\nx yz\n"
removeDir(testDir)
# test for PipeOutStream
var
p = startProcess(exePath, args = ["abcdefghi", "foo", "bar", "0123456"])
outStrm = p.peekableOutputStream
var tmp: string
doAssert outStrm.readLine(tmp)
doAssert outStrm.readChar == 'a'
doAssert outStrm.peekChar == 'b'
doAssert outStrm.readChar == 'b'
doAssert outStrm.readChar == 'c'
doAssert outStrm.peekChar == 'd'
doAssert outStrm.peekChar == 'd'
doAssert outStrm.readChar == 'd'
doAssert outStrm.readStr(2) == "ef"
doAssert outStrm.peekStr(2) == "gh"
doAssert outStrm.peekStr(2) == "gh"
doAssert outStrm.readStr(1) == "g"
doAssert outStrm.readStr(3) == "hi\n"
doAssert outStrm.readLine == "foo"
doAssert outStrm.readChar == 'b'
doAssert outStrm.peekChar == 'a'
doAssert outStrm.readLine == "ar"
tmp.setLen(4)
tmp[0] = 'n'
doAssert outStrm.readDataStr(tmp, 1..3) == 3
doAssert tmp == "n012"
doAssert outStrm.peekStr(3) == "345"
doAssert outStrm.readDataStr(tmp, 1..2) == 2
doAssert tmp == "n342"
doAssert outStrm.peekStr(2) == "56"
doAssert outStrm.readDataStr(tmp, 0..3) == 3
doAssert tmp == "56\n2"
p.close
p = startProcess(exePath, args = ["123"])
outStrm = p.peekableOutputStream
let c = outStrm.peekChar
doAssert outStrm.readLine(tmp)
doAssert tmp[0] == c
tmp.setLen(7)
doAssert outStrm.peekData(addr tmp[0], 7) == 4
doAssert tmp[0..3] == "123\n"
doAssert outStrm.peekData(addr tmp[0], 7) == 4
doAssert tmp[0..3] == "123\n"
doAssert outStrm.readData(addr tmp[0], 7) == 4
doAssert tmp[0..3] == "123\n"
p.close
try:
removeFile(exePath)
except OSError:
discard
import std/streams
block: # test for startProcess (more tests needed)
# bugfix: windows stdin.close was a noop and led to blocking reads
proc startProcessTest(command: string, options: set[ProcessOption] = {