mirror of
https://github.com/neovim/neovim.git
synced 2025-10-15 22:36:09 +00:00

- One can now manually close the in-pipe, without having to tear down the job. - One can be notified of write success/failure.
521 lines
14 KiB
C
521 lines
14 KiB
C
#include <stdint.h>
|
|
#include <stdbool.h>
|
|
|
|
#include <uv.h>
|
|
|
|
#include "nvim/os/uv_helpers.h"
|
|
#include "nvim/os/job.h"
|
|
#include "nvim/os/job_defs.h"
|
|
#include "nvim/os/rstream.h"
|
|
#include "nvim/os/rstream_defs.h"
|
|
#include "nvim/os/wstream.h"
|
|
#include "nvim/os/wstream_defs.h"
|
|
#include "nvim/os/event.h"
|
|
#include "nvim/os/event_defs.h"
|
|
#include "nvim/os/time.h"
|
|
#include "nvim/os/shell.h"
|
|
#include "nvim/os/signal.h"
|
|
#include "nvim/vim.h"
|
|
#include "nvim/memory.h"
|
|
#include "nvim/term.h"
|
|
|
|
#define EXIT_TIMEOUT 25
|
|
#define MAX_RUNNING_JOBS 100
|
|
#define JOB_BUFFER_SIZE 1024
|
|
|
|
struct job {
|
|
// Job id the index in the job table plus one.
|
|
int id;
|
|
// Exit status code of the job process
|
|
int64_t status;
|
|
// Number of polls after a SIGTERM that will trigger a SIGKILL
|
|
int exit_timeout;
|
|
// exit_cb may be called while there's still pending data from stdout/stderr.
|
|
// We use this reference count to ensure the JobExit event is only emitted
|
|
// when stdout/stderr are drained
|
|
int pending_refs;
|
|
// Same as above, but for freeing the job memory which contains
|
|
// libuv handles. Only after all are closed the job can be safely freed.
|
|
int pending_closes;
|
|
// If the job was already stopped
|
|
bool stopped;
|
|
bool defer;
|
|
// Data associated with the job
|
|
void *data;
|
|
// Callbacks
|
|
job_exit_cb exit_cb;
|
|
rstream_cb stdout_cb, stderr_cb;
|
|
// Readable streams(std{out,err})
|
|
RStream *out, *err;
|
|
// Writable stream(stdin)
|
|
WStream *in;
|
|
// Structures for process spawning/management used by libuv
|
|
uv_process_t proc;
|
|
uv_process_options_t proc_opts;
|
|
uv_stdio_container_t stdio[3];
|
|
uv_pipe_t proc_stdin, proc_stdout, proc_stderr;
|
|
};
|
|
|
|
static Job *table[MAX_RUNNING_JOBS] = {NULL};
|
|
static uint32_t job_count = 0;
|
|
static uv_prepare_t job_prepare;
|
|
|
|
// Some helpers shared in this module
|
|
|
|
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
|
# include "os/job.c.generated.h"
|
|
#endif
|
|
// Callbacks for libuv
|
|
|
|
/// Initializes job control resources
|
|
void job_init(void)
|
|
{
|
|
uv_disable_stdio_inheritance();
|
|
uv_prepare_init(uv_default_loop(), &job_prepare);
|
|
}
|
|
|
|
/// Releases job control resources and terminates running jobs
|
|
void job_teardown(void)
|
|
{
|
|
// 20 tries will give processes about 1 sec to exit cleanly
|
|
uint32_t remaining_tries = 20;
|
|
bool all_dead = true;
|
|
int i;
|
|
Job *job;
|
|
|
|
// Politely ask each job to terminate
|
|
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
|
|
if ((job = table[i]) != NULL) {
|
|
all_dead = false;
|
|
uv_process_kill(&job->proc, SIGTERM);
|
|
}
|
|
}
|
|
|
|
if (all_dead) {
|
|
return;
|
|
}
|
|
|
|
os_delay(10, 0);
|
|
// Right now any exited process are zombies waiting for us to acknowledge
|
|
// their status with `wait` or handling SIGCHLD. libuv does that
|
|
// automatically (and then calls `exit_cb`) but we have to give it a chance
|
|
// by running the loop one more time
|
|
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
|
|
|
|
// Prepare to start shooting
|
|
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
|
|
if ((job = table[i]) == NULL) {
|
|
continue;
|
|
}
|
|
|
|
// Still alive
|
|
while (is_alive(job) && remaining_tries--) {
|
|
os_delay(50, 0);
|
|
// Acknowledge child exits
|
|
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
|
|
}
|
|
|
|
if (is_alive(job)) {
|
|
uv_process_kill(&job->proc, SIGKILL);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Tries to start a new job.
|
|
///
|
|
/// @param argv Argument vector for the process. The first item is the
|
|
/// executable to run.
|
|
/// @param data Caller data that will be associated with the job
|
|
/// @param stdout_cb Callback that will be invoked when data is available
|
|
/// on stdout
|
|
/// @param stderr_cb Callback that will be invoked when data is available
|
|
/// on stderr
|
|
/// @param exit_cb Callback that will be invoked when the job exits
|
|
/// @param defer If the job callbacks invocation should be deferred to vim
|
|
/// main loop
|
|
/// @param maxmem Maximum amount of memory used by the job WStream
|
|
/// @param[out] The job id if the job started successfully, 0 if the job table
|
|
/// is full, -1 if the program could not be executed.
|
|
/// @return The job pointer if the job started successfully, NULL otherwise
|
|
Job *job_start(char **argv,
|
|
void *data,
|
|
rstream_cb stdout_cb,
|
|
rstream_cb stderr_cb,
|
|
job_exit_cb job_exit_cb,
|
|
bool defer,
|
|
size_t maxmem,
|
|
int *status)
|
|
{
|
|
int i;
|
|
Job *job;
|
|
|
|
// Search for a free slot in the table
|
|
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
|
|
if (table[i] == NULL) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (i == MAX_RUNNING_JOBS) {
|
|
// No free slots
|
|
*status = 0;
|
|
return NULL;
|
|
}
|
|
|
|
job = xmalloc(sizeof(Job));
|
|
// Initialize
|
|
job->id = i + 1;
|
|
*status = job->id;
|
|
job->status = -1;
|
|
job->pending_refs = 3;
|
|
job->pending_closes = 4;
|
|
job->data = data;
|
|
job->stdout_cb = stdout_cb;
|
|
job->stderr_cb = stderr_cb;
|
|
job->exit_cb = job_exit_cb;
|
|
job->stopped = false;
|
|
job->exit_timeout = EXIT_TIMEOUT;
|
|
job->proc_opts.file = argv[0];
|
|
job->proc_opts.args = argv;
|
|
job->proc_opts.stdio = job->stdio;
|
|
job->proc_opts.stdio_count = 3;
|
|
job->proc_opts.flags = UV_PROCESS_WINDOWS_HIDE;
|
|
job->proc_opts.exit_cb = exit_cb;
|
|
job->proc_opts.cwd = NULL;
|
|
job->proc_opts.env = NULL;
|
|
job->proc.data = NULL;
|
|
job->proc_stdin.data = NULL;
|
|
job->proc_stdout.data = NULL;
|
|
job->proc_stderr.data = NULL;
|
|
job->defer = defer;
|
|
|
|
// Initialize the job std{in,out,err}
|
|
uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
|
|
job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
|
|
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
|
|
|
|
uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
|
|
job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
|
|
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
|
|
|
|
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
|
|
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
|
|
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
|
|
|
|
// Spawn the job
|
|
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
|
|
free_job(job);
|
|
*status = -1;
|
|
return NULL;
|
|
}
|
|
|
|
// Give all handles a reference to the job
|
|
handle_set_job((uv_handle_t *)&job->proc, job);
|
|
handle_set_job((uv_handle_t *)&job->proc_stdin, job);
|
|
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
|
|
handle_set_job((uv_handle_t *)&job->proc_stderr, job);
|
|
|
|
job->in = wstream_new(maxmem);
|
|
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
|
|
// Start the readable streams
|
|
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
|
|
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
|
|
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
|
|
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
|
|
rstream_start(job->out);
|
|
rstream_start(job->err);
|
|
// Save the job to the table
|
|
table[i] = job;
|
|
|
|
// Start polling job status if this is the first
|
|
if (job_count == 0) {
|
|
uv_prepare_start(&job_prepare, job_prepare_cb);
|
|
}
|
|
job_count++;
|
|
|
|
return job;
|
|
}
|
|
|
|
/// Finds a job instance by id
|
|
///
|
|
/// @param id The job id
|
|
/// @return the Job instance
|
|
Job *job_find(int id)
|
|
{
|
|
Job *job;
|
|
|
|
if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
|
|
|| job->stopped) {
|
|
return NULL;
|
|
}
|
|
|
|
return job;
|
|
}
|
|
|
|
/// Terminates a job. This is a non-blocking operation, but if the job exists
|
|
/// it's guaranteed to succeed(SIGKILL will eventually be sent)
|
|
///
|
|
/// @param job The Job instance
|
|
void job_stop(Job *job)
|
|
{
|
|
job->stopped = true;
|
|
}
|
|
|
|
/// job_wait - synchronously wait for a job to finish
|
|
///
|
|
/// @param job The job instance
|
|
/// @param ms Number of milliseconds to wait, 0 for not waiting, -1 for
|
|
/// waiting until the job quits.
|
|
/// @return returns the status code of the exited job. -1 if the job is
|
|
/// still running and the `timeout` has expired. Note that this is
|
|
/// indistinguishable from the process returning -1 by itself. Which
|
|
/// is possible on some OS.
|
|
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
|
|
{
|
|
// switch to cooked so `got_int` will be set if the user interrupts
|
|
int old_mode = cur_tmode;
|
|
settmode(TMODE_COOK);
|
|
|
|
EventSource sources[] = {job_event_source(job), signal_event_source(), NULL};
|
|
|
|
// keep track of the elapsed time if ms > 0
|
|
uint64_t before = (ms > 0) ? os_hrtime() : 0;
|
|
|
|
while (1) {
|
|
// check if the job has exited (and the status is available).
|
|
if (job->pending_refs == 0) {
|
|
break;
|
|
}
|
|
|
|
event_poll(ms, sources);
|
|
|
|
// we'll assume that a user frantically hitting interrupt doesn't like
|
|
// the current job. Signal that it has to be killed.
|
|
if (got_int) {
|
|
job_stop(job);
|
|
}
|
|
|
|
if (ms == 0) {
|
|
break;
|
|
}
|
|
|
|
// check if the poll timed out, if not, decrease the ms to wait for the
|
|
// next run
|
|
if (ms > 0) {
|
|
uint64_t now = os_hrtime();
|
|
ms -= (int) ((now - before) / 1000000);
|
|
before = now;
|
|
|
|
// if the time elapsed is greater than the `ms` wait time, break
|
|
if (ms <= 0) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
settmode(old_mode);
|
|
|
|
// return -1 for a timeout, the job status otherwise
|
|
return (job->pending_refs) ? -1 : (int) job->status;
|
|
}
|
|
|
|
/// Close the pipe used to write to the job.
|
|
///
|
|
/// This can be used for example to indicate to the job process that no more
|
|
/// input is coming, and that it should shut down cleanly.
|
|
///
|
|
/// It has no effect when the input pipe doesn't exist or was already
|
|
/// closed.
|
|
///
|
|
/// @param job The job instance
|
|
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
|
|
{
|
|
if (!job->in) {
|
|
return;
|
|
}
|
|
|
|
// let other functions in the job module know that the in pipe is no more
|
|
wstream_free(job->in);
|
|
job->in = NULL;
|
|
|
|
uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
|
|
}
|
|
|
|
/// All writes that complete after calling this function will be reported
|
|
/// to `cb`.
|
|
///
|
|
/// Use this function to be notified about the status of an in-flight write.
|
|
///
|
|
/// @see {wstream_set_write_cb}
|
|
///
|
|
/// @param job The job instance
|
|
/// @param cb The function that will be called on write completion or
|
|
/// failure. It will be called with the job as the `data` argument.
|
|
void job_write_cb(Job *job, wstream_cb cb) FUNC_ATTR_NONNULL_ALL
|
|
{
|
|
wstream_set_write_cb(job->in, cb, job);
|
|
}
|
|
|
|
/// Writes data to the job's stdin. This is a non-blocking operation, it
|
|
/// returns when the write request was sent.
|
|
///
|
|
/// @param job The Job instance
|
|
/// @param buffer The buffer which contains the data to be written
|
|
/// @return true if the write request was successfully sent, false if writing
|
|
/// to the job stream failed (possibly because the OS buffer is full)
|
|
bool job_write(Job *job, WBuffer *buffer)
|
|
{
|
|
return wstream_write(job->in, buffer);
|
|
}
|
|
|
|
/// Runs the read callback associated with the job exit event
|
|
///
|
|
/// @param event Object containing data necessary to invoke the callback
|
|
void job_exit_event(Event event)
|
|
{
|
|
job_exit_callback(event.data.job);
|
|
}
|
|
|
|
/// Get the job id
|
|
///
|
|
/// @param job A pointer to the job
|
|
/// @return The job id
|
|
int job_id(Job *job)
|
|
{
|
|
return job->id;
|
|
}
|
|
|
|
/// Get data associated with a job
|
|
///
|
|
/// @param job A pointer to the job
|
|
/// @return The job data
|
|
void *job_data(Job *job)
|
|
{
|
|
return job->data;
|
|
}
|
|
|
|
EventSource job_event_source(Job *job)
|
|
{
|
|
return job;
|
|
}
|
|
|
|
static void job_exit_callback(Job *job)
|
|
{
|
|
// Free the slot now, 'exit_cb' may want to start another job to replace
|
|
// this one
|
|
table[job->id - 1] = NULL;
|
|
|
|
if (job->exit_cb) {
|
|
// Invoke the exit callback
|
|
job->exit_cb(job, job->data);
|
|
}
|
|
|
|
// Free the job resources
|
|
free_job(job);
|
|
|
|
// Stop polling job status if this was the last
|
|
job_count--;
|
|
if (job_count == 0) {
|
|
uv_prepare_stop(&job_prepare);
|
|
}
|
|
}
|
|
|
|
static bool is_alive(Job *job)
|
|
{
|
|
return uv_process_kill(&job->proc, 0) == 0;
|
|
}
|
|
|
|
static void free_job(Job *job)
|
|
{
|
|
uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
|
|
if (job->in) {
|
|
uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
|
|
}
|
|
uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
|
|
uv_close((uv_handle_t *)&job->proc, close_cb);
|
|
}
|
|
|
|
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
|
|
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
|
|
static void job_prepare_cb(uv_prepare_t *handle)
|
|
{
|
|
Job *job;
|
|
int i;
|
|
|
|
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
|
|
if ((job = table[i]) == NULL || !job->stopped) {
|
|
continue;
|
|
}
|
|
|
|
if ((job->exit_timeout--) == EXIT_TIMEOUT) {
|
|
// Job was just stopped, close all stdio handles and send SIGTERM
|
|
uv_process_kill(&job->proc, SIGTERM);
|
|
} else if (job->exit_timeout == 0) {
|
|
// We've waited long enough, send SIGKILL
|
|
uv_process_kill(&job->proc, SIGKILL);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wraps the call to std{out,err}_cb and emits a JobExit event if necessary.
|
|
static void read_cb(RStream *rstream, void *data, bool eof)
|
|
{
|
|
Job *job = data;
|
|
|
|
if (rstream == job->out) {
|
|
job->stdout_cb(rstream, data, eof);
|
|
} else {
|
|
job->stderr_cb(rstream, data, eof);
|
|
}
|
|
|
|
if (eof && --job->pending_refs == 0) {
|
|
emit_exit_event(job);
|
|
}
|
|
}
|
|
|
|
// Emits a JobExit event if both rstreams are closed
|
|
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
|
|
{
|
|
Job *job = handle_get_job((uv_handle_t *)proc);
|
|
|
|
job->status = status;
|
|
if (--job->pending_refs == 0) {
|
|
emit_exit_event(job);
|
|
}
|
|
}
|
|
|
|
static void emit_exit_event(Job *job)
|
|
{
|
|
Event event = {
|
|
.source = job_event_source(job),
|
|
.type = kEventJobExit,
|
|
.data.job = job
|
|
};
|
|
event_push(event);
|
|
}
|
|
|
|
static void close_cb(uv_handle_t *handle)
|
|
{
|
|
Job *job = handle_get_job(handle);
|
|
|
|
if (--job->pending_closes == 0) {
|
|
// Only free the job memory after all the associated handles are properly
|
|
// closed by libuv
|
|
rstream_free(job->out);
|
|
rstream_free(job->err);
|
|
if (job->in) {
|
|
wstream_free(job->in);
|
|
}
|
|
|
|
// Free data memory of process and pipe handles, that was allocated
|
|
// by handle_set_job in job_start.
|
|
free(job->proc.data);
|
|
free(job->proc_stdin.data);
|
|
free(job->proc_stdout.data);
|
|
free(job->proc_stderr.data);
|
|
|
|
shell_free_argv(job->proc_opts.args);
|
|
free(job);
|
|
}
|
|
}
|