job control: reuse common job code for rpc jobs

This makes stderr and exit callbacks work for rpc jobs
This commit is contained in:
Björn Linse
2016-05-12 22:25:15 +02:00
parent 215922120c
commit 2d60a15e25
12 changed files with 271 additions and 139 deletions

View File

@@ -5,11 +5,24 @@ endif
let s:loaded_pythonx_provider = 1 let s:loaded_pythonx_provider = 1
let s:stderr = {}
let s:job_opts = {'rpc': v:true}
" TODO(bfredl): this logic is common and should be builtin
function! s:job_opts.on_stderr(chan_id, data, event)
let stderr = get(s:stderr, a:chan_id, [''])
let last = remove(stderr, -1)
let a:data[0] = last.a:data[0]
call extend(stderr, a:data)
let s:stderr[a:chan_id] = stderr
endfunction
function! provider#pythonx#Require(host) abort function! provider#pythonx#Require(host) abort
let ver = (a:host.orig_name ==# 'python') ? 2 : 3 let ver = (a:host.orig_name ==# 'python') ? 2 : 3
" Python host arguments " Python host arguments
let args = ['-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()'] let prog = (ver == '2' ? provider#python#Prog() : provider#python3#Prog())
let args = [prog, '-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
" Collect registered Python plugins into args " Collect registered Python plugins into args
let python_plugins = remote#host#PluginsForHost(a:host.name) let python_plugins = remote#host#PluginsForHost(a:host.name)
@@ -18,14 +31,16 @@ function! provider#pythonx#Require(host) abort
endfor endfor
try try
let channel_id = rpcstart((ver ==# '2' ? let channel_id = jobstart(args, s:job_opts)
\ provider#python#Prog() : provider#python3#Prog()), args)
if rpcrequest(channel_id, 'poll') ==# 'ok' if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id return channel_id
endif endif
catch catch
echomsg v:throwpoint echomsg v:throwpoint
echomsg v:exception echomsg v:exception
for row in get(s:stderr, channel_id, [])
echomsg row
endfor
endtry endtry
throw remote#host#LoadErrorForHost(a:host.orig_name, throw remote#host#LoadErrorForHost(a:host.orig_name,
\ '$NVIM_PYTHON_LOG_FILE') \ '$NVIM_PYTHON_LOG_FILE')

View File

@@ -4,6 +4,17 @@ if exists('g:loaded_ruby_provider')
endif endif
let g:loaded_ruby_provider = 1 let g:loaded_ruby_provider = 1
let s:stderr = {}
let s:job_opts = {'rpc': v:true}
function! s:job_opts.on_stderr(chan_id, data, event)
let stderr = get(s:stderr, a:chan_id, [''])
let last = remove(stderr, -1)
let a:data[0] = last.a:data[0]
call extend(stderr, a:data)
let s:stderr[a:chan_id] = stderr
endfunction
function! provider#ruby#Detect() abort function! provider#ruby#Detect() abort
return exepath('neovim-ruby-host') return exepath('neovim-ruby-host')
endfunction endfunction
@@ -13,7 +24,7 @@ function! provider#ruby#Prog()
endfunction endfunction
function! provider#ruby#Require(host) abort function! provider#ruby#Require(host) abort
let args = [] let args = [provider#ruby#Prog()]
let ruby_plugins = remote#host#PluginsForHost(a:host.name) let ruby_plugins = remote#host#PluginsForHost(a:host.name)
for plugin in ruby_plugins for plugin in ruby_plugins
@@ -21,13 +32,16 @@ function! provider#ruby#Require(host) abort
endfor endfor
try try
let channel_id = rpcstart(provider#ruby#Prog(), args) let channel_id = jobstart(args, s:job_opts)
if rpcrequest(channel_id, 'poll') ==# 'ok' if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id return channel_id
endif endif
catch catch
echomsg v:throwpoint echomsg v:throwpoint
echomsg v:exception echomsg v:exception
for row in get(s:stderr, channel_id, [])
echomsg row
endfor
endtry endtry
throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_RUBY_LOG_FILE') throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_RUBY_LOG_FILE')
endfunction endfunction

View File

@@ -261,9 +261,8 @@ function! remote#host#LoadErrorForHost(host, log) abort
\ 'You can try to see what happened '. \ 'You can try to see what happened '.
\ 'by starting Neovim with the environment variable '. \ 'by starting Neovim with the environment variable '.
\ a:log . ' set to a file and opening the generated '. \ a:log . ' set to a file and opening the generated '.
\ 'log file. Also, the host stderr will be available '. \ 'log file. Also, the host stderr is available '.
\ 'in Neovim log, so it may contain useful information. '. \ 'in messages.'
\ 'See also ~/.nvimlog.'
endfunction endfunction

View File

@@ -2043,7 +2043,6 @@ rpcnotify({channel}, {event}[, {args}...])
Sends an |RPC| notification to {channel} Sends an |RPC| notification to {channel}
rpcrequest({channel}, {method}[, {args}...]) rpcrequest({channel}, {method}[, {args}...])
Sends an |RPC| request to {channel} Sends an |RPC| request to {channel}
rpcstart({prog}[, {argv}]) Spawns {prog} and opens an |RPC| channel
rpcstop({channel}) Closes an |RPC| {channel} rpcstop({channel}) Closes an |RPC| {channel}
screenattr({row}, {col}) Number attribute at screen position screenattr({row}, {col}) Number attribute at screen position
screenchar({row}, {col}) Number character at screen position screenchar({row}, {col}) Number character at screen position
@@ -4395,8 +4394,10 @@ items({dict}) *items()*
order. order.
jobclose({job}[, {stream}]) {Nvim} *jobclose()* jobclose({job}[, {stream}]) {Nvim} *jobclose()*
Close {job}'s {stream}, which can be one "stdin", "stdout" or Close {job}'s {stream}, which can be one of "stdin", "stdout",
"stderr". If {stream} is omitted, all streams are closed. "stderr" or "rpc" (closes the rpc channel for a job started
with the "rpc" option.) If {stream} is omitted, all streams
are closed.
jobpid({job}) {Nvim} *jobpid()* jobpid({job}) {Nvim} *jobpid()*
Return the pid (process id) of {job}. Return the pid (process id) of {job}.
@@ -4418,6 +4419,10 @@ jobsend({job}, {data}) {Nvim} *jobsend()*
:call jobsend(j, ["abc", "123\n456", ""]) :call jobsend(j, ["abc", "123\n456", ""])
< will send "abc<NL>123<NUL>456<NL>". < will send "abc<NL>123<NUL>456<NL>".
If the job was started with the rpc option this function
cannot be used, instead use |rpcnotify()| and |rpcrequest()|
to communicate with the job.
jobstart({cmd}[, {opts}]) {Nvim} *jobstart()* jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
Spawns {cmd} as a job. If {cmd} is a |List| it is run Spawns {cmd} as a job. If {cmd} is a |List| it is run
directly. If {cmd} is a |String| it is processed like this: > directly. If {cmd} is a |String| it is processed like this: >
@@ -4433,9 +4438,14 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
on_exit : exit event handler (function name or |Funcref|) on_exit : exit event handler (function name or |Funcref|)
cwd : Working directory of the job; defaults to cwd : Working directory of the job; defaults to
|current-directory|. |current-directory|.
rpc : If set, |msgpack-rpc| will be used to communicate
with the job over stdin and stdout. "on_stdout" is
then ignored, but "on_stderr" can still be used.
pty : If set, the job will be connected to a new pseudo pty : If set, the job will be connected to a new pseudo
terminal, and the job streams are connected to terminal, and the job streams are connected to
the master file descriptor. the master file descriptor. "on_stderr" is ignored
as all output will be received on stdout.
width : (pty only) Width of the terminal screen width : (pty only) Width of the terminal screen
height : (pty only) Height of the terminal screen height : (pty only) Height of the terminal screen
TERM : (pty only) $TERM environment variable TERM : (pty only) $TERM environment variable
@@ -4447,10 +4457,12 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
{opts} is passed as |self| to the callback; the caller may {opts} is passed as |self| to the callback; the caller may
pass arbitrary data by setting other keys. pass arbitrary data by setting other keys.
Returns: Returns:
- job ID on success, used by |jobsend()| and |jobstop()| - The job ID on success, which is used by |jobsend()| (or
|rpcnotify()| and |rpcrequest()| if "rpc" option was used)
and |jobstop()|
- 0 on invalid arguments or if the job table is full - 0 on invalid arguments or if the job table is full
- -1 if {cmd}[0] is not executable. - -1 if {cmd}[0] is not executable.
See |job-control| for more information. See |job-control| and |msgpack-rpc| for more information.
jobstop({job}) {Nvim} *jobstop()* jobstop({job}) {Nvim} *jobstop()*
Stop a job created with |jobstart()| by sending a `SIGTERM` Stop a job created with |jobstart()| by sending a `SIGTERM`
@@ -5649,19 +5661,20 @@ rpcrequest({channel}, {method}[, {args}...]) {Nvim} *rpcrequest()*
:let result = rpcrequest(rpc_chan, "func", 1, 2, 3) :let result = rpcrequest(rpc_chan, "func", 1, 2, 3)
rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()* rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()*
Spawns {prog} as a job (optionally passing the list {argv}), Deprecated. Replace >
and opens an |RPC| channel with the spawned process's :let id = rpcstart('prog', ['arg1', 'arg2'])
stdin/stdout. Returns: < with >
- channel id on success, which is used by |rpcrequest()|, :let id = jobstart(['prog', 'arg1', 'arg2'],
|rpcnotify()| and |rpcstop()| {'rpc': v:true})
- 0 on failure
Example: >
:let rpc_chan = rpcstart('prog', ['arg1', 'arg2'])
rpcstop({channel}) {Nvim} *rpcstop()* rpcstop({channel}) {Nvim} *rpcstop()*
Closes an |RPC| {channel}, possibly created via Closes an |RPC| {channel}. If the channel is a job
|rpcstart()|. Also closes channels created by connections to started with |jobstart()| the job is killed.
|v:servername|. It is better to use |jobstop()| in this case, or use
|jobclose|(id, "rpc") to only close the channel without
killing the job.
Closes the socket connection if the channel was opened by
connecting to |v:servername|.
screenattr(row, col) *screenattr()* screenattr(row, col) *screenattr()*
Like screenchar(), but return the attribute. This is a rather Like screenchar(), but return the attribute. This is a rather

View File

@@ -11,7 +11,6 @@ RPC API for Nvim *RPC* *rpc* *msgpack-rpc*
3. Connecting |rpc-connecting| 3. Connecting |rpc-connecting|
4. Clients |rpc-api-client| 4. Clients |rpc-api-client|
5. Types |rpc-types| 5. Types |rpc-types|
6. Vimscript functions |rpc-vim-functions|
============================================================================== ==============================================================================
1. Introduction *rpc-intro* 1. Introduction *rpc-intro*
@@ -66,12 +65,16 @@ To get a formatted dump of the API using python (requires the `pyyaml` and
============================================================================== ==============================================================================
3. Connecting *rpc-connecting* 3. Connecting *rpc-connecting*
There are several ways to open a msgpack-rpc stream to an Nvim server: There are several ways to open a msgpack-rpc channel to an Nvim instance:
1. Through stdin/stdout when `nvim` is started with `--embed`. This is how 1. Through stdin/stdout when `nvim` is started with `--embed`. This is how
applications can embed Nvim. applications can embed Nvim.
2. Through stdin/stdout of some other process spawned by |rpcstart()|. 2. Through stdin/stdout of some other process spawned by |jobstart()|.
Set the "rpc" key to |v:true| in the options dict to use the job's stdin
and stdout as a single msgpack channel that is processed directly by
Nvim. Then it is not possible to process raw data to or from the
process's stdin and stdout. stderr can still be used, though.
3. Through the socket automatically created with each instance. The socket 3. Through the socket automatically created with each instance. The socket
location is stored in |v:servername|. location is stored in |v:servername|.
@@ -110,11 +113,12 @@ functions can be called interactively:
>>> nvim = attach('socket', path='[address]') >>> nvim = attach('socket', path='[address]')
>>> nvim.command('echo "hello world!"') >>> nvim.command('echo "hello world!"')
< <
You can also embed an Nvim instance via |rpcstart()| You can also embed an Nvim instance via |jobstart()|, and communicate using
|rpcrequest()| and |rpcnotify()|:
> >
let vim = rpcstart('nvim', ['--embed']) let vim = jobstart(['nvim', '--embed'], {'rpc': v:true})
echo rpcrequest(vim, 'vim_eval', '"Hello " . "world!"') echo rpcrequest(vim, 'vim_eval', '"Hello " . "world!"')
call rpcstop(vim) call jobstop(vim)
< <
============================================================================== ==============================================================================
4. Implementing API clients *rpc-api-client* *api-client* 4. Implementing API clients *rpc-api-client* *api-client*
@@ -233,23 +237,5 @@ Even for statically compiled clients it is good practice to avoid hardcoding
the type codes, because a client may be built against one Nvim version but the type codes, because a client may be built against one Nvim version but
connect to another with different type codes. connect to another with different type codes.
==============================================================================
6. Vimscript functions *rpc-vim-functions*
RPC functions are available in Vimscript:
1. |rpcstart()|: Similarly to |jobstart()|, this will spawn a co-process
with its standard handles connected to Nvim. The difference is that it's
not possible to process raw data to or from the process's stdin, stdout,
or stderr. This is because the job's stdin and stdout are used as
a single msgpack channel that is processed directly by Nvim.
2. |rpcstop()|: Same as |jobstop()|, but operates on handles returned by
|rpcstart()|.
3. |rpcrequest()|: Sends a msgpack-rpc request to the process.
4. |rpcnotify()|: Sends a msgpack-rpc notification to the process.
|rpcrequest()| and |rpcnotify()| can also be used with channels connected to
a nvim server. |v:servername|
============================================================================== ==============================================================================
vim:tw=78:ts=8:noet:ft=help:norl: vim:tw=78:ts=8:noet:ft=help:norl:

View File

@@ -408,6 +408,7 @@ typedef struct {
Terminal *term; Terminal *term;
bool stopped; bool stopped;
bool exited; bool exited;
bool rpc;
int refcount; int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit; ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self; dict_T *self;
@@ -448,7 +449,6 @@ typedef struct {
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */ #define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with #define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */ valid character */
static uint64_t current_job_id = 1;
static PMap(uint64_t) *jobs = NULL; static PMap(uint64_t) *jobs = NULL;
static uint64_t last_timer_id = 0; static uint64_t last_timer_id = 0;
@@ -11724,18 +11724,37 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
if (argvars[1].v_type == VAR_STRING) { if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string; char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) { if (!strcmp(stream, "stdin")) {
if (data->rpc) {
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
} else {
process_close_in(proc); process_close_in(proc);
}
} else if (!strcmp(stream, "stdout")) { } else if (!strcmp(stream, "stdout")) {
if (data->rpc) {
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
} else {
process_close_out(proc); process_close_out(proc);
}
} else if (!strcmp(stream, "stderr")) { } else if (!strcmp(stream, "stderr")) {
process_close_err(proc); process_close_err(proc);
} else if (!strcmp(stream, "rpc")) {
if (data->rpc) {
channel_close(data->id);
} else {
EMSG(_("Invalid job stream: Not an rpc job"));
}
} else { } else {
EMSG2(_("Invalid job stream \"%s\""), stream); EMSG2(_("Invalid job stream \"%s\""), stream);
} }
} else {
if (data->rpc) {
channel_close(data->id);
process_close_err(proc);
} else { } else {
process_close_streams(proc); process_close_streams(proc);
} }
} }
}
// "jobpid(id)" function // "jobpid(id)" function
static void f_jobpid(typval_T *argvars, typval_T *rettv) static void f_jobpid(typval_T *argvars, typval_T *rettv)
@@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return; return;
} }
if (data->rpc) {
EMSG(_("Can't send raw data to rpc channel"));
return;
}
ssize_t input_len; ssize_t input_len;
char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false); char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false);
if (!input) { if (!input) {
@@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
return; return;
} }
dict_T *job_opts = NULL; dict_T *job_opts = NULL;
bool detach = false, rpc = false, pty = false;
ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL; ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL;
char *cwd = NULL; char *cwd = NULL;
if (argvars[1].v_type == VAR_DICT) { if (argvars[1].v_type == VAR_DICT) {
job_opts = argvars[1].vval.v_dict; job_opts = argvars[1].vval.v_dict;
detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0;
rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0;
pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0;
if (pty && rpc) {
EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set");
shell_free_argv(argv);
return;
}
char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false); char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false);
if (new_cwd && strlen(new_cwd) > 0) { if (new_cwd && strlen(new_cwd) > 0) {
cwd = new_cwd; cwd = new_cwd;
@@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
} }
} }
bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0;
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
job_opts, pty, detach, cwd); job_opts, pty, rpc, detach, cwd);
Process *proc = (Process *)&data->proc; Process *proc = (Process *)&data->proc;
if (pty) { if (pty) {
@@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
} }
} }
if (!on_stdout) { if (!rpc && !on_stdout) {
proc->out = NULL; proc->out = NULL;
} }
if (!on_stderr) { if (!on_stderr) {
@@ -14105,7 +14138,7 @@ end:
api_free_object(result); api_free_object(result);
} }
// "rpcstart()" function // "rpcstart()" function (DEPRECATED)
static void f_rpcstart(typval_T *argvars, typval_T *rettv) static void f_rpcstart(typval_T *argvars, typval_T *rettv)
{ {
rettv->v_type = VAR_NUMBER; rettv->v_type = VAR_NUMBER;
@@ -14158,33 +14191,28 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL // The last item of argv must be NULL
argv[i] = NULL; argv[i] = NULL;
uint64_t channel_id = channel_from_process(argv);
if (!channel_id) { TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL,
EMSG(_(e_api_spawn_failed)); NULL, false, true, false, NULL);
} common_job_start(data, rettv);
rettv->vval.v_number = (varnumber_T)channel_id;
} }
// "rpcstop()" function // "rpcstop()" function
static void f_rpcstop(typval_T *argvars, typval_T *rettv) static void f_rpcstop(typval_T *argvars, typval_T *rettv)
{ {
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER) { if (argvars[0].v_type != VAR_NUMBER) {
// Wrong argument types // Wrong argument types
EMSG(_(e_invarg)); EMSG(_(e_invarg));
return; return;
} }
// if called with a job, stop it, else closes the channel
if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) {
f_jobstop(argvars, rettv);
} else {
rettv->vval.v_number = channel_close(argvars[0].vval.v_number); rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
} }
}
/* /*
* "screenattr()" function * "screenattr()" function
@@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
} }
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
job_opts, true, false, cwd); job_opts, true, false, false, cwd);
data->proc.pty.width = curwin->w_width; data->proc.pty.width = curwin->w_width;
data->proc.pty.height = curwin->w_height; data->proc.pty.height = curwin->w_height;
data->proc.pty.term_name = xstrdup("xterm-256color"); data->proc.pty.term_name = xstrdup("xterm-256color");
@@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv,
ufunc_T *on_exit, ufunc_T *on_exit,
dict_T *self, dict_T *self,
bool pty, bool pty,
bool rpc,
bool detach, bool detach,
char *cwd) char *cwd)
{ {
@@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv,
data->on_exit = on_exit; data->on_exit = on_exit;
data->self = self; data->self = self;
data->events = queue_new_child(main_loop.events); data->events = queue_new_child(main_loop.events);
data->rpc = rpc;
if (pty) { if (pty) {
data->proc.pty = pty_process_init(&main_loop, data); data->proc.pty = pty_process_init(&main_loop, data);
} else { } else {
@@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv,
return data; return data;
} }
/// Return true/false on success/failure. /// common code for getting job callbacks for jobstart, termopen and rpcstart
///
/// @return true/false on success/failure.
static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout, static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
ufunc_T **on_stderr, ufunc_T **on_exit) ufunc_T **on_stderr, ufunc_T **on_exit)
{ {
@@ -22174,12 +22206,19 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
} }
xfree(cmd); xfree(cmd);
data->id = current_job_id++; data->id = next_chan_id++;
if (data->rpc) {
// the rpc channel takes over the in and out streams
channel_from_process(proc, data->id);
} else {
wstream_init(proc->in, 0); wstream_init(proc->in, 0);
if (proc->out) { if (proc->out) {
rstream_init(proc->out, 0); rstream_init(proc->out, 0);
rstream_start(proc->out, on_job_stdout, data); rstream_start(proc->out, on_job_stdout, data);
} }
}
if (proc->err) { if (proc->err) {
rstream_init(proc->err, 0); rstream_init(proc->err, 0);
rstream_start(proc->err, on_job_stderr, data); rstream_start(proc->err, on_job_stderr, data);
@@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d)
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
terminal_close(data->term, msg); terminal_close(data->term, msg);
} }
if (data->rpc) {
channel_process_exit(data->id, status);
}
if (data->status_ptr) { if (data->status_ptr) {
*data->status_ptr = status; *data->status_ptr = status;
} }
process_job_event(data, data->on_exit, "exit", NULL, 0, status); process_job_event(data, data->on_exit, "exit", NULL, 0, status);
pmap_del(uint64_t)(jobs, data->id);
term_job_data_decref(data);
} }
static void term_write(char *buf, size_t size, void *d) static void term_write(char *buf, size_t size, void *d)
@@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data)
static void on_job_event(JobEvent *ev) static void on_job_event(JobEvent *ev)
{ {
if (!ev->callback) { if (!ev->callback) {
goto end; return;
} }
typval_T argv[3]; typval_T argv[3];
@@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev)
call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum, call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum,
curwin->w_cursor.lnum, ev->data->self); curwin->w_cursor.lnum, ev->data->self);
clear_tv(&rettv); clear_tv(&rettv);
end:
if (!ev->received) {
// exit event, safe to free job data now
pmap_del(uint64_t)(jobs, ev->data->id);
term_job_data_decref(ev->data);
}
} }
static TerminalJobData *find_job(uint64_t id) static TerminalJobData *find_job(uint64_t id)

View File

@@ -1244,6 +1244,9 @@ EXTERN char *ignoredp;
// If a msgpack-rpc channel should be started over stdin/stdout // If a msgpack-rpc channel should be started over stdin/stdout
EXTERN bool embedded_mode INIT(= false); EXTERN bool embedded_mode INIT(= false);
/// next free id for a job or rpc channel
EXTERN uint64_t next_chan_id INIT(= 1);
/// Used to track the status of external functions. /// Used to track the status of external functions.
/// Currently only used for iconv(). /// Currently only used for iconv().
typedef enum { typedef enum {

View File

@@ -19,6 +19,7 @@
#include "nvim/main.h" #include "nvim/main.h"
#include "nvim/ascii.h" #include "nvim/ascii.h"
#include "nvim/memory.h" #include "nvim/memory.h"
#include "nvim/eval.h"
#include "nvim/os_unix.h" #include "nvim/os_unix.h"
#include "nvim/message.h" #include "nvim/message.h"
#include "nvim/map.h" #include "nvim/map.h"
@@ -55,12 +56,7 @@ typedef struct {
msgpack_unpacker *unpacker; msgpack_unpacker *unpacker;
union { union {
Stream stream; Stream stream;
struct { Process *proc;
LibuvProcess uvproc;
Stream in;
Stream out;
Stream err;
} process;
struct { struct {
Stream in; Stream in;
Stream out; Stream out;
@@ -79,7 +75,6 @@ typedef struct {
uint64_t request_id; uint64_t request_id;
} RequestEvent; } RequestEvent;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL; static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL; static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer; static msgpack_sbuffer out_buffer;
@@ -112,33 +107,20 @@ void channel_teardown(void)
} }
/// Creates an API channel by starting a process and connecting to its /// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream. /// stdin/stdout. stderr is handled by the job infrastructure.
/// ///
/// @param argv The argument vector for the process. [consumed] /// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success. /// @return The channel id (> 0), on success.
/// 0, on error. /// 0, on error.
uint64_t channel_from_process(char **argv) uint64_t channel_from_process(Process *proc, uint64_t id)
{ {
Channel *channel = register_channel(kChannelTypeProc); Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
Process *proc = &channel->data.process.uvproc.process;
proc->argv = argv;
proc->in = &channel->data.process.in;
proc->out = &channel->data.process.out;
proc->err = &channel->data.process.err;
proc->cb = process_exit;
if (!process_spawn(proc)) {
loop_poll_events(&main_loop, 0);
decref(channel);
return 0;
}
incref(channel); // process channels are only closed by the exit_cb incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
wstream_init(proc->in, 0); wstream_init(proc->in, 0);
rstream_init(proc->out, 0); rstream_init(proc->out, 0);
rstream_start(proc->out, parse_msgpack, channel); rstream_start(proc->out, parse_msgpack, channel);
rstream_init(proc->err, 0);
rstream_start(proc->err, forward_stderr, channel);
return channel->id; return channel->id;
} }
@@ -148,7 +130,7 @@ uint64_t channel_from_process(char **argv)
/// @param watcher The SocketWatcher ready to accept the connection /// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher) void channel_from_connection(SocketWatcher *watcher)
{ {
Channel *channel = register_channel(kChannelTypeSocket); Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
socket_watcher_accept(watcher, &channel->data.stream); socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb; channel->data.stream.internal_close_cb = close_cb;
@@ -314,7 +296,7 @@ bool channel_close(uint64_t id)
/// Neovim /// Neovim
void channel_from_stdio(void) void channel_from_stdio(void)
{ {
Channel *channel = register_channel(kChannelTypeStdio); Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
incref(channel); // stdio channels are only closed on exit incref(channel); // stdio channels are only closed on exit
// read stream // read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
@@ -323,20 +305,12 @@ void channel_from_stdio(void)
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
} }
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, void channel_process_exit(uint64_t id, int status)
void *data, bool eof)
{ {
while (rbuffer_size(rbuf)) { Channel *channel = pmap_get(uint64_t)(channels, id);
char buf[256];
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
static void process_exit(Process *proc, int status, void *data) channel->closed = true;
{ decref(channel);
decref(data);
} }
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
@@ -511,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
success = wstream_write(&channel->data.stream, buffer); success = wstream_write(&channel->data.stream, buffer);
break; break;
case kChannelTypeProc: case kChannelTypeProc:
success = wstream_write(&channel->data.process.in, buffer); success = wstream_write(channel->data.proc->in, buffer);
break; break;
case kChannelTypeStdio: case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer); success = wstream_write(&channel->data.std.out, buffer);
@@ -639,9 +613,10 @@ static void close_channel(Channel *channel)
stream_close(&channel->data.stream, NULL, NULL); stream_close(&channel->data.stream, NULL, NULL);
break; break;
case kChannelTypeProc: case kChannelTypeProc:
if (!channel->data.process.uvproc.process.closed) { // Only close the rpc channel part,
process_stop(&channel->data.process.uvproc.process); // there could be an error message on the stderr stream
} process_close_in(channel->data.proc);
process_close_out(channel->data.proc);
break; break;
case kChannelTypeStdio: case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL, NULL); stream_close(&channel->data.std.in, NULL, NULL);
@@ -679,7 +654,9 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events); pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack); kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications); kv_destroy(channel->delayed_notifications);
if (channel->type != kChannelTypeProc) {
queue_free(channel->events); queue_free(channel->events);
}
xfree(channel); xfree(channel);
} }
@@ -688,15 +665,15 @@ static void close_cb(Stream *stream, void *data)
decref(data); decref(data);
} }
static Channel *register_channel(ChannelType type) static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)
{ {
Channel *rv = xmalloc(sizeof(Channel)); Channel *rv = xmalloc(sizeof(Channel));
rv->events = queue_new_child(main_loop.events); rv->events = events ? events : queue_new_child(main_loop.events);
rv->type = type; rv->type = type;
rv->refcount = 1; rv->refcount = 1;
rv->closed = false; rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = next_id++; rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0; rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)(); rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1; rv->next_request_id = 1;

View File

@@ -6,6 +6,7 @@
#include "nvim/api/private/defs.h" #include "nvim/api/private/defs.h"
#include "nvim/event/socket.h" #include "nvim/event/socket.h"
#include "nvim/event/process.h"
#include "nvim/vim.h" #include "nvim/vim.h"
#define METHOD_MAXLEN 512 #define METHOD_MAXLEN 512

View File

@@ -0,0 +1,38 @@
local deps_prefix = './.deps/usr'
if os.getenv('DEPS_PREFIX') then
deps_prefix = os.getenv('DEPS_PREFIX')
end
package.path = deps_prefix .. '/share/lua/5.1/?.lua;' ..
deps_prefix .. '/share/lua/5.1/?/init.lua;' ..
package.path
package.cpath = deps_prefix .. '/lib/lua/5.1/?.so;' ..
package.cpath
local mpack = require('mpack')
local StdioStream = require('nvim.stdio_stream')
local Session = require('nvim.session')
local stdio_stream = StdioStream.open()
local session = Session.new(stdio_stream)
local function on_request(method, args)
if method == 'poll' then
return 'ok'
elseif method == 'write_stderr' then
io.stderr:write(args[1])
return "done!"
elseif method == "exit" then
session:stop()
return mpack.NIL
end
end
local function on_notification(event, args)
if event == 'ping' and #args == 0 then
session:notify("vim_eval", "rpcnotify(g:channel, 'pong')")
end
end
session:run(on_request, on_notification)

View File

@@ -4,7 +4,9 @@
local helpers = require('test.functional.helpers')(after_each) local helpers = require('test.functional.helpers')(after_each)
local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval
local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop
local nvim_prog = helpers.nvim_prog local nvim_prog, command, funcs = helpers.nvim_prog, helpers.command, helpers.funcs
local source, next_message = helpers.source, helpers.next_message
local meths = helpers.meths
describe('server -> client', function() describe('server -> client', function()
@@ -144,11 +146,11 @@ describe('server -> client', function()
end end
before_each(function() before_each(function()
nvim('command', "let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])") command("let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
neq(0, eval('vim')) neq(0, eval('vim'))
end) end)
after_each(function() nvim('command', 'call rpcstop(vim)') end) after_each(function() command('call rpcstop(vim)') end)
it('can send/recieve notifications and make requests', function() it('can send/recieve notifications and make requests', function()
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')") nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
@@ -181,4 +183,42 @@ describe('server -> client', function()
eq(true, string.match(err, ': (.*)') == 'Failed to evaluate expression') eq(true, string.match(err, ': (.*)') == 'Failed to evaluate expression')
end) end)
end) end)
describe('when using jobstart', function()
local jobid
before_each(function()
local channel = nvim('get_api_info')[1]
nvim('set_var', 'channel', channel)
source([[
function! s:OnEvent(id, data, event)
call rpcnotify(g:channel, a:event, 0, a:data)
endfunction
let g:job_opts = {
\ 'on_stderr': function('s:OnEvent'),
\ 'on_exit': function('s:OnEvent'),
\ 'user': 0,
\ 'rpc': v:true
\ }
]])
local lua_prog = arg[-1]
meths.set_var("args", {lua_prog, 'test/functional/api/rpc_fixture.lua'})
jobid = eval("jobstart(g:args, g:job_opts)")
neq(0, 'jobid')
end)
after_each(function()
funcs.jobstop(jobid)
end)
it('rpc and text stderr can be combined', function()
eq("ok",funcs.rpcrequest(jobid, "poll"))
funcs.rpcnotify(jobid, "ping")
eq({'notification', 'pong', {}}, next_message())
eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message())
funcs.rpcrequest(jobid, "exit")
eq({'notification', 'exit', {0, 0}}, next_message())
end)
end)
end) end)

View File

@@ -5,6 +5,7 @@ local clear, eq, eval, execute, feed, insert, neq, next_msg, nvim,
helpers.insert, helpers.neq, helpers.next_message, helpers.nvim, helpers.insert, helpers.neq, helpers.next_message, helpers.nvim,
helpers.nvim_dir, helpers.ok, helpers.source, helpers.nvim_dir, helpers.ok, helpers.source,
helpers.write_file, helpers.mkdir, helpers.rmdir helpers.write_file, helpers.mkdir, helpers.rmdir
local command = helpers.command
local Screen = require('test.functional.ui.screen') local Screen = require('test.functional.ui.screen')
@@ -429,6 +430,13 @@ describe('jobs', function()
eq({'notification', 'j', {0, {jobid, 'exit'}}}, next_msg()) eq({'notification', 'j', {0, {jobid, 'exit'}}}, next_msg())
end) end)
it('cannot have both rpc and pty options', function()
command("let g:job_opts.pty = v:true")
command("let g:job_opts.rpc = v:true")
local _, err = pcall(command, "let j = jobstart(['cat', '-'], g:job_opts)")
ok(string.find(err, "E475: Invalid argument: job cannot have both 'pty' and 'rpc' options set") ~= nil)
end)
describe('running tty-test program', function() describe('running tty-test program', function()
local function next_chunk() local function next_chunk()
local rv local rv