Merge PR #1365 'Refactor os_call_shell to use os_system'

This commit is contained in:
Thiago de Arruda
2014-10-31 22:56:53 -03:00
5 changed files with 267 additions and 345 deletions

View File

@@ -10675,6 +10675,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
job_start(argv, job_start(argv,
xstrdup((char *)argvars[0].vval.v_string), xstrdup((char *)argvars[0].vval.v_string),
true,
on_job_stdout, on_job_stdout,
on_job_stderr, on_job_stderr,
on_job_exit, on_job_exit,

View File

@@ -119,6 +119,7 @@ uint64_t channel_from_job(char **argv)
int status; int status;
channel->data.job = job_start(argv, channel->data.job = job_start(argv,
channel, channel,
true,
job_out, job_out,
job_err, job_err,
job_exit, job_exit,

View File

@@ -13,10 +13,14 @@
#include "nvim/os/event.h" #include "nvim/os/event.h"
#include "nvim/os/event_defs.h" #include "nvim/os/event_defs.h"
#include "nvim/os/shell.h" #include "nvim/os/shell.h"
#include "nvim/os/time.h"
#include "nvim/vim.h" #include "nvim/vim.h"
#include "nvim/memory.h" #include "nvim/memory.h"
#define EXIT_TIMEOUT 25 // {SIGNAL}_TIMEOUT is the time (in nanoseconds) that a job has to cleanly exit
// before we send SIGNAL to it
#define TERM_TIMEOUT 1000000000
#define KILL_TIMEOUT (TERM_TIMEOUT * 2)
#define MAX_RUNNING_JOBS 100 #define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 0xFFFF #define JOB_BUFFER_SIZE 0xFFFF
@@ -39,14 +43,14 @@ struct job {
// Job id the index in the job table plus one. // Job id the index in the job table plus one.
int id; int id;
// Exit status code of the job process // Exit status code of the job process
int64_t status; int status;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
// Number of references to the job. The job resources will only be freed by // Number of references to the job. The job resources will only be freed by
// close_cb when this is 0 // close_cb when this is 0
int refcount; int refcount;
// If the job was already stopped // Time when job_stop was called for the job.
bool stopped; uint64_t stopped_time;
// If SIGTERM was already sent to the job(only send one before SIGKILL)
bool term_sent;
// Data associated with the job // Data associated with the job
void *data; void *data;
// Callbacks // Callbacks
@@ -64,8 +68,8 @@ struct job {
}; };
static Job *table[MAX_RUNNING_JOBS] = {NULL}; static Job *table[MAX_RUNNING_JOBS] = {NULL};
static uint32_t job_count = 0; size_t stop_requests = 0;
static uv_prepare_t job_prepare; static uv_timer_t job_stop_timer;
// Some helpers shared in this module // Some helpers shared in this module
@@ -78,7 +82,7 @@ static uv_prepare_t job_prepare;
void job_init(void) void job_init(void)
{ {
uv_disable_stdio_inheritance(); uv_disable_stdio_inheritance();
uv_prepare_init(uv_default_loop(), &job_prepare); uv_timer_init(uv_default_loop(), &job_stop_timer);
} }
/// Releases job control resources and terminates running jobs /// Releases job control resources and terminates running jobs
@@ -136,10 +140,12 @@ void job_teardown(void)
/// @param argv Argument vector for the process. The first item is the /// @param argv Argument vector for the process. The first item is the
/// executable to run. /// executable to run.
/// @param data Caller data that will be associated with the job /// @param data Caller data that will be associated with the job
/// @param writable If true the job stdin will be available for writing with
/// job_write, otherwise it will be redirected to /dev/null
/// @param stdout_cb Callback that will be invoked when data is available /// @param stdout_cb Callback that will be invoked when data is available
/// on stdout /// on stdout. If NULL stdout will be redirected to /dev/null.
/// @param stderr_cb Callback that will be invoked when data is available /// @param stderr_cb Callback that will be invoked when data is available
/// on stderr /// on stderr. If NULL stderr will be redirected to /dev/null.
/// @param job_exit_cb Callback that will be invoked when the job exits /// @param job_exit_cb Callback that will be invoked when the job exits
/// @param maxmem Maximum amount of memory used by the job WStream /// @param maxmem Maximum amount of memory used by the job WStream
/// @param[out] status The job id if the job started successfully, 0 if the job /// @param[out] status The job id if the job started successfully, 0 if the job
@@ -147,6 +153,7 @@ void job_teardown(void)
/// @return The job pointer if the job started successfully, NULL otherwise /// @return The job pointer if the job started successfully, NULL otherwise
Job *job_start(char **argv, Job *job_start(char **argv,
void *data, void *data,
bool writable,
rstream_cb stdout_cb, rstream_cb stdout_cb,
rstream_cb stderr_cb, rstream_cb stderr_cb,
job_exit_cb job_exit_cb, job_exit_cb job_exit_cb,
@@ -174,13 +181,13 @@ Job *job_start(char **argv,
job->id = i + 1; job->id = i + 1;
*status = job->id; *status = job->id;
job->status = -1; job->status = -1;
job->refcount = 4; job->refcount = 1;
job->data = data; job->data = data;
job->stdout_cb = stdout_cb; job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb; job->stderr_cb = stderr_cb;
job->exit_cb = job_exit_cb; job->exit_cb = job_exit_cb;
job->stopped = false; job->stopped_time = 0;
job->exit_timeout = EXIT_TIMEOUT; job->term_sent = false;
job->proc_opts.file = argv[0]; job->proc_opts.file = argv[0];
job->proc_opts.args = argv; job->proc_opts.args = argv;
job->proc_opts.stdio = job->stdio; job->proc_opts.stdio = job->stdio;
@@ -193,50 +200,79 @@ Job *job_start(char **argv,
job->proc_stdin.data = NULL; job->proc_stdin.data = NULL;
job->proc_stdout.data = NULL; job->proc_stdout.data = NULL;
job->proc_stderr.data = NULL; job->proc_stderr.data = NULL;
job->in = NULL;
job->out = NULL;
job->err = NULL;
// Initialize the job std{in,out,err} // Initialize the job std{in,out,err}
uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); job->stdio[0].flags = UV_IGNORE;
job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; job->stdio[1].flags = UV_IGNORE;
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; job->stdio[2].flags = UV_IGNORE;
uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); if (writable) {
job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
handle_set_job((uv_handle_t *)&job->proc_stdin, job);
job->refcount++;
}
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); if (stdout_cb) {
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
job->refcount++;
}
if (stderr_cb) {
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
handle_set_job((uv_handle_t *)&job->proc_stderr, job);
job->refcount++;
}
// Give all handles a reference to the job
handle_set_job((uv_handle_t *)&job->proc, job); handle_set_job((uv_handle_t *)&job->proc, job);
handle_set_job((uv_handle_t *)&job->proc_stdin, job);
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
handle_set_job((uv_handle_t *)&job->proc_stderr, job);
// Spawn the job // Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
if (writable) {
uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
}
if (stdout_cb) {
uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
}
if (stderr_cb) {
uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
}
uv_close((uv_handle_t *)&job->proc, close_cb);
event_poll(0);
// Manually invoke the close_cb to free the job resources
*status = -1; *status = -1;
return NULL; return NULL;
} }
job->in = wstream_new(maxmem); if (writable) {
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
}
// Start the readable streams // Start the readable streams
job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); if (stdout_cb) {
job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out);
rstream_start(job->out); }
rstream_start(job->err);
if (stderr_cb) {
job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->err);
}
// Save the job to the table // Save the job to the table
table[i] = job; table[i] = job;
// Start polling job status if this is the first
if (job_count == 0) {
uv_prepare_start(&job_prepare, job_prepare_cb);
}
job_count++;
return job; return job;
} }
@@ -249,7 +285,7 @@ Job *job_find(int id)
Job *job; Job *job;
if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1]) if (id <= 0 || id > MAX_RUNNING_JOBS || !(job = table[id - 1])
|| job->stopped) { || job->stopped_time) {
return NULL; return NULL;
} }
@@ -262,7 +298,22 @@ Job *job_find(int id)
/// @param job The Job instance /// @param job The Job instance
void job_stop(Job *job) void job_stop(Job *job)
{ {
job->stopped = true; if (job->stopped_time) {
return;
}
job->stopped_time = os_hrtime();
// Close the standard streams of the job
close_job_in(job);
close_job_out(job);
close_job_err(job);
if (!stop_requests++) {
// When there's at least one stop request pending, start a timer that
// will periodically check if a signal should be send to a to the job
DLOG("Starting job kill timer");
uv_timer_start(&job_stop_timer, job_stop_timer_cb, 100, 100);
}
} }
/// job_wait - synchronously wait for a job to finish /// job_wait - synchronously wait for a job to finish
@@ -276,6 +327,9 @@ void job_stop(Job *job)
/// is possible on some OS. /// is possible on some OS.
int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
{ {
// The default status is -1, which represents a timeout
int status = -1;
// Increase refcount to stop the job from being freed before we have a // Increase refcount to stop the job from being freed before we have a
// chance to get the status. // chance to get the status.
job->refcount++; job->refcount++;
@@ -291,15 +345,16 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
event_poll(0); event_poll(0);
} }
if (!--job->refcount) { if (job->refcount == 1) {
int status = (int) job->status; // Job exited, collect status and manually invoke close_cb to free the job
// Manually invoke close_cb to free the job resources // resources
status = job->status;
close_cb((uv_handle_t *)&job->proc); close_cb((uv_handle_t *)&job->proc);
return status; } else {
job->refcount--;
} }
// return -1 for a timeout return status;
return -1;
} }
/// Close the pipe used to write to the job. /// Close the pipe used to write to the job.
@@ -372,10 +427,10 @@ static void job_exit_callback(Job *job)
job->exit_cb(job, job->data); job->exit_cb(job, job->data);
} }
// Stop polling job status if this was the last if (!--stop_requests) {
job_count--; // Stop the timer if no more stop requests are pending
if (job_count == 0) { DLOG("Stopping job kill timer");
uv_prepare_stop(&job_prepare); uv_timer_stop(&job_stop_timer);
} }
} }
@@ -386,21 +441,24 @@ static bool is_alive(Job *job)
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0). /// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_prepare_cb(uv_prepare_t *handle) static void job_stop_timer_cb(uv_timer_t *handle)
{ {
Job *job; Job *job;
int i; uint64_t now = os_hrtime();
for (i = 0; i < MAX_RUNNING_JOBS; i++) { for (size_t i = 0; i < MAX_RUNNING_JOBS; i++) {
if ((job = table[i]) == NULL || !job->stopped) { if ((job = table[i]) == NULL || !job->stopped_time) {
continue; continue;
} }
if ((job->exit_timeout--) == EXIT_TIMEOUT) { uint64_t elapsed = now - job->stopped_time;
// Job was just stopped, close all stdio handles and send SIGTERM
if (!job->term_sent && elapsed >= TERM_TIMEOUT) {
ILOG("Sending SIGTERM to job(id: %d)", job->id);
uv_process_kill(&job->proc, SIGTERM); uv_process_kill(&job->proc, SIGTERM);
} else if (job->exit_timeout == 0) { job->term_sent = true;
// We've waited long enough, send SIGKILL } else if (elapsed >= KILL_TIMEOUT) {
ILOG("Sending SIGKILL to job(id: %d)", job->id);
uv_process_kill(&job->proc, SIGKILL); uv_process_kill(&job->proc, SIGKILL);
} }
} }
@@ -429,7 +487,7 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{ {
Job *job = handle_get_job((uv_handle_t *)proc); Job *job = handle_get_job((uv_handle_t *)proc);
job->status = status; job->status = (int)status;
uv_close((uv_handle_t *)&job->proc, close_cb); uv_close((uv_handle_t *)&job->proc, close_cb);
} }

View File

@@ -309,6 +309,11 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count)
return rbuffer_read(rstream->buffer, buffer, count); return rbuffer_read(rstream->buffer, buffer, count);
} }
RBuffer *rstream_buffer(RStream *rstream)
{
return rstream->buffer;
}
// Callbacks used by libuv // Callbacks used by libuv
// Called by libuv to allocate memory for reading. // Called by libuv to allocate memory for reading.

View File

@@ -24,24 +24,14 @@
#include "nvim/option_defs.h" #include "nvim/option_defs.h"
#include "nvim/charset.h" #include "nvim/charset.h"
#include "nvim/strings.h" #include "nvim/strings.h"
#include "nvim/ui.h"
#define BUFFER_LENGTH 1024 #define DYNAMIC_BUFFER_INIT {NULL, 0, 0}
typedef struct {
bool reading;
int old_state, old_mode, exit_status, exited;
char *wbuffer;
char rbuffer[BUFFER_LENGTH];
uv_buf_t bufs[2];
uv_stream_t *shell_stdin;
garray_T ga;
} ProcessData;
typedef struct { typedef struct {
char *data; char *data;
size_t cap; size_t cap, len;
size_t len; } DynamicBuffer;
} dyn_buffer_t;
#ifdef INCLUDE_GENERATED_DECLARATIONS #ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/shell.c.generated.h" # include "os/shell.c.generated.h"
@@ -109,140 +99,70 @@ void shell_free_argv(char **argv)
/// @param cmd The command to be executed. If NULL it will run an interactive /// @param cmd The command to be executed. If NULL it will run an interactive
/// shell /// shell
/// @param opts Various options that control how the shell will work /// @param opts Various options that control how the shell will work
/// @param extra_shell_arg Extra argument to be passed to the shell /// @param extra_arg Extra argument to be passed to the shell
int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg) int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_arg)
{ {
uv_stdio_container_t proc_stdio[3]; DynamicBuffer input = DYNAMIC_BUFFER_INIT;
uv_process_options_t proc_opts; char *output = NULL, **output_ptr = NULL;
uv_process_t proc; int current_state = State, old_mode = cur_tmode;
uv_pipe_t proc_stdin, proc_stdout; bool forward_output = true;
uv_write_t write_req;
int expected_exits = 1;
ProcessData pdata = {
.reading = false,
.exited = 0,
.old_mode = cur_tmode,
.old_state = State,
.shell_stdin = (uv_stream_t *)&proc_stdin,
.wbuffer = NULL,
};
out_flush(); out_flush();
if (opts & kShellOptCooked) { if (opts & kShellOptCooked) {
// set to normal mode
settmode(TMODE_COOK); settmode(TMODE_COOK);
} }
// While the child is running, ignore terminating signals // While the child is running, ignore terminating signals
signal_reject_deadly(); signal_reject_deadly();
// Create argv for `uv_spawn`
// TODO(tarruda): we can use a static buffer for small argument vectors. 1024
// bytes should be enough for most of the commands and if more is necessary
// we can allocate a another buffer
proc_opts.args = shell_build_argv(cmd, extra_shell_arg);
proc_opts.file = proc_opts.args[0];
proc_opts.exit_cb = exit_cb;
// Initialize libuv structures
proc_opts.stdio = proc_stdio;
proc_opts.stdio_count = 3;
// Hide window on Windows :)
proc_opts.flags = UV_PROCESS_WINDOWS_HIDE;
proc_opts.cwd = NULL;
proc_opts.env = NULL;
// The default is to inherit all standard file descriptors(this will change
// when the UI is moved to an external process)
proc_stdio[0].flags = UV_INHERIT_FD;
proc_stdio[0].data.fd = 0;
proc_stdio[1].flags = UV_INHERIT_FD;
proc_stdio[1].data.fd = 1;
proc_stdio[2].flags = UV_INHERIT_FD;
proc_stdio[2].data.fd = 2;
if (opts & (kShellOptHideMess | kShellOptExpand)) { if (opts & (kShellOptHideMess | kShellOptExpand)) {
// Ignore the shell stdio(redirects to /dev/null on unixes) forward_output = false;
proc_stdio[0].flags = UV_IGNORE;
proc_stdio[1].flags = UV_IGNORE;
proc_stdio[2].flags = UV_IGNORE;
} else { } else {
State = EXTERNCMD; State = EXTERNCMD;
if (opts & kShellOptWrite) { if (opts & kShellOptWrite) {
// Write from the current buffer into the process stdin read_input(&input);
uv_pipe_init(uv_default_loop(), &proc_stdin, 0);
write_req.data = &pdata;
proc_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
proc_stdio[0].data.stream = (uv_stream_t *)&proc_stdin;
} }
if (opts & kShellOptRead) { if (opts & kShellOptRead) {
// Read from the process stdout into the current buffer output_ptr = &output;
uv_pipe_init(uv_default_loop(), &proc_stdout, 0); forward_output = false;
proc_stdout.data = &pdata;
proc_stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
proc_stdio[1].data.stream = (uv_stream_t *)&proc_stdout;
ga_init(&pdata.ga, 1, BUFFER_LENGTH);
} }
} }
if (uv_spawn(uv_default_loop(), &proc, &proc_opts)) { size_t nread;
// Failed, probably due to `sh` not being executable int status = shell((const char *)cmd,
if (!emsg_silent) { (const char *)extra_arg,
MSG_PUTS(_("\nCannot execute shell ")); input.data,
msg_outtrans(p_sh); input.len,
msg_putchar('\n'); output_ptr,
} &nread,
emsg_silent,
forward_output);
return proc_cleanup_exit(&pdata, &proc_opts, opts); if (input.data) {
free(input.data);
} }
// Assign the flag address after `proc` is initialized by `uv_spawn` if (output) {
proc.data = &pdata; write_output(output, nread);
free(output);
if (opts & kShellOptWrite) {
// Queue everything for writing to the shell stdin
write_selection(&write_req);
expected_exits++;
} }
if (opts & kShellOptRead) { if (!emsg_silent && status != 0 && !(opts & kShellOptSilent)) {
// Start the read stream for the shell stdout MSG_PUTS(_("\nshell returned "));
uv_read_start((uv_stream_t *)&proc_stdout, alloc_cb, read_cb); msg_outnum(status);
expected_exits++; msg_putchar('\n');
} }
// Keep running the loop until all three handles are completely closed if (old_mode == TMODE_RAW) {
while (pdata.exited < expected_exits) { // restore mode
event_poll(0); settmode(TMODE_RAW);
if (got_int) {
// Forward SIGINT to the shell
// TODO(tarruda): for now this is only needed if the terminal is in raw
// mode, but when the UI is externalized we'll also need it, so leave it
// here
uv_process_kill(&proc, SIGINT);
got_int = false;
}
} }
State = current_state;
signal_accept_deadly();
if (opts & kShellOptRead) { return status;
if (!GA_EMPTY(&pdata.ga)) {
// If there's an unfinished line in the growable array, append it now.
append_ga_line(&pdata.ga);
// remember that the NL was missing
curbuf->b_no_eol_lnum = curwin->w_cursor.lnum;
} else {
curbuf->b_no_eol_lnum = 0;
}
ga_clear(&pdata.ga);
}
if (opts & kShellOptWrite) {
free(pdata.wbuffer);
}
return proc_cleanup_exit(&pdata, &proc_opts, opts);
} }
/// os_system - synchronously execute a command in the shell /// os_system - synchronously execute a command in the shell
@@ -258,8 +178,8 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg)
/// @param len The length of the input buffer (not used if `input` == NULL) /// @param len The length of the input buffer (not used if `input` == NULL)
/// @param[out] output A pointer to to a location where the output will be /// @param[out] output A pointer to to a location where the output will be
/// allocated and stored. Will point to NULL if the shell /// allocated and stored. Will point to NULL if the shell
/// command did not output anything. NOTE: it's not /// command did not output anything. If NULL is passed,
/// allowed to pass NULL yet /// the shell output will be ignored.
/// @param[out] nread the number of bytes in the returned buffer (if the /// @param[out] nread the number of bytes in the returned buffer (if the
/// returned buffer is not NULL) /// returned buffer is not NULL)
/// @return the return code of the process, -1 if the process couldn't be /// @return the return code of the process, -1 if the process couldn't be
@@ -268,26 +188,50 @@ int os_system(const char *cmd,
const char *input, const char *input,
size_t len, size_t len,
char **output, char **output,
size_t *nread) FUNC_ATTR_NONNULL_ARG(1, 4) size_t *nread) FUNC_ATTR_NONNULL_ARG(1)
{
return shell(cmd, NULL, input, len, output, nread, true, false);
}
static int shell(const char *cmd,
const char *extra_arg,
const char *input,
size_t len,
char **output,
size_t *nread,
bool silent,
bool forward_output) FUNC_ATTR_NONNULL_ARG(1)
{ {
// the output buffer // the output buffer
dyn_buffer_t buf; DynamicBuffer buf = DYNAMIC_BUFFER_INIT;
memset(&buf, 0, sizeof(buf)); rstream_cb data_cb = system_data_cb;
char **argv = shell_build_argv((char_u *) cmd, NULL); if (forward_output) {
data_cb = out_data_cb;
} else if (!output) {
data_cb = NULL;
}
int i; char **argv = shell_build_argv((char_u *) cmd, (char_u *)extra_arg);
int status;
Job *job = job_start(argv, Job *job = job_start(argv,
&buf, &buf,
system_data_cb, input != NULL,
system_data_cb, data_cb,
data_cb,
NULL, NULL,
0, 0,
&i); &status);
if (i <= 0) { if (status <= 0) {
// couldn't even start the job // Failed, probably due to `sh` not being executable
ELOG("Couldn't start job, error code: '%d'", i); ELOG("Couldn't start job, command: '%s', error code: '%d'", cmd, status);
if (!silent) {
MSG_PUTS(_("\nCannot execute shell "));
msg_outtrans(p_sh);
msg_putchar('\n');
}
return -1; return -1;
} }
@@ -300,34 +244,37 @@ int os_system(const char *cmd,
job_stop(job); job_stop(job);
return -1; return -1;
} }
// close the input stream, let the process know that no more input is
// coming
job_close_in(job);
} }
// close the input stream, let the process know that no more input is coming status = job_wait(job, -1);
job_close_in(job);
int status = job_wait(job, -1);
// prepare the out parameters if requested // prepare the out parameters if requested
if (buf.len == 0) { if (output) {
// no data received from the process, return NULL if (buf.len == 0) {
*output = NULL; // no data received from the process, return NULL
free(buf.data); *output = NULL;
} else { free(buf.data);
// NUL-terminate to make the output directly usable as a C string } else {
buf.data[buf.len] = NUL; // NUL-terminate to make the output directly usable as a C string
*output = buf.data; buf.data[buf.len] = NUL;
} *output = buf.data;
}
if (nread) { if (nread) {
*nread = buf.len; *nread = buf.len;
}
} }
return status; return status;
} }
/// dyn_buf_ensure - ensures at least `desired` bytes in buffer /// - ensures at least `desired` bytes in buffer
/// ///
/// TODO(aktau): fold with kvec/garray /// TODO(aktau): fold with kvec/garray
static void dyn_buf_ensure(dyn_buffer_t *buf, size_t desired) static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
{ {
if (buf->cap >= desired) { if (buf->cap >= desired) {
return; return;
@@ -341,16 +288,24 @@ static void dyn_buf_ensure(dyn_buffer_t *buf, size_t desired)
static void system_data_cb(RStream *rstream, void *data, bool eof) static void system_data_cb(RStream *rstream, void *data, bool eof)
{ {
Job *job = data; Job *job = data;
dyn_buffer_t *buf = job_data(job); DynamicBuffer *buf = job_data(job);
size_t nread = rstream_pending(rstream); size_t nread = rstream_pending(rstream);
dyn_buf_ensure(buf, buf->len + nread + 1); dynamic_buffer_ensure(buf, buf->len + nread + 1);
rstream_read(rstream, buf->data + buf->len, nread); rstream_read(rstream, buf->data + buf->len, nread);
buf->len += nread; buf->len += nread;
} }
static void out_data_cb(RStream *rstream, void *data, bool eof)
{
RBuffer *rbuffer = rstream_buffer(rstream);
size_t len = rbuffer_pending(rbuffer);
ui_write((char_u *)rbuffer_read_ptr(rbuffer), (int)len);
rbuffer_consumed(rbuffer, len);
}
/// Parses a command string into a sequence of words, taking quotes into /// Parses a command string into a sequence of words, taking quotes into
/// consideration. /// consideration.
/// ///
@@ -411,24 +366,11 @@ static size_t word_length(const char_u *str)
/// event loop starts. If we don't(by writing in chunks returned by `ml_get`) /// event loop starts. If we don't(by writing in chunks returned by `ml_get`)
/// the buffer being modified might get modified by reading from the process /// the buffer being modified might get modified by reading from the process
/// before we finish writing. /// before we finish writing.
/// Queues selected range for writing to the child process stdin. static void read_input(DynamicBuffer *buf)
///
/// @param req The structure containing information to peform the write
static void write_selection(uv_write_t *req)
{ {
ProcessData *pdata = (ProcessData *)req->data; size_t written = 0, l = 0, len = 0;
// TODO(tarruda): use a static buffer for up to a limit(BUFFER_LENGTH) and
// only after filled we should start allocating memory(skip unnecessary
// allocations for small writes)
size_t buflen = BUFFER_LENGTH;
pdata->wbuffer = (char *)xmalloc(buflen);
uv_buf_t uvbuf;
linenr_T lnum = curbuf->b_op_start.lnum; linenr_T lnum = curbuf->b_op_start.lnum;
size_t off = 0; char_u *lp = ml_get(lnum);
size_t written = 0;
char_u *lp = ml_get(lnum);
size_t l;
size_t len;
for (;;) { for (;;) {
l = strlen((char *)lp + written); l = strlen((char *)lp + written);
@@ -437,26 +379,17 @@ static void write_selection(uv_write_t *req)
} else if (lp[written] == NL) { } else if (lp[written] == NL) {
// NL -> NUL translation // NL -> NUL translation
len = 1; len = 1;
if (off + len >= buflen) { dynamic_buffer_ensure(buf, buf->len + len);
// Resize the buffer buf->data[buf->len++] = NUL;
buflen *= 2;
pdata->wbuffer = xrealloc(pdata->wbuffer, buflen);
}
pdata->wbuffer[off++] = NUL;
} else { } else {
char_u *s = vim_strchr(lp + written, NL); char_u *s = vim_strchr(lp + written, NL);
len = s == NULL ? l : (size_t)(s - (lp + written)); len = s == NULL ? l : (size_t)(s - (lp + written));
while (off + len >= buflen) { dynamic_buffer_ensure(buf, buf->len + len);
// Resize the buffer memcpy(buf->data + buf->len, lp + written, len);
buflen *= 2; buf->len += len;
pdata->wbuffer = xrealloc(pdata->wbuffer, buflen);
}
memcpy(pdata->wbuffer + off, lp + written, len);
off += len;
} }
if (len == l) { if (len == l) {
// Finished a line, add a NL, unless this line // Finished a line, add a NL, unless this line should not have one.
// should not have one.
// FIXME need to make this more readable // FIXME need to make this more readable
if (lnum != curbuf->b_op_end.lnum if (lnum != curbuf->b_op_end.lnum
|| !curbuf->b_p_bin || !curbuf->b_p_bin
@@ -464,12 +397,8 @@ static void write_selection(uv_write_t *req)
&& (lnum != && (lnum !=
curbuf->b_ml.ml_line_count curbuf->b_ml.ml_line_count
|| curbuf->b_p_eol))) { || curbuf->b_p_eol))) {
if (off + 1 >= buflen) { dynamic_buffer_ensure(buf, buf->len + 1);
// Resize the buffer buf->data[buf->len++] = NL;
buflen *= 2;
pdata->wbuffer = xrealloc(pdata->wbuffer, buflen);
}
pdata->wbuffer[off++] = NL;
} }
++lnum; ++lnum;
if (lnum > curbuf->b_op_end.lnum) { if (lnum > curbuf->b_op_end.lnum) {
@@ -481,112 +410,40 @@ static void write_selection(uv_write_t *req)
written += len; written += len;
} }
} }
uvbuf.base = pdata->wbuffer;
uvbuf.len = off;
uv_write(req, pdata->shell_stdin, &uvbuf, 1, write_cb);
} }
// "Allocates" a buffer for reading from the shell stdout. static void write_output(char *output, size_t remaining)
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{ {
ProcessData *pdata = (ProcessData *)handle->data; if (!output) {
if (pdata->reading) {
buf->len = 0;
return; return;
} }
buf->base = pdata->rbuffer; size_t off = 0;
buf->len = BUFFER_LENGTH; while (off < remaining) {
// Avoid `alloc_cb`, `alloc_cb` sequences on windows if (output[off] == NL) {
pdata->reading = true;
}
static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
{
// TODO(tarruda): avoid using a growable array for this, refactor the
// algorithm to call `ml_append` directly(skip unnecessary copies/resizes)
int i;
ProcessData *pdata = (ProcessData *)stream->data;
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
uv_read_stop(stream);
uv_close((uv_handle_t *)stream, NULL);
pdata->exited++;
}
return;
}
for (i = 0; i < cnt; ++i) {
if (pdata->rbuffer[i] == NL) {
// Insert the line // Insert the line
append_ga_line(&pdata->ga); output[off] = NUL;
} else if (pdata->rbuffer[i] == NUL) { ml_append(curwin->w_cursor.lnum++, (char_u *)output, 0, false);
size_t skip = off + 1;
output += skip;
remaining -= skip;
off = 0;
continue;
}
if (output[off] == NUL) {
// Translate NUL to NL // Translate NUL to NL
ga_append(&pdata->ga, NL); output[off] = NL;
} else {
// buffer data into the grow array
ga_append(&pdata->ga, pdata->rbuffer[i]);
} }
off++;
} }
windgoto(msg_row, msg_col); if (remaining) {
cursor_on(); // append unfinished line
out_flush(); ml_append(curwin->w_cursor.lnum++, (char_u *)output, 0, false);
// remember that the NL was missing
pdata->reading = false; curbuf->b_no_eol_lnum = curwin->w_cursor.lnum;
} } else {
curbuf->b_no_eol_lnum = 0;
static void write_cb(uv_write_t *req, int status)
{
ProcessData *pdata = (ProcessData *)req->data;
uv_close((uv_handle_t *)pdata->shell_stdin, NULL);
pdata->exited++;
}
/// Cleanup memory and restore state modified by `os_call_shell`.
///
/// @param data State shared by all functions collaborating with
/// `os_call_shell`.
/// @param opts Process spawning options, containing some allocated memory
/// @param shellopts Options passed to `os_call_shell`. Used for deciding
/// if/which messages are displayed.
static int proc_cleanup_exit(ProcessData *proc_data,
uv_process_options_t *proc_opts,
int shellopts)
{
if (proc_data->exited) {
if (!emsg_silent && proc_data->exit_status != 0 &&
!(shellopts & kShellOptSilent)) {
MSG_PUTS(_("\nshell returned "));
msg_outnum((int64_t)proc_data->exit_status);
msg_putchar('\n');
}
} }
State = proc_data->old_state;
if (proc_data->old_mode == TMODE_RAW) {
// restore mode
settmode(TMODE_RAW);
}
signal_accept_deadly();
// Release argv memory
shell_free_argv(proc_opts->args);
return proc_data->exit_status;
}
static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
{
ProcessData *data = (ProcessData *)proc->data;
data->exited++;
assert(status <= INT_MAX);
data->exit_status = (int)status;
uv_close((uv_handle_t *)proc, NULL);
} }