Files
neovim/test/client/uv_stream.lua
Lewis Russell 9432e6c1e2 test: run Lua harness with nvim -l
Problem:
The Lua test harness still ran through standalone -ll mode, so tests
depended on the low-level Lua path instead of the regular Nvim Lua
environment. That also meant os.exit() coverage had to carry an ASAN
workaround because Lua's raw process exit skipped Nvim teardown and let
LeakSanitizer interfere with the observed exit code.

Solution:
Run the harness and related fixtures with nvim -l. Patch os.exit() in
the main Lua state to exit through getout(), so scripts observe normal
Nvim shutdown while standalone -ll remains available for generator-style
scripts. As a consequence, the startup test can assert os.exit() without
disabling leak detection.

AI-assisted: Codex
2026-05-13 13:14:07 +01:00

302 lines
7.8 KiB
Lua

---
--- Basic stream types.
--- See `rpc_stream.lua` for the msgpack layer.
---
local uv = vim.uv
--- Closes `handle` unless it is nil/false or already closing.
---
--- @param handle uv.uv_handle_t?
local function close_handle(handle)
  if not handle or handle:is_closing() then
    return
  end
  handle:close()
end
--- Stops reading on `handle` unless it is nil/false or already closing.
---
--- @param handle uv.uv_stream_t?
local function read_stop(handle)
  if not handle or handle:is_closing() then
    return
  end
  handle:read_stop()
end
--- @class test.Stream
--- @field write fun(self, data: string|string[])
--- @field read_start fun(self, cb: fun(chunk: string))
--- @field read_stop fun(self)
--- @field close fun(self, signal?: string, noblock?: boolean)
--- Stream over this process's stdio: `open()` wraps fd 0 as the input pipe
--- and fd 1 as the output pipe.
---
--- @class vim.StdioStream : test.Stream
--- Pipe that `read_start()`/`read_stop()` operate on.
--- @field private _in uv.uv_pipe_t
--- Pipe that `write()` sends data to.
--- @field private _out uv.uv_pipe_t
local StdioStream = {}
StdioStream.__index = StdioStream
--- Creates a stream whose input pipe is opened on fd 0 and whose output pipe
--- is opened on fd 1. Raises if either pipe handle cannot be created.
---
--- @return vim.StdioStream
function StdioStream.open()
  local input = assert(uv.new_pipe(false))
  local output = assert(uv.new_pipe(false))
  local stream = setmetatable({ _in = input, _out = output }, StdioStream)
  stream._in:open(0)
  stream._out:open(1)
  return stream
end
--- Writes `data` to the output pipe. No write callback is registered.
---
--- @param data string|string[]
function StdioStream:write(data)
  local out = self._out
  out:write(data)
end
--- Starts reading from the input pipe.
---
--- Raises from inside the read callback when a read error is reported;
--- otherwise forwards the chunk to `cb` unchanged.
---
--- @param cb fun(chunk: string)
function StdioStream:read_start(cb)
  local function on_chunk(err, chunk)
    if err then
      error(err)
    end
    cb(chunk)
  end
  self._in:read_start(on_chunk)
end
--- Stops reading from the input pipe (no-op if it is already closing).
function StdioStream:read_stop()
  local input = self._in
  read_stop(input)
end
--- Closes both pipes; a pipe that is already closing is left alone.
function StdioStream:close()
  local input, output = self._in, self._out
  close_handle(input)
  close_handle(output)
end
--- Stream over a named pipe (see `open`) or a TCP socket (see `connect`).
---
--- @class test.SocketStream : test.Stream
--- First error reported by a connect/read/write callback, if any; later
--- errors do not overwrite it.
--- @field package _stream_error? string
--- Pipe handle when created by `open`, TCP handle when created by `connect`.
--- @field package _socket uv.uv_pipe_t|uv.uv_tcp_t
local SocketStream = {}
SocketStream.__index = SocketStream
--- Connects to the named pipe `file`.
---
--- Runs the event loop so the connect callback gets a chance to fire before
--- returning. If the callback recorded an error, the socket is closed and the
--- error is raised.
---
--- @param file string
--- @return test.SocketStream
function SocketStream.open(file)
  local pipe = assert(uv.new_pipe(false))
  local stream = setmetatable({ _socket = pipe, _stream_error = nil }, SocketStream)

  local function on_connect(err)
    uv.stop()
    stream._stream_error = stream._stream_error or err
  end
  uv.pipe_connect(pipe, file, on_connect)

  -- On Windows, writing to the pipe doesn't work if it's not connected yet,
  -- so wait for the connect callback to be called.
  uv.run()

  local failure = stream._stream_error
  if failure then
    close_handle(pipe)
    error(failure)
  end
  return stream
end
--- Starts a TCP connection to `host`:`port` and returns immediately.
---
--- A connect error is stored on the stream by the connect callback and raised
--- by a later `write`/`read_start`/`read_stop` call.
---
--- @param host string
--- @param port integer
--- @return test.SocketStream
function SocketStream.connect(host, port)
  local tcp = assert(uv.new_tcp())
  local stream = setmetatable({ _socket = tcp, _stream_error = nil }, SocketStream)

  local function on_connect(err)
    stream._stream_error = stream._stream_error or err
  end
  uv.tcp_connect(tcp, host, port, on_connect)

  return stream
end
--- Writes `data` to the socket.
---
--- Raises immediately if an earlier callback recorded an error. A failure of
--- this write is recorded on the stream and surfaces on a later call.
---
--- @param data string|string[]
function SocketStream:write(data)
  local pending = self._stream_error
  if pending then
    error(pending)
  end

  local function on_written(err)
    if not err then
      return
    end
    self._stream_error = self._stream_error or err
  end
  uv.write(self._socket, data, on_written)
end
--- Starts reading from the socket, passing each chunk to `cb`.
---
--- Raises immediately if an earlier callback recorded an error. A read error
--- is recorded on the stream and reported to `cb` as a nil chunk.
---
--- @param cb fun(chunk: string?)
function SocketStream:read_start(cb)
  local pending = self._stream_error
  if pending then
    error(pending)
  end

  local function on_chunk(err, chunk)
    if not err then
      cb(chunk)
      return
    end
    self._stream_error = self._stream_error or err
    cb(nil) -- Signal EOF so the session layer stops.
  end
  uv.read_start(self._socket, on_chunk)
end
--- Stops reading from the socket.
--- Raises instead if an earlier callback recorded an error.
function SocketStream:read_stop()
  local pending = self._stream_error
  if pending then
    error(pending)
  end
  read_stop(self._socket)
end
--- Closes the socket unless it is already closing.
function SocketStream:close()
  local socket = self._socket
  close_handle(socket)
end
--- Stream over child process stdio.
---
--- @class test.ProcStream : test.Stream
--- @field private _proc uv.uv_process_t
--- @field private _pid integer
--- @field private _child_stdin uv.uv_pipe_t
--- @field private _child_stdout uv.uv_pipe_t
--- @field private _child_stderr uv.uv_pipe_t
--- Initialized to `false` by spawn().
--- @field private _exiting boolean
--- Timestamp (uv.now()) recorded by close(); nil until close() is called.
--- @field package _closed integer?
--- Optional callback invoked from the process exit handler with `_closed`.
--- @field package _on_exit fun(closed: integer?)?
--- Collects stdout (if `collect_text=true`). Treats data as text (CRLF converted to LF).
--- @field stdout string
--- Collects stderr as raw data.
--- @field stderr string
--- Gets stderr+stdout as text (CRLF converted to LF).
--- Raises unless `collect_text` is set.
--- @field output fun(self: test.ProcStream): string
--- Set once the stdout read callback delivers no chunk (EOF).
--- @field stdout_eof boolean
--- Set once the stderr read callback delivers no chunk (EOF).
--- @field stderr_eof boolean
--- Collects text into the `stdout` field.
--- @field collect_text boolean
--- Exit code. Assigned by the process exit handler.
--- @field status integer
--- Assigned by the process exit handler.
--- @field signal integer
local ProcStream = {}
ProcStream.__index = ProcStream
--- Starts child process specified by `argv`.
---
--- Fresh pipes are created for the child's stdin, stdout and stderr. If
--- spawning fails, those pipes are closed again and the spawn error is raised.
---
--- @param argv string[] Program (`argv[1]`) followed by its arguments.
--- @param env string[]?
--- @param io_extra uv.uv_pipe_t? Extra stdio entry passed to the child after stderr.
--- @param on_exit fun(closed: integer?)? Called after the child process exits.
--- `closed` is the timestamp (uv.now()) when close() was called, or nil if it wasn't.
--- @param forward_stderr boolean? Forward child process stderr, otherwise collect it.
--- @return test.ProcStream
function ProcStream.spawn(argv, env, io_extra, on_exit, forward_stderr)
  local self = setmetatable({
    collect_text = false,
    output = function(proc)
      if not proc.collect_text then
        error('set collect_text=true')
      end
      -- The outer parentheses drop gsub()'s second result (the replacement
      -- count) so callers receive exactly one string.
      return ((proc.stderr .. proc.stdout):gsub('\r\n', '\n'))
    end,
    stdout = '',
    stderr = '',
    stdout_eof = false,
    stderr_eof = false,
    _child_stdin = assert(uv.new_pipe(false)),
    _child_stdout = assert(uv.new_pipe(false)),
    _child_stderr = assert(uv.new_pipe(false)),
    _exiting = false,
    _on_exit = on_exit,
  }, ProcStream)

  local prog = argv[1]
  local args = {} --- @type string[]
  for i = 2, #argv do
    args[#args + 1] = argv[i]
  end

  -- When forwarding, the child's stderr slot receives fd 1 instead of the
  -- collecting pipe.
  local stderr = forward_stderr and 1 or self._child_stderr

  --- @diagnostic disable-next-line:missing-fields
  self._proc, self._pid = uv.spawn(prog, {
    stdio = { self._child_stdin, self._child_stdout, stderr, io_extra },
    args = args,
    --- @diagnostic disable-next-line:assign-type-mismatch
    env = env,
  }, function(status, signal)
    self.signal = signal
    -- "Abort" exit may not set status; force to nonzero in that case.
    self.status = (0 ~= (status or 0) or 0 == (signal or 0)) and status or (128 + (signal or 0))
    close_handle(self._child_stdin)
    close_handle(self._proc)
    if self._on_exit then
      self._on_exit(self._closed)
    end
  end)

  if not self._proc then
    -- On failure the second result of uv.spawn() is the error message rather
    -- than a pid.
    local err = self._pid
    -- Nothing will ever read from or write to these pipes; release them
    -- instead of leaving open handles behind.
    close_handle(self._child_stdin)
    close_handle(self._child_stdout)
    close_handle(self._child_stderr)
    error(err)
  end

  return self
end
--- Writes `data` to the child's stdin pipe.
---
--- @param data string|string[]
function ProcStream:write(data)
  local child_stdin = self._child_stdin
  child_stdin:write(data)
end
--- Shared read handler for the child's stdout and stderr pipes.
---
--- Raises on a read error. A chunk is appended to the matching buffer (stderr
--- always, stdout only when `collect_text` is set). A missing chunk marks the
--- stream as EOF and closes its pipe. `cb`, if given, then receives the chunk.
---
--- @param stream 'stdout'|'stderr'
--- @param cb fun(chunk: string?)?
--- @param err string?
--- @param chunk string?
function ProcStream:on_read(stream, cb, err, chunk)
  if err then
    error(err) -- stream read failed?
  end

  if not chunk then
    -- stderr_eof/stdout_eof
    self[stream .. '_eof'] = true ---@type boolean
    -- EOF is the stream's lifecycle end even if the caller never closes it.
    local finished_pipe = stream == 'stdout' and self._child_stdout or self._child_stderr
    close_handle(finished_pipe)
  elseif stream == 'stderr' then
    -- Always collect stderr, in case it gives useful info on failure.
    self.stderr = self.stderr .. chunk --[[@as string]]
  elseif stream == 'stdout' and self.collect_text then
    -- Set `stdout` and convert CRLF => LF.
    local text = (self.stdout .. chunk):gsub('\r\n', '\n')
    self.stdout = text
  end

  -- Handler provided by the caller.
  if cb then
    cb(chunk)
  end
end
--- Collects output until the process exits.
---
--- Runs the event loop one iteration at a time until both stdout and stderr
--- have reached EOF and an exit status or signal has been recorded.
function ProcStream:wait()
  while true do
    local exited = self.status or self.signal
    if self.stdout_eof and self.stderr_eof and exited then
      return
    end
    uv.run('once')
  end
end
--- Starts reading the child's stdout and stderr pipes, routing both through
--- `on_read` with the matching caller-provided handler.
---
--- @param on_stdout fun(chunk: string?)?
--- @param on_stderr fun(chunk: string?)?
function ProcStream:read_start(on_stdout, on_stderr)
  local function reader_for(name, handler)
    return function(err, chunk)
      self:on_read(name, handler, err, chunk)
    end
  end

  self._child_stdout:read_start(reader_for('stdout', on_stdout))
  self._child_stderr:read_start(reader_for('stderr', on_stderr))
end
--- Stops reading the child's stdout and stderr pipes; a pipe that is already
--- closing is skipped.
function ProcStream:read_stop()
  local out_pipe, err_pipe = self._child_stdout, self._child_stderr
  read_stop(out_pipe)
  read_stop(err_pipe)
end
--- Closes the child's stdio pipes and optionally signals the process.
---
--- Only the first call has any effect; later calls return nothing.
---
--- @param signal string? Signal name without the `sig` prefix; sent to the
--- process if its handle is not already closing.
--- @param noblock boolean? When falsy, run the event loop until the exit
--- status has been recorded.
--- @return integer? status Only returned when blocking.
--- @return integer? signal Only returned when blocking.
function ProcStream:close(signal, noblock)
  if self._closed then
    return
  end
  self._closed = uv.now()

  self:read_stop()
  close_handle(self._child_stdin)
  close_handle(self._child_stdout)
  close_handle(self._child_stderr)

  local proc = self._proc
  local can_signal = type(signal) == 'string' and proc and not proc:is_closing()
  if can_signal then
    proc:kill('sig' .. signal)
  end

  if noblock then
    return
  end
  while self.status == nil do
    uv.run('once')
  end
  return self.status, self.signal
end
-- Module exports: the three stream implementations defined in this file.
return {
  ProcStream = ProcStream,
  SocketStream = SocketStream,
  StdioStream = StdioStream,
}