job: Let job_start callers to selectively ignore stdio

Passing NULL as the callback for stdout/stderr will result in job_start ignoring
stdout/stderr, respectively. A 'writable' boolean argument was also added, and
when false `job_start` will ignore stdin.

Also, refactor os_system to allow passing NULL as the `output` argument.
This commit is contained in:
Thiago de Arruda
2014-10-30 10:19:48 -03:00
parent 25e26e0056
commit c92d17b4aa
4 changed files with 85 additions and 45 deletions

View File

@@ -10675,6 +10675,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
job_start(argv, job_start(argv,
xstrdup((char *)argvars[0].vval.v_string), xstrdup((char *)argvars[0].vval.v_string),
true,
on_job_stdout, on_job_stdout,
on_job_stderr, on_job_stderr,
on_job_exit, on_job_exit,

View File

@@ -119,6 +119,7 @@ uint64_t channel_from_job(char **argv)
int status; int status;
channel->data.job = job_start(argv, channel->data.job = job_start(argv,
channel, channel,
true,
job_out, job_out,
job_err, job_err,
job_exit, job_exit,

View File

@@ -136,10 +136,12 @@ void job_teardown(void)
/// @param argv Argument vector for the process. The first item is the /// @param argv Argument vector for the process. The first item is the
/// executable to run. /// executable to run.
/// @param data Caller data that will be associated with the job /// @param data Caller data that will be associated with the job
/// @param writable If true the job stdin will be available for writing with
/// job_write, otherwise it will be redirected to /dev/null
/// @param stdout_cb Callback that will be invoked when data is available /// @param stdout_cb Callback that will be invoked when data is available
/// on stdout /// on stdout. If NULL stdout will be redirected to /dev/null.
/// @param stderr_cb Callback that will be invoked when data is available /// @param stderr_cb Callback that will be invoked when data is available
/// on stderr /// on stderr. If NULL stderr will be redirected to /dev/null.
/// @param job_exit_cb Callback that will be invoked when the job exits /// @param job_exit_cb Callback that will be invoked when the job exits
/// @param maxmem Maximum amount of memory used by the job WStream /// @param maxmem Maximum amount of memory used by the job WStream
/// @param[out] status The job id if the job started successfully, 0 if the job /// @param[out] status The job id if the job started successfully, 0 if the job
@@ -147,6 +149,7 @@ void job_teardown(void)
/// @return The job pointer if the job started successfully, NULL otherwise /// @return The job pointer if the job started successfully, NULL otherwise
Job *job_start(char **argv, Job *job_start(char **argv,
void *data, void *data,
bool writable,
rstream_cb stdout_cb, rstream_cb stdout_cb,
rstream_cb stderr_cb, rstream_cb stderr_cb,
job_exit_cb job_exit_cb, job_exit_cb job_exit_cb,
@@ -174,7 +177,7 @@ Job *job_start(char **argv,
job->id = i + 1; job->id = i + 1;
*status = job->id; *status = job->id;
job->status = -1; job->status = -1;
job->refcount = 4; job->refcount = 1;
job->data = data; job->data = data;
job->stdout_cb = stdout_cb; job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb; job->stderr_cb = stderr_cb;
@@ -193,45 +196,76 @@ Job *job_start(char **argv,
job->proc_stdin.data = NULL; job->proc_stdin.data = NULL;
job->proc_stdout.data = NULL; job->proc_stdout.data = NULL;
job->proc_stderr.data = NULL; job->proc_stderr.data = NULL;
job->in = NULL;
job->out = NULL;
job->err = NULL;
// Initialize the job std{in,out,err} // Initialize the job std{in,out,err}
job->stdio[0].flags = UV_IGNORE;
job->stdio[1].flags = UV_IGNORE;
job->stdio[2].flags = UV_IGNORE;
if (writable) {
uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0); uv_pipe_init(uv_default_loop(), &job->proc_stdin, 0);
job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; job->stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin; job->stdio[0].data.stream = (uv_stream_t *)&job->proc_stdin;
handle_set_job((uv_handle_t *)&job->proc_stdin, job);
job->refcount++;
}
if (stdout_cb) {
uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0); uv_pipe_init(uv_default_loop(), &job->proc_stdout, 0);
job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout; job->stdio[1].data.stream = (uv_stream_t *)&job->proc_stdout;
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
job->refcount++;
}
if (stderr_cb) {
uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0); uv_pipe_init(uv_default_loop(), &job->proc_stderr, 0);
job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; job->stdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr; job->stdio[2].data.stream = (uv_stream_t *)&job->proc_stderr;
// Give all handles a reference to the job
handle_set_job((uv_handle_t *)&job->proc, job);
handle_set_job((uv_handle_t *)&job->proc_stdin, job);
handle_set_job((uv_handle_t *)&job->proc_stdout, job);
handle_set_job((uv_handle_t *)&job->proc_stderr, job); handle_set_job((uv_handle_t *)&job->proc_stderr, job);
job->refcount++;
}
handle_set_job((uv_handle_t *)&job->proc, job);
// Spawn the job // Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
uv_close((uv_handle_t *)&job->proc_stdin, NULL); if (writable) {
uv_close((uv_handle_t *)&job->proc_stdout, NULL); uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
uv_close((uv_handle_t *)&job->proc_stderr, NULL); }
if (stdout_cb) {
uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
}
if (stderr_cb) {
uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
}
uv_close((uv_handle_t *)&job->proc, close_cb);
event_poll(0); event_poll(0);
// Manually invoke the close_cb to free the job resources
*status = -1; *status = -1;
return NULL; return NULL;
} }
if (writable) {
job->in = wstream_new(maxmem); job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
}
// Start the readable streams // Start the readable streams
if (stdout_cb) {
job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job); job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out); rstream_start(job->out);
}
if (stderr_cb) {
job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->err); rstream_start(job->err);
}
// Save the job to the table // Save the job to the table
table[i] = job; table[i] = job;

View File

@@ -258,8 +258,8 @@ int os_call_shell(char_u *cmd, ShellOpts opts, char_u *extra_shell_arg)
/// @param len The length of the input buffer (not used if `input` == NULL) /// @param len The length of the input buffer (not used if `input` == NULL)
/// @param[out] output A pointer to to a location where the output will be /// @param[out] output A pointer to to a location where the output will be
/// allocated and stored. Will point to NULL if the shell /// allocated and stored. Will point to NULL if the shell
/// command did not output anything. NOTE: it's not /// command did not output anything. If NULL is passed,
/// allowed to pass NULL yet /// the shell output will be ignored.
/// @param[out] nread the number of bytes in the returned buffer (if the /// @param[out] nread the number of bytes in the returned buffer (if the
/// returned buffer is not NULL) /// returned buffer is not NULL)
/// @return the return code of the process, -1 if the process couldn't be /// @return the return code of the process, -1 if the process couldn't be
@@ -268,7 +268,7 @@ int os_system(const char *cmd,
const char *input, const char *input,
size_t len, size_t len,
char **output, char **output,
size_t *nread) FUNC_ATTR_NONNULL_ARG(1, 4) size_t *nread) FUNC_ATTR_NONNULL_ARG(1)
{ {
// the output buffer // the output buffer
dyn_buffer_t buf; dyn_buffer_t buf;
@@ -279,8 +279,9 @@ int os_system(const char *cmd,
int i; int i;
Job *job = job_start(argv, Job *job = job_start(argv,
&buf, &buf,
system_data_cb, input != NULL,
system_data_cb, output ? system_data_cb : NULL,
output ? system_data_cb : NULL,
NULL, NULL,
0, 0,
&i); &i);
@@ -300,13 +301,15 @@ int os_system(const char *cmd,
job_stop(job); job_stop(job);
return -1; return -1;
} }
// close the input stream, let the process know that no more input is
// coming
job_close_in(job);
} }
// close the input stream, let the process know that no more input is coming
job_close_in(job);
int status = job_wait(job, -1); int status = job_wait(job, -1);
// prepare the out parameters if requested // prepare the out parameters if requested
if (output) {
if (buf.len == 0) { if (buf.len == 0) {
// no data received from the process, return NULL // no data received from the process, return NULL
*output = NULL; *output = NULL;
@@ -320,6 +323,7 @@ int os_system(const char *cmd,
if (nread) { if (nread) {
*nread = buf.len; *nread = buf.len;
} }
}
return status; return status;
} }