mirror of
https://github.com/neovim/neovim.git
synced 2025-09-13 06:48:17 +00:00
job: Refactor to ensure that all callbacks will be invoked
It's possible that a child process won't close it's standard streams, even after it exits. This can be evidenced with the "xclip" program: :call system('xclip -i -selection clipboard', 'DATA') Before this commit, the above command wouldn't return, even though the xclip program had exited. That is because `xclip` wasn't closing it's stdout/stderr streams, which would block pending_refs from ever reaching 0. Now the job.c module was refactored to ensure all streams are closed when the uv_process_t handle is closed.
This commit is contained in:
@@ -21,6 +21,21 @@
|
|||||||
#define MAX_RUNNING_JOBS 100
|
#define MAX_RUNNING_JOBS 100
|
||||||
#define JOB_BUFFER_SIZE 0xFFFF
|
#define JOB_BUFFER_SIZE 0xFFFF
|
||||||
|
|
||||||
|
#define close_job_stream(job, stream, type) \
|
||||||
|
do { \
|
||||||
|
if (job->stream) { \
|
||||||
|
type##stream_free(job->stream); \
|
||||||
|
job->stream = NULL; \
|
||||||
|
if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \
|
||||||
|
uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define close_job_in(job) close_job_stream(job, in, w)
|
||||||
|
#define close_job_out(job) close_job_stream(job, out, r)
|
||||||
|
#define close_job_err(job) close_job_stream(job, err, r)
|
||||||
|
|
||||||
struct job {
|
struct job {
|
||||||
// Job id the index in the job table plus one.
|
// Job id the index in the job table plus one.
|
||||||
int id;
|
int id;
|
||||||
@@ -28,13 +43,9 @@ struct job {
|
|||||||
int64_t status;
|
int64_t status;
|
||||||
// Number of polls after a SIGTERM that will trigger a SIGKILL
|
// Number of polls after a SIGTERM that will trigger a SIGKILL
|
||||||
int exit_timeout;
|
int exit_timeout;
|
||||||
// exit_cb may be called while there's still pending data from stdout/stderr.
|
// Number of references to the job. The job resources will only be freed by
|
||||||
// We use this reference count to ensure the JobExit event is only emitted
|
// close_cb when this is 0
|
||||||
// when stdout/stderr are drained
|
int refcount;
|
||||||
int pending_refs;
|
|
||||||
// Same as above, but for freeing the job memory which contains
|
|
||||||
// libuv handles. Only after all are closed the job can be safely freed.
|
|
||||||
int pending_closes;
|
|
||||||
// If the job was already stopped
|
// If the job was already stopped
|
||||||
bool stopped;
|
bool stopped;
|
||||||
// Data associated with the job
|
// Data associated with the job
|
||||||
@@ -164,8 +175,7 @@ Job *job_start(char **argv,
|
|||||||
job->id = i + 1;
|
job->id = i + 1;
|
||||||
*status = job->id;
|
*status = job->id;
|
||||||
job->status = -1;
|
job->status = -1;
|
||||||
job->pending_refs = 3;
|
job->refcount = 4;
|
||||||
job->pending_closes = 4;
|
|
||||||
job->data = data;
|
job->data = data;
|
||||||
job->stdout_cb = stdout_cb;
|
job->stdout_cb = stdout_cb;
|
||||||
job->stderr_cb = stderr_cb;
|
job->stderr_cb = stderr_cb;
|
||||||
@@ -206,7 +216,9 @@ Job *job_start(char **argv,
|
|||||||
|
|
||||||
// Spawn the job
|
// Spawn the job
|
||||||
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
|
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
|
||||||
free_job(job);
|
close_job_in(job);
|
||||||
|
close_job_out(job);
|
||||||
|
close_job_err(job);
|
||||||
*status = -1;
|
*status = -1;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -272,15 +284,14 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
|
|||||||
int old_mode = cur_tmode;
|
int old_mode = cur_tmode;
|
||||||
settmode(TMODE_COOK);
|
settmode(TMODE_COOK);
|
||||||
|
|
||||||
// Increase pending_refs to stop the exit_cb from being called, which
|
// Increase refcount to stop the job from being freed before we have a
|
||||||
// could result in the job being freed before we have a chance
|
// chance to get the status.
|
||||||
// to get the status.
|
job->refcount++;
|
||||||
job->pending_refs++;
|
|
||||||
event_poll_until(ms,
|
event_poll_until(ms,
|
||||||
// Until...
|
// Until...
|
||||||
got_int || // interrupted by the user
|
got_int || // interrupted by the user
|
||||||
job->pending_refs == 1); // job exited
|
job->refcount == 1); // job exited
|
||||||
job->pending_refs--;
|
job->refcount--;
|
||||||
|
|
||||||
// we'll assume that a user frantically hitting interrupt doesn't like
|
// we'll assume that a user frantically hitting interrupt doesn't like
|
||||||
// the current job. Signal that it has to be killed.
|
// the current job. Signal that it has to be killed.
|
||||||
@@ -291,9 +302,10 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
|
|||||||
|
|
||||||
settmode(old_mode);
|
settmode(old_mode);
|
||||||
|
|
||||||
if (!job->pending_refs) {
|
if (!job->refcount) {
|
||||||
int status = (int) job->status;
|
int status = (int) job->status;
|
||||||
job_exit_callback(job);
|
// Manually invoke close_cb to free the job resources
|
||||||
|
close_cb((uv_handle_t *)&job->proc);
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -312,15 +324,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
|
|||||||
/// @param job The job instance
|
/// @param job The job instance
|
||||||
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
|
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
|
||||||
{
|
{
|
||||||
if (!job->in) {
|
close_job_in(job);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// let other functions in the job module know that the in pipe is no more
|
|
||||||
wstream_free(job->in);
|
|
||||||
job->in = NULL;
|
|
||||||
|
|
||||||
uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// All writes that complete after calling this function will be reported
|
/// All writes that complete after calling this function will be reported
|
||||||
@@ -379,9 +383,6 @@ static void job_exit_callback(Job *job)
|
|||||||
job->exit_cb(job, job->data);
|
job->exit_cb(job, job->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Free the job resources
|
|
||||||
free_job(job);
|
|
||||||
|
|
||||||
// Stop polling job status if this was the last
|
// Stop polling job status if this was the last
|
||||||
job_count--;
|
job_count--;
|
||||||
if (job_count == 0) {
|
if (job_count == 0) {
|
||||||
@@ -394,16 +395,6 @@ static bool is_alive(Job *job)
|
|||||||
return uv_process_kill(&job->proc, 0) == 0;
|
return uv_process_kill(&job->proc, 0) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void free_job(Job *job)
|
|
||||||
{
|
|
||||||
uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
|
|
||||||
if (job->in) {
|
|
||||||
uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
|
|
||||||
}
|
|
||||||
uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
|
|
||||||
uv_close((uv_handle_t *)&job->proc, close_cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
|
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
|
||||||
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
|
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
|
||||||
static void job_prepare_cb(uv_prepare_t *handle)
|
static void job_prepare_cb(uv_prepare_t *handle)
|
||||||
@@ -433,12 +424,14 @@ static void read_cb(RStream *rstream, void *data, bool eof)
|
|||||||
|
|
||||||
if (rstream == job->out) {
|
if (rstream == job->out) {
|
||||||
job->stdout_cb(rstream, data, eof);
|
job->stdout_cb(rstream, data, eof);
|
||||||
|
if (eof) {
|
||||||
|
close_job_out(job);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
job->stderr_cb(rstream, data, eof);
|
job->stderr_cb(rstream, data, eof);
|
||||||
}
|
if (eof) {
|
||||||
|
close_job_err(job);
|
||||||
if (eof && --job->pending_refs == 0) {
|
}
|
||||||
job_exit_callback(job);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -448,31 +441,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
|
|||||||
Job *job = handle_get_job((uv_handle_t *)proc);
|
Job *job = handle_get_job((uv_handle_t *)proc);
|
||||||
|
|
||||||
job->status = status;
|
job->status = status;
|
||||||
if (--job->pending_refs == 0) {
|
uv_close((uv_handle_t *)&job->proc, close_cb);
|
||||||
job_exit_callback(job);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void close_cb(uv_handle_t *handle)
|
static void close_cb(uv_handle_t *handle)
|
||||||
{
|
{
|
||||||
Job *job = handle_get_job(handle);
|
Job *job = handle_get_job(handle);
|
||||||
|
|
||||||
if (--job->pending_closes == 0) {
|
if (handle == (uv_handle_t *)&job->proc) {
|
||||||
// Only free the job memory after all the associated handles are properly
|
// Make sure all streams are properly closed to trigger callback invocation
|
||||||
// closed by libuv
|
// when job->proc is closed
|
||||||
rstream_free(job->out);
|
close_job_in(job);
|
||||||
rstream_free(job->err);
|
close_job_out(job);
|
||||||
if (job->in) {
|
close_job_err(job);
|
||||||
wstream_free(job->in);
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Free data memory of process and pipe handles, that was allocated
|
if (--job->refcount == 0) {
|
||||||
// by handle_set_job in job_start.
|
// Invoke the exit_cb
|
||||||
|
job_exit_callback(job);
|
||||||
|
// Free all memory allocated for the job
|
||||||
free(job->proc.data);
|
free(job->proc.data);
|
||||||
free(job->proc_stdin.data);
|
free(job->proc_stdin.data);
|
||||||
free(job->proc_stdout.data);
|
free(job->proc_stdout.data);
|
||||||
free(job->proc_stderr.data);
|
free(job->proc_stderr.data);
|
||||||
|
|
||||||
shell_free_argv(job->proc_opts.args);
|
shell_free_argv(job->proc_opts.args);
|
||||||
free(job);
|
free(job);
|
||||||
}
|
}
|
||||||
|
@@ -19,6 +19,15 @@ local function delete_file(name)
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Some tests require the xclip program and a x server.
|
||||||
|
local xclip = nil
|
||||||
|
do
|
||||||
|
if os.getenv('DISPLAY') then
|
||||||
|
local proc = io.popen('which xclip')
|
||||||
|
xclip = proc:read()
|
||||||
|
proc:close()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe('system()', function()
|
describe('system()', function()
|
||||||
before_each(clear)
|
before_each(clear)
|
||||||
@@ -85,6 +94,15 @@ describe('system()', function()
|
|||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
if xclip then
|
||||||
|
describe("with a program that doesn't close stdout", function()
|
||||||
|
it('will exit properly after passing input', function()
|
||||||
|
eq(nil, eval([[system('xclip -i -selection clipboard', 'clip-data')]]))
|
||||||
|
eq('clip-data', eval([[system('xclip -o -selection clipboard')]]))
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
describe('systemlist()', function()
|
describe('systemlist()', function()
|
||||||
@@ -140,4 +158,15 @@ describe('systemlist()', function()
|
|||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
if xclip then
|
||||||
|
describe("with a program that doesn't close stdout", function()
|
||||||
|
it('will exit properly after passing input', function()
|
||||||
|
eq(nil, eval(
|
||||||
|
"systemlist('xclip -i -selection clipboard', ['clip', 'data'])"))
|
||||||
|
eq({'clip', 'data'}, eval(
|
||||||
|
"systemlist('xclip -o -selection clipboard')"))
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end
|
||||||
end)
|
end)
|
||||||
|
Reference in New Issue
Block a user