mirror of
https://github.com/neovim/neovim.git
synced 2025-09-28 14:08:32 +00:00
Merge pull request #6844 from bfredl/channel
channels: support buffered output and bytes sockets/stdio
This commit is contained in:
@@ -252,7 +252,7 @@ static void remote_ui_flush(UI *ui)
|
||||
{
|
||||
UIData *data = ui->data;
|
||||
if (data->buffer.size > 0) {
|
||||
channel_send_event(data->channel_id, "redraw", data->buffer);
|
||||
rpc_send_event(data->channel_id, "redraw", data->buffer);
|
||||
data->buffer = (Array)ARRAY_DICT_INIT;
|
||||
}
|
||||
}
|
||||
|
@@ -721,7 +721,7 @@ void nvim_subscribe(uint64_t channel_id, String event)
|
||||
char e[METHOD_MAXLEN + 1];
|
||||
memcpy(e, event.data, length);
|
||||
e[length] = NUL;
|
||||
channel_subscribe(channel_id, e);
|
||||
rpc_subscribe(channel_id, e);
|
||||
}
|
||||
|
||||
/// Unsubscribes to event broadcasts
|
||||
@@ -737,7 +737,7 @@ void nvim_unsubscribe(uint64_t channel_id, String event)
|
||||
char e[METHOD_MAXLEN + 1];
|
||||
memcpy(e, event.data, length);
|
||||
e[length] = NUL;
|
||||
channel_unsubscribe(channel_id, e);
|
||||
rpc_unsubscribe(channel_id, e);
|
||||
}
|
||||
|
||||
Integer nvim_get_color_by_name(String name)
|
||||
|
@@ -603,6 +603,7 @@ struct file_buffer {
|
||||
char_u *b_p_bt; ///< 'buftype'
|
||||
int b_has_qf_entry; ///< quickfix exists for buffer
|
||||
int b_p_bl; ///< 'buflisted'
|
||||
long b_p_channel; ///< 'channel'
|
||||
int b_p_cin; ///< 'cindent'
|
||||
char_u *b_p_cino; ///< 'cinoptions'
|
||||
char_u *b_p_cink; ///< 'cinkeys'
|
||||
|
752
src/nvim/channel.c
Normal file
752
src/nvim/channel.c
Normal file
@@ -0,0 +1,752 @@
|
||||
// This is an open source non-commercial project. Dear PVS-Studio, please check
|
||||
// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
|
||||
|
||||
#include "nvim/api/ui.h"
|
||||
#include "nvim/channel.h"
|
||||
#include "nvim/eval.h"
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/msgpack_rpc/channel.h"
|
||||
#include "nvim/msgpack_rpc/server.h"
|
||||
#include "nvim/os/shell.h"
|
||||
#include "nvim/path.h"
|
||||
#include "nvim/ascii.h"
|
||||
|
||||
static bool did_stdio = false;
|
||||
PMap(uint64_t) *channels = NULL;
|
||||
|
||||
/// next free id for a job or rpc channel
|
||||
/// 1 is reserved for stdio channel
|
||||
/// 2 is reserved for stderr channel
|
||||
static uint64_t next_chan_id = CHAN_STDERR+1;
|
||||
|
||||
|
||||
typedef struct {
|
||||
Channel *chan;
|
||||
Callback *callback;
|
||||
const char *type;
|
||||
list_T *received;
|
||||
int status;
|
||||
} ChannelEvent;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "channel.c.generated.h"
|
||||
#endif
|
||||
/// Teardown the module
|
||||
void channel_teardown(void)
|
||||
{
|
||||
if (!channels) {
|
||||
return;
|
||||
}
|
||||
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
channel_close(channel->id, kChannelPartAll, NULL);
|
||||
});
|
||||
}
|
||||
|
||||
/// Closes a channel
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @return true if successful, false otherwise
|
||||
bool channel_close(uint64_t id, ChannelPart part, const char **error)
|
||||
{
|
||||
Channel *chan;
|
||||
Process *proc;
|
||||
|
||||
const char *dummy;
|
||||
if (!error) {
|
||||
error = &dummy;
|
||||
}
|
||||
|
||||
if (!(chan = find_channel(id))) {
|
||||
if (id < next_chan_id) {
|
||||
// allow double close, even though we can't say what parts was valid.
|
||||
return true;
|
||||
}
|
||||
*error = (const char *)e_invchan;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool close_main = false;
|
||||
if (part == kChannelPartRpc || part == kChannelPartAll) {
|
||||
close_main = true;
|
||||
if (chan->is_rpc) {
|
||||
rpc_close(chan);
|
||||
} else if (part == kChannelPartRpc) {
|
||||
*error = (const char *)e_invstream;
|
||||
return false;
|
||||
}
|
||||
} else if ((part == kChannelPartStdin || part == kChannelPartStdout)
|
||||
&& chan->is_rpc) {
|
||||
*error = (const char *)e_invstreamrpc;
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (chan->streamtype) {
|
||||
case kChannelStreamSocket:
|
||||
if (!close_main) {
|
||||
*error = (const char *)e_invstream;
|
||||
return false;
|
||||
}
|
||||
stream_may_close(&chan->stream.socket);
|
||||
break;
|
||||
|
||||
case kChannelStreamProc:
|
||||
proc = (Process *)&chan->stream.proc;
|
||||
if (part == kChannelPartStdin || close_main) {
|
||||
stream_may_close(&proc->in);
|
||||
}
|
||||
if (part == kChannelPartStdout || close_main) {
|
||||
stream_may_close(&proc->out);
|
||||
}
|
||||
if (part == kChannelPartStderr || part == kChannelPartAll) {
|
||||
stream_may_close(&proc->err);
|
||||
}
|
||||
if (proc->type == kProcessTypePty && part == kChannelPartAll) {
|
||||
pty_process_close_master(&chan->stream.pty);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case kChannelStreamStdio:
|
||||
if (part == kChannelPartStdin || close_main) {
|
||||
stream_may_close(&chan->stream.stdio.in);
|
||||
}
|
||||
if (part == kChannelPartStdout || close_main) {
|
||||
stream_may_close(&chan->stream.stdio.out);
|
||||
}
|
||||
if (part == kChannelPartStderr) {
|
||||
*error = (const char *)e_invstream;
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case kChannelStreamStderr:
|
||||
if (part != kChannelPartAll && part != kChannelPartStderr) {
|
||||
*error = (const char *)e_invstream;
|
||||
return false;
|
||||
}
|
||||
if (!chan->stream.err.closed) {
|
||||
chan->stream.err.closed = true;
|
||||
// Don't close on exit, in case late error messages
|
||||
if (!exiting) {
|
||||
fclose(stderr);
|
||||
}
|
||||
channel_decref(chan);
|
||||
}
|
||||
break;
|
||||
|
||||
case kChannelStreamInternal:
|
||||
if (!close_main) {
|
||||
*error = (const char *)e_invstream;
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Initializes the module
|
||||
void channel_init(void)
|
||||
{
|
||||
channels = pmap_new(uint64_t)();
|
||||
channel_alloc(kChannelStreamStderr);
|
||||
rpc_init();
|
||||
remote_ui_init();
|
||||
}
|
||||
|
||||
/// Allocates a channel.
|
||||
///
|
||||
/// Channel is allocated with refcount 1, which should be decreased
|
||||
/// when the underlying stream closes.
|
||||
static Channel *channel_alloc(ChannelStreamType type)
|
||||
{
|
||||
Channel *chan = xcalloc(1, sizeof(*chan));
|
||||
if (type == kChannelStreamStdio) {
|
||||
chan->id = CHAN_STDIO;
|
||||
} else if (type == kChannelStreamStderr) {
|
||||
chan->id = CHAN_STDERR;
|
||||
} else {
|
||||
chan->id = next_chan_id++;
|
||||
}
|
||||
chan->events = multiqueue_new_child(main_loop.events);
|
||||
chan->refcount = 1;
|
||||
chan->streamtype = type;
|
||||
pmap_put(uint64_t)(channels, chan->id, chan);
|
||||
return chan;
|
||||
}
|
||||
|
||||
/// Not implemented, only logging for now
|
||||
void channel_create_event(Channel *chan, char *ext_source)
|
||||
{
|
||||
#if MIN_LOG_LEVEL <= INFO_LOG_LEVEL
|
||||
char *stream_desc, *mode_desc, *source;
|
||||
|
||||
switch (chan->streamtype) {
|
||||
case kChannelStreamProc:
|
||||
if (chan->stream.proc.type == kProcessTypePty) {
|
||||
stream_desc = "pty job";
|
||||
} else {
|
||||
stream_desc = "job";
|
||||
}
|
||||
break;
|
||||
|
||||
case kChannelStreamStdio:
|
||||
stream_desc = "stdio";
|
||||
break;
|
||||
|
||||
case kChannelStreamSocket:
|
||||
stream_desc = "socket";
|
||||
break;
|
||||
|
||||
case kChannelStreamInternal:
|
||||
stream_desc = "socket (internal)";
|
||||
break;
|
||||
|
||||
default:
|
||||
stream_desc = "?";
|
||||
}
|
||||
|
||||
if (chan->is_rpc) {
|
||||
mode_desc = ", rpc";
|
||||
} else if (chan->term) {
|
||||
mode_desc = ", terminal";
|
||||
} else {
|
||||
mode_desc = "";
|
||||
}
|
||||
|
||||
if (ext_source) {
|
||||
// TODO(bfredl): in a future improved traceback solution,
|
||||
// external events should be included.
|
||||
source = ext_source;
|
||||
} else {
|
||||
eval_format_source_name_line((char *)IObuff, sizeof(IObuff));
|
||||
source = (char *)IObuff;
|
||||
}
|
||||
|
||||
ILOG("new channel %" PRIu64 " (%s%s): %s", chan->id, stream_desc,
|
||||
mode_desc, source);
|
||||
#else
|
||||
(void)chan;
|
||||
(void)ext_source;
|
||||
#endif
|
||||
}
|
||||
|
||||
void channel_incref(Channel *channel)
|
||||
{
|
||||
channel->refcount++;
|
||||
}
|
||||
|
||||
void channel_decref(Channel *channel)
|
||||
{
|
||||
if (!(--channel->refcount)) {
|
||||
multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel);
|
||||
}
|
||||
}
|
||||
|
||||
void callback_reader_free(CallbackReader *reader)
|
||||
{
|
||||
callback_free(&reader->cb);
|
||||
if (reader->buffered) {
|
||||
ga_clear(&reader->buffer);
|
||||
}
|
||||
}
|
||||
|
||||
void callback_reader_start(CallbackReader *reader)
|
||||
{
|
||||
if (reader->buffered) {
|
||||
ga_init(&reader->buffer, sizeof(char *), 32);
|
||||
ga_grow(&reader->buffer, 32);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_channel_event(void **argv)
|
||||
{
|
||||
Channel *channel = argv[0];
|
||||
if (channel->is_rpc) {
|
||||
rpc_free(channel);
|
||||
}
|
||||
|
||||
callback_reader_free(&channel->on_stdout);
|
||||
callback_reader_free(&channel->on_stderr);
|
||||
callback_free(&channel->on_exit);
|
||||
|
||||
pmap_del(uint64_t)(channels, channel->id);
|
||||
multiqueue_free(channel->events);
|
||||
xfree(channel);
|
||||
}
|
||||
|
||||
static void channel_destroy_early(Channel *chan)
|
||||
{
|
||||
if ((chan->id != --next_chan_id)) {
|
||||
abort();
|
||||
}
|
||||
|
||||
if ((--chan->refcount != 0)) {
|
||||
abort();
|
||||
}
|
||||
|
||||
free_channel_event((void **)&chan);
|
||||
}
|
||||
|
||||
|
||||
static void close_cb(Stream *stream, void *data)
|
||||
{
|
||||
channel_decref(data);
|
||||
}
|
||||
|
||||
Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
||||
CallbackReader on_stderr, Callback on_exit,
|
||||
bool pty, bool rpc, bool detach, const char *cwd,
|
||||
uint16_t pty_width, uint16_t pty_height,
|
||||
char *term_name, varnumber_T *status_out)
|
||||
{
|
||||
Channel *chan = channel_alloc(kChannelStreamProc);
|
||||
chan->on_stdout = on_stdout;
|
||||
chan->on_stderr = on_stderr;
|
||||
chan->on_exit = on_exit;
|
||||
chan->is_rpc = rpc;
|
||||
|
||||
if (pty) {
|
||||
if (detach) {
|
||||
EMSG2(_(e_invarg2), "terminal/pty job cannot be detached");
|
||||
shell_free_argv(argv);
|
||||
xfree(term_name);
|
||||
channel_destroy_early(chan);
|
||||
*status_out = 0;
|
||||
return NULL;
|
||||
}
|
||||
chan->stream.pty = pty_process_init(&main_loop, chan);
|
||||
if (pty_width > 0) {
|
||||
chan->stream.pty.width = pty_width;
|
||||
}
|
||||
if (pty_height > 0) {
|
||||
chan->stream.pty.height = pty_height;
|
||||
}
|
||||
if (term_name) {
|
||||
chan->stream.pty.term_name = term_name;
|
||||
}
|
||||
} else {
|
||||
chan->stream.uv = libuv_process_init(&main_loop, chan);
|
||||
}
|
||||
|
||||
Process *proc = (Process *)&chan->stream.proc;
|
||||
proc->argv = argv;
|
||||
proc->cb = channel_process_exit_cb;
|
||||
proc->events = chan->events;
|
||||
proc->detach = detach;
|
||||
proc->cwd = cwd;
|
||||
|
||||
char *cmd = xstrdup(proc->argv[0]);
|
||||
bool has_out, has_err;
|
||||
if (proc->type == kProcessTypePty) {
|
||||
has_out = true;
|
||||
has_err = false;
|
||||
} else {
|
||||
has_out = chan->is_rpc || callback_reader_set(chan->on_stdout);
|
||||
has_err = callback_reader_set(chan->on_stderr);
|
||||
}
|
||||
int status = process_spawn(proc, true, has_out, has_err);
|
||||
if (status) {
|
||||
EMSG3(_(e_jobspawn), os_strerror(status), cmd);
|
||||
xfree(cmd);
|
||||
if (proc->type == kProcessTypePty) {
|
||||
xfree(chan->stream.pty.term_name);
|
||||
}
|
||||
channel_destroy_early(chan);
|
||||
*status_out = proc->status;
|
||||
return NULL;
|
||||
}
|
||||
xfree(cmd);
|
||||
|
||||
wstream_init(&proc->in, 0);
|
||||
if (has_out) {
|
||||
rstream_init(&proc->out, 0);
|
||||
}
|
||||
|
||||
if (chan->is_rpc) {
|
||||
// the rpc takes over the in and out streams
|
||||
rpc_start(chan);
|
||||
} else {
|
||||
if (has_out) {
|
||||
callback_reader_start(&chan->on_stdout);
|
||||
rstream_start(&proc->out, on_job_stdout, chan);
|
||||
}
|
||||
}
|
||||
|
||||
if (has_err) {
|
||||
callback_reader_start(&chan->on_stderr);
|
||||
rstream_init(&proc->err, 0);
|
||||
rstream_start(&proc->err, on_job_stderr, chan);
|
||||
}
|
||||
|
||||
*status_out = (varnumber_T)chan->id;
|
||||
return chan;
|
||||
}
|
||||
|
||||
|
||||
uint64_t channel_connect(bool tcp, const char *address,
|
||||
bool rpc, CallbackReader on_output,
|
||||
int timeout, const char **error)
|
||||
{
|
||||
if (!tcp && rpc) {
|
||||
char *path = fix_fname(address);
|
||||
if (server_owns_pipe_address(path)) {
|
||||
// avoid deadlock
|
||||
xfree(path);
|
||||
return channel_create_internal_rpc();
|
||||
}
|
||||
xfree(path);
|
||||
}
|
||||
|
||||
Channel *channel = channel_alloc(kChannelStreamSocket);
|
||||
if (!socket_connect(&main_loop, &channel->stream.socket,
|
||||
tcp, address, timeout, error)) {
|
||||
channel_destroy_early(channel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
channel->stream.socket.internal_close_cb = close_cb;
|
||||
channel->stream.socket.internal_data = channel;
|
||||
wstream_init(&channel->stream.socket, 0);
|
||||
rstream_init(&channel->stream.socket, 0);
|
||||
|
||||
if (rpc) {
|
||||
rpc_start(channel);
|
||||
} else {
|
||||
channel->on_stdout = on_output;
|
||||
callback_reader_start(&channel->on_stdout);
|
||||
rstream_start(&channel->stream.socket, on_socket_output, channel);
|
||||
}
|
||||
|
||||
channel_create_event(channel, NULL);
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// Creates an RPC channel from a tcp/pipe socket connection
|
||||
///
|
||||
/// @param watcher The SocketWatcher ready to accept the connection
|
||||
void channel_from_connection(SocketWatcher *watcher)
|
||||
{
|
||||
Channel *channel = channel_alloc(kChannelStreamSocket);
|
||||
socket_watcher_accept(watcher, &channel->stream.socket);
|
||||
channel->stream.socket.internal_close_cb = close_cb;
|
||||
channel->stream.socket.internal_data = channel;
|
||||
wstream_init(&channel->stream.socket, 0);
|
||||
rstream_init(&channel->stream.socket, 0);
|
||||
rpc_start(channel);
|
||||
channel_create_event(channel, watcher->addr);
|
||||
}
|
||||
|
||||
/// Creates a loopback channel. This is used to avoid deadlock
|
||||
/// when an instance connects to its own named pipe.
|
||||
static uint64_t channel_create_internal_rpc(void)
|
||||
{
|
||||
Channel *channel = channel_alloc(kChannelStreamInternal);
|
||||
rpc_start(channel);
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// Creates an API channel from stdin/stdout. This is used when embedding
|
||||
/// Neovim
|
||||
uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
|
||||
const char **error)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (!headless_mode) {
|
||||
*error = _("can only be opened in headless mode");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (did_stdio) {
|
||||
*error = _("channel was already open");
|
||||
return 0;
|
||||
}
|
||||
did_stdio = true;
|
||||
|
||||
Channel *channel = channel_alloc(kChannelStreamStdio);
|
||||
|
||||
rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0);
|
||||
wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0);
|
||||
|
||||
if (rpc) {
|
||||
rpc_start(channel);
|
||||
} else {
|
||||
channel->on_stdout = on_output;
|
||||
callback_reader_start(&channel->on_stdout);
|
||||
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
|
||||
}
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// @param data will be consumed
|
||||
size_t channel_send(uint64_t id, char *data, size_t len, const char **error)
|
||||
{
|
||||
Channel *chan = find_channel(id);
|
||||
if (!chan) {
|
||||
EMSG(_(e_invchan));
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (chan->streamtype == kChannelStreamStderr) {
|
||||
if (chan->stream.err.closed) {
|
||||
*error = _("Can't send data to closed stream");
|
||||
goto err;
|
||||
}
|
||||
// unbuffered write
|
||||
size_t written = fwrite(data, len, 1, stderr);
|
||||
xfree(data);
|
||||
return len * written;
|
||||
}
|
||||
|
||||
|
||||
Stream *in = channel_instream(chan);
|
||||
if (in->closed) {
|
||||
*error = _("Can't send data to closed stream");
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (chan->is_rpc) {
|
||||
*error = _("Can't send raw data to rpc channel");
|
||||
goto err;
|
||||
}
|
||||
|
||||
WBuffer *buf = wstream_new_buffer(data, len, 1, xfree);
|
||||
return wstream_write(in, buf) ? len : 0;
|
||||
|
||||
err:
|
||||
xfree(data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// NB: mutates buf in place!
|
||||
static list_T *buffer_to_tv_list(char *buf, size_t count)
|
||||
{
|
||||
list_T *ret = tv_list_alloc();
|
||||
char *ptr = buf;
|
||||
size_t remaining = count;
|
||||
size_t off = 0;
|
||||
|
||||
while (off < remaining) {
|
||||
// append the line
|
||||
if (ptr[off] == NL) {
|
||||
tv_list_append_string(ret, ptr, (ssize_t)off);
|
||||
size_t skip = off + 1;
|
||||
ptr += skip;
|
||||
remaining -= skip;
|
||||
off = 0;
|
||||
continue;
|
||||
}
|
||||
if (ptr[off] == NUL) {
|
||||
// Translate NUL to NL
|
||||
ptr[off] = NL;
|
||||
}
|
||||
off++;
|
||||
}
|
||||
tv_list_append_string(ret, ptr, (ssize_t)off);
|
||||
return ret;
|
||||
}
|
||||
|
||||
// vimscript job callbacks must be executed on Nvim main loop
|
||||
static inline void process_channel_event(Channel *chan, Callback *callback,
|
||||
const char *type, char *buf,
|
||||
size_t count, int status)
|
||||
{
|
||||
assert(callback);
|
||||
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
|
||||
event_data->received = NULL;
|
||||
if (buf) {
|
||||
event_data->received = buffer_to_tv_list(buf, count);
|
||||
} else {
|
||||
event_data->status = status;
|
||||
}
|
||||
channel_incref(chan); // Hold on ref to callback
|
||||
event_data->chan = chan;
|
||||
event_data->callback = callback;
|
||||
event_data->type = type;
|
||||
|
||||
multiqueue_put(chan->events, on_channel_event, 1, event_data);
|
||||
}
|
||||
|
||||
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
|
||||
}
|
||||
|
||||
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
|
||||
}
|
||||
|
||||
static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
|
||||
}
|
||||
|
||||
static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
|
||||
}
|
||||
|
||||
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
||||
size_t count, bool eof, CallbackReader *reader,
|
||||
const char *type)
|
||||
{
|
||||
// stub variable, to keep reading consistent with the order of events, only
|
||||
// consider the count parameter.
|
||||
size_t r;
|
||||
char *ptr = rbuffer_read_ptr(buf, &r);
|
||||
|
||||
if (eof) {
|
||||
if (reader->buffered) {
|
||||
if (reader->cb.type != kCallbackNone) {
|
||||
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
|
||||
(size_t)reader->buffer.ga_len, 0);
|
||||
ga_clear(&reader->buffer);
|
||||
} else if (reader->self) {
|
||||
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
|
||||
(size_t)reader->buffer.ga_len);
|
||||
tv_dict_add_list(reader->self, type, strlen(type), data);
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
} else if (reader->cb.type != kCallbackNone) {
|
||||
process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// The order here matters, the terminal must receive the data first because
|
||||
// process_channel_event will modify the read buffer(convert NULs into NLs)
|
||||
if (chan->term) {
|
||||
terminal_receive(chan->term, ptr, count);
|
||||
}
|
||||
|
||||
rbuffer_consumed(buf, count);
|
||||
if (reader->buffered) {
|
||||
ga_concat_len(&reader->buffer, ptr, count);
|
||||
} else if (callback_reader_set(*reader)) {
|
||||
process_channel_event(chan, &reader->cb, type, ptr, count, 0);
|
||||
}
|
||||
}
|
||||
|
||||
static void channel_process_exit_cb(Process *proc, int status, void *data)
|
||||
{
|
||||
Channel *chan = data;
|
||||
if (chan->term) {
|
||||
char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN];
|
||||
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
|
||||
terminal_close(chan->term, msg);
|
||||
}
|
||||
|
||||
// if status is -1 the process did not really exit,
|
||||
// we just closed the handle onto a detached process
|
||||
if (status >= 0) {
|
||||
process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
|
||||
}
|
||||
|
||||
channel_decref(chan);
|
||||
}
|
||||
|
||||
static void on_channel_event(void **args)
|
||||
{
|
||||
ChannelEvent *ev = (ChannelEvent *)args[0];
|
||||
|
||||
typval_T argv[4];
|
||||
|
||||
argv[0].v_type = VAR_NUMBER;
|
||||
argv[0].v_lock = VAR_UNLOCKED;
|
||||
argv[0].vval.v_number = (varnumber_T)ev->chan->id;
|
||||
|
||||
if (ev->received) {
|
||||
argv[1].v_type = VAR_LIST;
|
||||
argv[1].v_lock = VAR_UNLOCKED;
|
||||
argv[1].vval.v_list = ev->received;
|
||||
argv[1].vval.v_list->lv_refcount++;
|
||||
} else {
|
||||
argv[1].v_type = VAR_NUMBER;
|
||||
argv[1].v_lock = VAR_UNLOCKED;
|
||||
argv[1].vval.v_number = ev->status;
|
||||
}
|
||||
|
||||
argv[2].v_type = VAR_STRING;
|
||||
argv[2].v_lock = VAR_UNLOCKED;
|
||||
argv[2].vval.v_string = (uint8_t *)ev->type;
|
||||
|
||||
typval_T rettv = TV_INITIAL_VALUE;
|
||||
callback_call(ev->callback, 3, argv, &rettv);
|
||||
tv_clear(&rettv);
|
||||
channel_decref(ev->chan);
|
||||
xfree(ev);
|
||||
}
|
||||
|
||||
|
||||
/// Open terminal for channel
|
||||
///
|
||||
/// Channel `chan` is assumed to be an open pty channel,
|
||||
/// and curbuf is assumed to be a new, unmodified buffer.
|
||||
void channel_terminal_open(Channel *chan)
|
||||
{
|
||||
TerminalOptions topts;
|
||||
topts.data = chan;
|
||||
topts.width = chan->stream.pty.width;
|
||||
topts.height = chan->stream.pty.height;
|
||||
topts.write_cb = term_write;
|
||||
topts.resize_cb = term_resize;
|
||||
topts.close_cb = term_close;
|
||||
curbuf->b_p_channel = (long)chan->id; // 'channel' option
|
||||
Terminal *term = terminal_open(topts);
|
||||
chan->term = term;
|
||||
channel_incref(chan);
|
||||
}
|
||||
|
||||
static void term_write(char *buf, size_t size, void *data)
|
||||
{
|
||||
Channel *chan = data;
|
||||
if (chan->stream.proc.in.closed) {
|
||||
// If the backing stream was closed abruptly, there may be write events
|
||||
// ahead of the terminal close event. Just ignore the writes.
|
||||
ILOG("write failed: stream is closed");
|
||||
return;
|
||||
}
|
||||
WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
|
||||
wstream_write(&chan->stream.proc.in, wbuf);
|
||||
}
|
||||
|
||||
static void term_resize(uint16_t width, uint16_t height, void *data)
|
||||
{
|
||||
Channel *chan = data;
|
||||
pty_process_resize(&chan->stream.pty, width, height);
|
||||
}
|
||||
|
||||
static inline void term_delayed_free(void **argv)
|
||||
{
|
||||
Channel *chan = argv[0];
|
||||
if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) {
|
||||
multiqueue_put(chan->events, term_delayed_free, 1, chan);
|
||||
return;
|
||||
}
|
||||
|
||||
terminal_destroy(chan->term);
|
||||
chan->term = NULL;
|
||||
channel_decref(chan);
|
||||
}
|
||||
|
||||
static void term_close(void *data)
|
||||
{
|
||||
Channel *chan = data;
|
||||
process_stop(&chan->stream.proc);
|
||||
multiqueue_put(chan->events, term_delayed_free, 1, data);
|
||||
}
|
||||
|
134
src/nvim/channel.h
Normal file
134
src/nvim/channel.h
Normal file
@@ -0,0 +1,134 @@
|
||||
#ifndef NVIM_CHANNEL_H
|
||||
#define NVIM_CHANNEL_H
|
||||
|
||||
#include "nvim/main.h"
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/event/process.h"
|
||||
#include "nvim/os/pty_process.h"
|
||||
#include "nvim/event/libuv_process.h"
|
||||
#include "nvim/eval/typval.h"
|
||||
#include "nvim/msgpack_rpc/channel_defs.h"
|
||||
|
||||
#define CHAN_STDIO 1
|
||||
#define CHAN_STDERR 2
|
||||
|
||||
typedef enum {
|
||||
kChannelStreamProc,
|
||||
kChannelStreamSocket,
|
||||
kChannelStreamStdio,
|
||||
kChannelStreamStderr,
|
||||
kChannelStreamInternal
|
||||
} ChannelStreamType;
|
||||
|
||||
typedef enum {
|
||||
kChannelPartStdin,
|
||||
kChannelPartStdout,
|
||||
kChannelPartStderr,
|
||||
kChannelPartRpc,
|
||||
kChannelPartAll
|
||||
} ChannelPart;
|
||||
|
||||
|
||||
typedef struct {
|
||||
Stream in;
|
||||
Stream out;
|
||||
} StdioPair;
|
||||
|
||||
typedef struct {
|
||||
bool closed;
|
||||
} StderrState;
|
||||
|
||||
typedef struct {
|
||||
Callback cb;
|
||||
dict_T *self;
|
||||
garray_T buffer;
|
||||
bool buffered;
|
||||
} CallbackReader;
|
||||
|
||||
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
|
||||
.self = NULL, \
|
||||
.buffer = GA_EMPTY_INIT_VALUE, \
|
||||
.buffered = false })
|
||||
static inline bool callback_reader_set(CallbackReader reader)
|
||||
{
|
||||
return reader.cb.type != kCallbackNone || reader.self;
|
||||
}
|
||||
|
||||
struct Channel {
|
||||
uint64_t id;
|
||||
size_t refcount;
|
||||
MultiQueue *events;
|
||||
|
||||
ChannelStreamType streamtype;
|
||||
union {
|
||||
Process proc;
|
||||
LibuvProcess uv;
|
||||
PtyProcess pty;
|
||||
Stream socket;
|
||||
StdioPair stdio;
|
||||
StderrState err;
|
||||
} stream;
|
||||
|
||||
bool is_rpc;
|
||||
RpcState rpc;
|
||||
Terminal *term;
|
||||
|
||||
CallbackReader on_stdout;
|
||||
CallbackReader on_stderr;
|
||||
Callback on_exit;
|
||||
};
|
||||
|
||||
EXTERN PMap(uint64_t) *channels;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "channel.h.generated.h"
|
||||
#endif
|
||||
|
||||
/// @returns Channel with the id or NULL if not found
|
||||
static inline Channel *find_channel(uint64_t id)
|
||||
{
|
||||
return pmap_get(uint64_t)(channels, id);
|
||||
}
|
||||
|
||||
static inline Stream *channel_instream(Channel *chan)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
switch (chan->streamtype) {
|
||||
case kChannelStreamProc:
|
||||
return &chan->stream.proc.in;
|
||||
|
||||
case kChannelStreamSocket:
|
||||
return &chan->stream.socket;
|
||||
|
||||
case kChannelStreamStdio:
|
||||
return &chan->stream.stdio.out;
|
||||
|
||||
case kChannelStreamInternal:
|
||||
case kChannelStreamStderr:
|
||||
abort();
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
static inline Stream *channel_outstream(Channel *chan)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
switch (chan->streamtype) {
|
||||
case kChannelStreamProc:
|
||||
return &chan->stream.proc.out;
|
||||
|
||||
case kChannelStreamSocket:
|
||||
return &chan->stream.socket;
|
||||
|
||||
case kChannelStreamStdio:
|
||||
return &chan->stream.stdio.in;
|
||||
|
||||
case kChannelStreamInternal:
|
||||
case kChannelStreamStderr:
|
||||
abort();
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
|
||||
#endif // NVIM_CHANNEL_H
|
822
src/nvim/eval.c
822
src/nvim/eval.c
File diff suppressed because it is too large
Load Diff
@@ -7,6 +7,9 @@
|
||||
#include "nvim/eval/typval.h"
|
||||
#include "nvim/profile.h"
|
||||
#include "nvim/garray.h"
|
||||
#include "nvim/event/rstream.h"
|
||||
#include "nvim/event/wstream.h"
|
||||
#include "nvim/channel.h"
|
||||
|
||||
#define COPYID_INC 2
|
||||
#define COPYID_MASK (~0x1)
|
||||
@@ -53,6 +56,7 @@ typedef enum {
|
||||
VV_DYING,
|
||||
VV_EXCEPTION,
|
||||
VV_THROWPOINT,
|
||||
VV_STDERR,
|
||||
VV_REG,
|
||||
VV_CMDBANG,
|
||||
VV_INSERTMODE,
|
||||
|
@@ -55,6 +55,8 @@ return {
|
||||
call={args={2, 3}},
|
||||
ceil={args=1, func="float_op_wrapper", data="&ceil"},
|
||||
changenr={},
|
||||
chanclose={args={1, 2}},
|
||||
chansend={args=2},
|
||||
char2nr={args={1, 2}},
|
||||
cindent={args=1},
|
||||
clearmatches={},
|
||||
@@ -173,10 +175,10 @@ return {
|
||||
islocked={args=1},
|
||||
id={args=1},
|
||||
items={args=1},
|
||||
jobclose={args={1, 2}},
|
||||
jobclose={args={1, 2}, func="f_chanclose"},
|
||||
jobpid={args=1},
|
||||
jobresize={args=3},
|
||||
jobsend={args=2},
|
||||
jobsend={args=2, func="f_chansend"},
|
||||
jobstart={args={1, 2}},
|
||||
jobstop={args=1},
|
||||
jobwait={args={1, 2}},
|
||||
@@ -273,6 +275,7 @@ return {
|
||||
sockconnect={args={2,3}},
|
||||
sort={args={1, 3}},
|
||||
soundfold={args=1},
|
||||
stdioopen={args=1},
|
||||
spellbadword={args={0, 1}},
|
||||
spellsuggest={args={1, 3}},
|
||||
split={args={1, 3}},
|
||||
|
@@ -374,7 +374,7 @@ void tv_list_append_dict(list_T *const list, dict_T *const dict)
|
||||
/// case string is considered to be usual zero-terminated
|
||||
/// string or NULL “empty” string.
|
||||
void tv_list_append_string(list_T *const l, const char *const str,
|
||||
const ptrdiff_t len)
|
||||
const ssize_t len)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
{
|
||||
if (str == NULL) {
|
||||
@@ -824,7 +824,7 @@ void tv_dict_watcher_add(dict_T *const dict, const char *const key_pattern,
|
||||
/// @param[in] cb2 Second callback to check.
|
||||
///
|
||||
/// @return True if they are equal, false otherwise.
|
||||
bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2)
|
||||
bool tv_callback_equal(const Callback *cb1, const Callback *cb2)
|
||||
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT
|
||||
{
|
||||
if (cb1->type != cb2->type) {
|
||||
@@ -843,10 +843,31 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2)
|
||||
return true;
|
||||
}
|
||||
}
|
||||
assert(false);
|
||||
abort();
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Unref/free callback
|
||||
void callback_free(Callback *callback)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
switch (callback->type) {
|
||||
case kCallbackFuncref: {
|
||||
func_unref(callback->data.funcref);
|
||||
xfree(callback->data.funcref);
|
||||
break;
|
||||
}
|
||||
case kCallbackPartial: {
|
||||
partial_unref(callback->data.partial);
|
||||
break;
|
||||
}
|
||||
case kCallbackNone: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
callback->type = kCallbackNone;
|
||||
}
|
||||
|
||||
/// Remove watcher from a dictionary
|
||||
///
|
||||
/// @param dict Dictionary to remove watcher from.
|
||||
|
@@ -46,22 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc)
|
||||
uvproc->uvstdio[2].flags = UV_IGNORE;
|
||||
uvproc->uv.data = proc;
|
||||
|
||||
if (proc->in) {
|
||||
if (!proc->in.closed) {
|
||||
uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
|
||||
uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t,
|
||||
&proc->in->uv.pipe);
|
||||
&proc->in.uv.pipe);
|
||||
}
|
||||
|
||||
if (proc->out) {
|
||||
if (!proc->out.closed) {
|
||||
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
|
||||
uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t,
|
||||
&proc->out->uv.pipe);
|
||||
&proc->out.uv.pipe);
|
||||
}
|
||||
|
||||
if (proc->err) {
|
||||
if (!proc->err.closed) {
|
||||
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
|
||||
uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t,
|
||||
&proc->err->uv.pipe);
|
||||
&proc->err.uv.pipe);
|
||||
}
|
||||
|
||||
int status;
|
||||
|
@@ -25,28 +25,28 @@
|
||||
// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
|
||||
#define KILL_TIMEOUT_MS 2000
|
||||
|
||||
#define CLOSE_PROC_STREAM(proc, stream) \
|
||||
do { \
|
||||
if (proc->stream && !proc->stream->closed) { \
|
||||
stream_close(proc->stream, NULL, NULL); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
static bool process_is_tearing_down = false;
|
||||
|
||||
/// @returns zero on success, or negative error code
|
||||
int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
int process_spawn(Process *proc, bool in, bool out, bool err)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (proc->in) {
|
||||
uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
|
||||
if (in) {
|
||||
uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0);
|
||||
} else {
|
||||
proc->in.closed = true;
|
||||
}
|
||||
|
||||
if (proc->out) {
|
||||
uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
|
||||
if (out) {
|
||||
uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0);
|
||||
} else {
|
||||
proc->out.closed = true;
|
||||
}
|
||||
|
||||
if (proc->err) {
|
||||
uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
|
||||
if (err) {
|
||||
uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0);
|
||||
} else {
|
||||
proc->err.closed = true;
|
||||
}
|
||||
|
||||
int status;
|
||||
@@ -62,14 +62,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
}
|
||||
|
||||
if (status) {
|
||||
if (proc->in) {
|
||||
uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
|
||||
if (in) {
|
||||
uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
|
||||
}
|
||||
if (proc->out) {
|
||||
uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
|
||||
if (out) {
|
||||
uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL);
|
||||
}
|
||||
if (proc->err) {
|
||||
uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
|
||||
if (err) {
|
||||
uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL);
|
||||
}
|
||||
|
||||
if (proc->type == kProcessTypeUv) {
|
||||
@@ -82,30 +82,27 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
return status;
|
||||
}
|
||||
|
||||
if (proc->in) {
|
||||
stream_init(NULL, proc->in, -1,
|
||||
STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe));
|
||||
proc->in->events = proc->events;
|
||||
proc->in->internal_data = proc;
|
||||
proc->in->internal_close_cb = on_process_stream_close;
|
||||
if (in) {
|
||||
stream_init(NULL, &proc->in, -1,
|
||||
STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe));
|
||||
proc->in.internal_data = proc;
|
||||
proc->in.internal_close_cb = on_process_stream_close;
|
||||
proc->refcount++;
|
||||
}
|
||||
|
||||
if (proc->out) {
|
||||
stream_init(NULL, proc->out, -1,
|
||||
STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe));
|
||||
proc->out->events = proc->events;
|
||||
proc->out->internal_data = proc;
|
||||
proc->out->internal_close_cb = on_process_stream_close;
|
||||
if (out) {
|
||||
stream_init(NULL, &proc->out, -1,
|
||||
STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe));
|
||||
proc->out.internal_data = proc;
|
||||
proc->out.internal_close_cb = on_process_stream_close;
|
||||
proc->refcount++;
|
||||
}
|
||||
|
||||
if (proc->err) {
|
||||
stream_init(NULL, proc->err, -1,
|
||||
STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe));
|
||||
proc->err->events = proc->events;
|
||||
proc->err->internal_data = proc;
|
||||
proc->err->internal_close_cb = on_process_stream_close;
|
||||
if (err) {
|
||||
stream_init(NULL, &proc->err, -1,
|
||||
STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe));
|
||||
proc->err.internal_data = proc;
|
||||
proc->err.internal_close_cb = on_process_stream_close;
|
||||
proc->refcount++;
|
||||
}
|
||||
|
||||
@@ -136,27 +133,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
|
||||
pty_process_teardown(loop);
|
||||
}
|
||||
|
||||
// Wrappers around `stream_close` that protect against double-closing.
|
||||
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
process_close_in(proc);
|
||||
process_close_out(proc);
|
||||
process_close_err(proc);
|
||||
}
|
||||
|
||||
void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
CLOSE_PROC_STREAM(proc, in);
|
||||
}
|
||||
|
||||
void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
CLOSE_PROC_STREAM(proc, out);
|
||||
}
|
||||
|
||||
void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
CLOSE_PROC_STREAM(proc, err);
|
||||
stream_may_close(&proc->in);
|
||||
stream_may_close(&proc->out);
|
||||
stream_may_close(&proc->err);
|
||||
}
|
||||
|
||||
/// Synchronously wait for a process to finish
|
||||
@@ -164,16 +145,15 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
/// @param process Process instance
|
||||
/// @param ms Time in milliseconds to wait for the process.
|
||||
/// 0 for no wait. -1 to wait until the process quits.
|
||||
/// @return Exit code of the process.
|
||||
/// @return Exit code of the process. proc->status will have the same value.
|
||||
/// -1 if the timeout expired while the process is still running.
|
||||
/// -2 if the user interruped the wait.
|
||||
int process_wait(Process *proc, int ms, MultiQueue *events)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
{
|
||||
int status = -1; // default
|
||||
bool interrupted = false;
|
||||
if (!proc->refcount) {
|
||||
status = proc->status;
|
||||
int status = proc->status;
|
||||
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
|
||||
return status;
|
||||
}
|
||||
@@ -209,7 +189,9 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
|
||||
if (proc->refcount == 1) {
|
||||
// Job exited, collect status and manually invoke close_cb to free the job
|
||||
// resources
|
||||
status = interrupted ? -2 : proc->status;
|
||||
if (interrupted) {
|
||||
proc->status = -2;
|
||||
}
|
||||
decref(proc);
|
||||
if (events) {
|
||||
// the decref call created an exit event, process it now
|
||||
@@ -219,7 +201,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
|
||||
proc->refcount--;
|
||||
}
|
||||
|
||||
return status;
|
||||
return proc->status;
|
||||
}
|
||||
|
||||
/// Ask a process to terminate and eventually kill if it doesn't respond
|
||||
@@ -233,8 +215,9 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
|
||||
switch (proc->type) {
|
||||
case kProcessTypeUv:
|
||||
// Close the process's stdin. If the process doesn't close its own
|
||||
// stdout/stderr, they will be closed when it exits (voluntarily or not).
|
||||
process_close_in(proc);
|
||||
// stdout/stderr, they will be closed when it exits(possibly due to being
|
||||
// terminated after a timeout)
|
||||
stream_may_close(&proc->in);
|
||||
ILOG("Sending SIGTERM to pid %d", proc->pid);
|
||||
uv_kill(proc->pid, SIGTERM);
|
||||
break;
|
||||
@@ -375,15 +358,15 @@ static void flush_stream(Process *proc, Stream *stream)
|
||||
|
||||
// Poll for data and process the generated events.
|
||||
loop_poll_events(proc->loop, 0);
|
||||
if (proc->events) {
|
||||
multiqueue_process_events(proc->events);
|
||||
if (stream->events) {
|
||||
multiqueue_process_events(stream->events);
|
||||
}
|
||||
|
||||
// Stream can be closed if it is empty.
|
||||
if (num_bytes == stream->num_bytes) {
|
||||
if (stream->read_cb) {
|
||||
if (stream->read_cb && !stream->did_eof) {
|
||||
// Stream callback could miss EOF handling if a child keeps the stream
|
||||
// open.
|
||||
// open. But only send EOF if we haven't already.
|
||||
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
|
||||
}
|
||||
break;
|
||||
@@ -395,8 +378,8 @@ static void process_close_handles(void **argv)
|
||||
{
|
||||
Process *proc = argv[0];
|
||||
|
||||
flush_stream(proc, proc->out);
|
||||
flush_stream(proc, proc->err);
|
||||
flush_stream(proc, &proc->out);
|
||||
flush_stream(proc, &proc->err);
|
||||
|
||||
process_close_streams(proc);
|
||||
process_close(proc);
|
||||
|
@@ -23,13 +23,14 @@ struct process {
|
||||
uint64_t stopped_time;
|
||||
const char *cwd;
|
||||
char **argv;
|
||||
Stream *in, *out, *err;
|
||||
Stream in, out, err;
|
||||
process_exit_cb cb;
|
||||
internal_process_cb internal_exit_cb, internal_close_cb;
|
||||
bool closed, detach;
|
||||
MultiQueue *events;
|
||||
};
|
||||
|
||||
|
||||
static inline Process process_init(Loop *loop, ProcessType type, void *data)
|
||||
{
|
||||
return (Process) {
|
||||
@@ -38,14 +39,14 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
|
||||
.loop = loop,
|
||||
.events = NULL,
|
||||
.pid = 0,
|
||||
.status = 0,
|
||||
.status = -1,
|
||||
.refcount = 0,
|
||||
.stopped_time = 0,
|
||||
.cwd = NULL,
|
||||
.argv = NULL,
|
||||
.in = NULL,
|
||||
.out = NULL,
|
||||
.err = NULL,
|
||||
.in = { .closed = false },
|
||||
.out = { .closed = false },
|
||||
.err = { .closed = false },
|
||||
.cb = NULL,
|
||||
.closed = false,
|
||||
.internal_close_cb = NULL,
|
||||
@@ -54,6 +55,11 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
|
||||
};
|
||||
}
|
||||
|
||||
static inline bool process_is_stopped(Process *proc)
|
||||
{
|
||||
return proc->stopped_time != 0;
|
||||
}
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "event/process.h.generated.h"
|
||||
#endif
|
||||
|
@@ -105,20 +105,20 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
|
||||
{
|
||||
Stream *stream = uvstream->data;
|
||||
|
||||
if (cnt > 0) {
|
||||
stream->num_bytes += (size_t)cnt;
|
||||
}
|
||||
|
||||
if (cnt <= 0) {
|
||||
if (cnt != UV_ENOBUFS
|
||||
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
|
||||
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
|
||||
//
|
||||
// We don't need to do anything with the RBuffer because the next call
|
||||
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
|
||||
// won't be called)
|
||||
&& cnt != 0) {
|
||||
DLOG("closing Stream: %p: %s (%s)", stream,
|
||||
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
|
||||
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
|
||||
//
|
||||
// We don't need to do anything with the RBuffer because the next call
|
||||
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
|
||||
// won't be called)
|
||||
if (cnt == UV_ENOBUFS || cnt == 0) {
|
||||
return;
|
||||
} else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
|
||||
// The TTY driver might signal TTY without closing the stream
|
||||
invoke_read_cb(stream, 0, true);
|
||||
} else {
|
||||
DLOG("Closing Stream (%p): %s (%s)", stream,
|
||||
uv_err_name((int)cnt), os_strerror((int)cnt));
|
||||
// Read error or EOF, either way stop the stream and invoke the callback
|
||||
// with eof == true
|
||||
@@ -130,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
|
||||
|
||||
// at this point we're sure that cnt is positive, no error occurred
|
||||
size_t nread = (size_t)cnt;
|
||||
stream->num_bytes += nread;
|
||||
// Data was already written, so all we need is to update 'wpos' to reflect
|
||||
// the space actually used in the buffer.
|
||||
rbuffer_produced(stream->buffer, nread);
|
||||
@@ -187,6 +188,7 @@ static void read_event(void **argv)
|
||||
if (stream->read_cb) {
|
||||
size_t count = (uintptr_t)argv[1];
|
||||
bool eof = (uintptr_t)argv[2];
|
||||
stream->did_eof = eof;
|
||||
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
|
||||
}
|
||||
stream->pending_reqs--;
|
||||
|
@@ -92,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
|
||||
}
|
||||
}
|
||||
|
||||
void stream_may_close(Stream *stream)
|
||||
{
|
||||
if (!stream->closed) {
|
||||
stream_close(stream, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void stream_close_handle(Stream *stream)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
|
@@ -14,10 +14,7 @@ typedef struct stream Stream;
|
||||
///
|
||||
/// @param stream The Stream instance
|
||||
/// @param rbuffer The associated RBuffer instance
|
||||
/// @param count Number of bytes to read. This must be respected if keeping
|
||||
/// the order of events is a requirement. This is because events
|
||||
/// may be queued and only processed later when more data is copied
|
||||
/// into to the buffer, so one read may starve another.
|
||||
/// @param count Number of bytes that was read.
|
||||
/// @param data User-defined data
|
||||
/// @param eof If the stream reached EOF.
|
||||
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
|
||||
@@ -33,6 +30,8 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
|
||||
typedef void (*stream_close_cb)(Stream *stream, void *data);
|
||||
|
||||
struct stream {
|
||||
bool closed;
|
||||
bool did_eof;
|
||||
union {
|
||||
uv_pipe_t pipe;
|
||||
uv_tcp_t tcp;
|
||||
@@ -52,7 +51,6 @@ struct stream {
|
||||
size_t maxmem;
|
||||
size_t pending_reqs;
|
||||
size_t num_bytes;
|
||||
bool closed;
|
||||
MultiQueue *events;
|
||||
};
|
||||
|
||||
|
@@ -1074,11 +1074,17 @@ EXTERN char_u e_invexpr2[] INIT(= N_("E15: Invalid expression: %s"));
|
||||
EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range"));
|
||||
EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command"));
|
||||
EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory"));
|
||||
EXTERN char_u e_invjob[] INIT(= N_("E900: Invalid job id"));
|
||||
EXTERN char_u e_invchan[] INIT(= N_("E900: Invalid channel id"));
|
||||
EXTERN char_u e_invchanjob[] INIT(= N_("E900: Invalid channel id: not a job"));
|
||||
EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full"));
|
||||
EXTERN char_u e_jobspawn[] INIT(= N_(
|
||||
"E903: Process failed to start: %s: \"%s\""));
|
||||
EXTERN char_u e_jobnotpty[] INIT(= N_("E904: Job is not connected to a pty"));
|
||||
"E903: Process failed to start: %s: \"%s\""));
|
||||
EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty"));
|
||||
EXTERN char_u e_stdiochan2[] INIT(= N_(
|
||||
"E905: Couldn't open stdio channel: %s"));
|
||||
EXTERN char_u e_invstream[] INIT(= N_("E906: invalid stream for channel"));
|
||||
EXTERN char_u e_invstreamrpc[] INIT(= N_(
|
||||
"E906: invalid stream for rpc channel, use 'rpc'"));
|
||||
EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\""));
|
||||
EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s"));
|
||||
EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number"));
|
||||
@@ -1189,9 +1195,9 @@ EXTERN char *ignoredp;
|
||||
|
||||
// If a msgpack-rpc channel should be started over stdin/stdout
|
||||
EXTERN bool embedded_mode INIT(= false);
|
||||
|
||||
/// next free id for a job or rpc channel
|
||||
EXTERN uint64_t next_chan_id INIT(= 1);
|
||||
// Dont try to start an user interface
|
||||
// or read/write to stdio (unless embedding)
|
||||
EXTERN bool headless_mode INIT(= false);
|
||||
|
||||
/// Used to track the status of external functions.
|
||||
/// Currently only used for iconv().
|
||||
|
@@ -778,7 +778,6 @@ err_closing:
|
||||
if (execl("/bin/sh", "sh", "-c", cmd, (char *)NULL) == -1)
|
||||
PERROR(_("cs_create_connection exec failed"));
|
||||
|
||||
stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
|
||||
exit(127);
|
||||
/* NOTREACHED */
|
||||
default: /* parent. */
|
||||
|
@@ -73,6 +73,9 @@
|
||||
#include "nvim/api/private/helpers.h"
|
||||
#include "nvim/api/private/handle.h"
|
||||
#include "nvim/api/private/dispatch.h"
|
||||
#ifndef WIN32
|
||||
# include "nvim/os/pty_process_unix.h"
|
||||
#endif
|
||||
|
||||
/* Maximum number of commands from + or -c arguments. */
|
||||
#define MAX_ARG_CMDS 10
|
||||
@@ -103,7 +106,6 @@ typedef struct {
|
||||
bool input_isatty; // stdin is a terminal
|
||||
bool output_isatty; // stdout is a terminal
|
||||
bool err_isatty; // stderr is a terminal
|
||||
bool headless; // Do not start the builtin UI.
|
||||
int no_swap_file; // "-n" argument used
|
||||
int use_debug_break_level;
|
||||
int window_count; /* number of windows to use */
|
||||
@@ -298,8 +300,8 @@ int main(int argc, char **argv)
|
||||
assert(p_ch >= 0 && Rows >= p_ch && Rows - p_ch <= INT_MAX);
|
||||
cmdline_row = (int)(Rows - p_ch);
|
||||
msg_row = cmdline_row;
|
||||
screenalloc(false); /* allocate screen buffers */
|
||||
set_init_2(params.headless);
|
||||
screenalloc(false); // allocate screen buffers
|
||||
set_init_2(headless_mode);
|
||||
TIME_MSG("inits 2");
|
||||
|
||||
msg_scroll = TRUE;
|
||||
@@ -311,8 +313,9 @@ int main(int argc, char **argv)
|
||||
/* Set the break level after the terminal is initialized. */
|
||||
debug_break_level = params.use_debug_break_level;
|
||||
|
||||
bool reading_input = !params.headless && (params.input_isatty
|
||||
|| params.output_isatty || params.err_isatty);
|
||||
bool reading_input = !headless_mode
|
||||
&& (params.input_isatty || params.output_isatty
|
||||
|| params.err_isatty);
|
||||
|
||||
if (reading_input) {
|
||||
// One of the startup commands (arguments, sourced scripts or plugins) may
|
||||
@@ -448,7 +451,7 @@ int main(int argc, char **argv)
|
||||
wait_return(TRUE);
|
||||
}
|
||||
|
||||
if (!params.headless) {
|
||||
if (!headless_mode) {
|
||||
// Stop reading from input stream, the UI layer will take over now.
|
||||
input_stop();
|
||||
ui_builtin_start();
|
||||
@@ -809,11 +812,14 @@ static void command_line_scan(mparm_T *parmp)
|
||||
}
|
||||
mch_exit(0);
|
||||
} else if (STRICMP(argv[0] + argv_idx, "headless") == 0) {
|
||||
parmp->headless = true;
|
||||
headless_mode = true;
|
||||
} else if (STRICMP(argv[0] + argv_idx, "embed") == 0) {
|
||||
embedded_mode = true;
|
||||
parmp->headless = true;
|
||||
channel_from_stdio();
|
||||
headless_mode = true;
|
||||
const char *err;
|
||||
if (!channel_from_stdio(true, CALLBACK_READER_INIT, &err)) {
|
||||
abort();
|
||||
}
|
||||
} else if (STRNICMP(argv[0] + argv_idx, "literal", 7) == 0) {
|
||||
#if !defined(UNIX)
|
||||
parmp->literal = TRUE;
|
||||
@@ -1216,7 +1222,6 @@ static void init_params(mparm_T *paramp, int argc, char **argv)
|
||||
memset(paramp, 0, sizeof(*paramp));
|
||||
paramp->argc = argc;
|
||||
paramp->argv = argv;
|
||||
paramp->headless = false;
|
||||
paramp->want_full_screen = true;
|
||||
paramp->use_debug_break_level = -1;
|
||||
paramp->window_count = -1;
|
||||
@@ -1245,6 +1250,14 @@ static void check_and_set_isatty(mparm_T *paramp)
|
||||
stdout_isatty
|
||||
= paramp->output_isatty = os_isatty(fileno(stdout));
|
||||
paramp->err_isatty = os_isatty(fileno(stderr));
|
||||
int tty_fd = paramp->input_isatty
|
||||
? OS_STDIN_FILENO
|
||||
: (paramp->output_isatty
|
||||
? OS_STDOUT_FILENO
|
||||
: (paramp->err_isatty ? OS_STDERR_FILENO : -1));
|
||||
#ifndef WIN32
|
||||
pty_process_save_termios(tty_fd);
|
||||
#endif
|
||||
TIME_MSG("window checked");
|
||||
}
|
||||
|
||||
@@ -1387,7 +1400,7 @@ static void handle_tag(char_u *tagname)
|
||||
// When starting in Ex mode and commands come from a file, set Silent mode.
|
||||
static void check_tty(mparm_T *parmp)
|
||||
{
|
||||
if (parmp->headless) {
|
||||
if (headless_mode) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@@ -2622,7 +2622,10 @@ void preserve_exit(void)
|
||||
|
||||
// Prevent repeated calls into this method.
|
||||
if (really_exiting) {
|
||||
stream_set_blocking(input_global_fd(), true); //normalize stream (#2598)
|
||||
if (input_global_fd() >= 0) {
|
||||
// normalize stream (#2598)
|
||||
stream_set_blocking(input_global_fd(), true);
|
||||
}
|
||||
exit(2);
|
||||
}
|
||||
|
||||
|
@@ -11,8 +11,8 @@
|
||||
#include "nvim/api/private/helpers.h"
|
||||
#include "nvim/api/vim.h"
|
||||
#include "nvim/api/ui.h"
|
||||
#include "nvim/channel.h"
|
||||
#include "nvim/msgpack_rpc/channel.h"
|
||||
#include "nvim/msgpack_rpc/server.h"
|
||||
#include "nvim/event/loop.h"
|
||||
#include "nvim/event/libuv_process.h"
|
||||
#include "nvim/event/rstream.h"
|
||||
@@ -29,58 +29,14 @@
|
||||
#include "nvim/map.h"
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/misc1.h"
|
||||
#include "nvim/path.h"
|
||||
#include "nvim/lib/kvec.h"
|
||||
#include "nvim/os/input.h"
|
||||
|
||||
#define CHANNEL_BUFFER_SIZE 0xffff
|
||||
|
||||
#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
|
||||
#define log_client_msg(...)
|
||||
#define log_server_msg(...)
|
||||
#endif
|
||||
|
||||
typedef enum {
|
||||
kChannelTypeSocket,
|
||||
kChannelTypeProc,
|
||||
kChannelTypeStdio,
|
||||
kChannelTypeInternal
|
||||
} ChannelType;
|
||||
|
||||
typedef struct {
|
||||
uint64_t request_id;
|
||||
bool returned, errored;
|
||||
Object result;
|
||||
} ChannelCallFrame;
|
||||
|
||||
typedef struct {
|
||||
uint64_t id;
|
||||
size_t refcount;
|
||||
PMap(cstr_t) *subscribed_events;
|
||||
bool closed;
|
||||
ChannelType type;
|
||||
msgpack_unpacker *unpacker;
|
||||
union {
|
||||
Stream stream; // bidirectional (socket)
|
||||
Process *proc;
|
||||
struct {
|
||||
Stream in;
|
||||
Stream out;
|
||||
} std;
|
||||
} data;
|
||||
uint64_t next_request_id;
|
||||
kvec_t(ChannelCallFrame *) call_stack;
|
||||
MultiQueue *events;
|
||||
} Channel;
|
||||
|
||||
typedef struct {
|
||||
Channel *channel;
|
||||
MsgpackRpcRequestHandler handler;
|
||||
Array args;
|
||||
uint64_t request_id;
|
||||
} RequestEvent;
|
||||
|
||||
static PMap(uint64_t) *channels = NULL;
|
||||
static PMap(cstr_t) *event_strings = NULL;
|
||||
static msgpack_sbuffer out_buffer;
|
||||
|
||||
@@ -88,102 +44,44 @@ static msgpack_sbuffer out_buffer;
|
||||
# include "msgpack_rpc/channel.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// Initializes the module
|
||||
void channel_init(void)
|
||||
void rpc_init(void)
|
||||
{
|
||||
ch_before_blocking_events = multiqueue_new_child(main_loop.events);
|
||||
channels = pmap_new(uint64_t)();
|
||||
event_strings = pmap_new(cstr_t)();
|
||||
msgpack_sbuffer_init(&out_buffer);
|
||||
remote_ui_init();
|
||||
}
|
||||
|
||||
/// Teardown the module
|
||||
void channel_teardown(void)
|
||||
|
||||
void rpc_start(Channel *channel)
|
||||
{
|
||||
if (!channels) {
|
||||
return;
|
||||
channel_incref(channel);
|
||||
channel->is_rpc = true;
|
||||
RpcState *rpc = &channel->rpc;
|
||||
rpc->closed = false;
|
||||
rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||
rpc->subscribed_events = pmap_new(cstr_t)();
|
||||
rpc->next_request_id = 1;
|
||||
kv_init(rpc->call_stack);
|
||||
|
||||
if (channel->streamtype != kChannelStreamInternal) {
|
||||
Stream *out = channel_outstream(channel);
|
||||
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
|
||||
Stream *in = channel_instream(channel);
|
||||
DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out);
|
||||
#endif
|
||||
|
||||
rstream_start(out, receive_msgpack, channel);
|
||||
}
|
||||
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
close_channel(channel);
|
||||
});
|
||||
}
|
||||
|
||||
/// Creates an API channel by starting a process and connecting to its
|
||||
/// stdin/stdout. stderr is handled by the job infrastructure.
|
||||
///
|
||||
/// @param proc process object
|
||||
/// @param id (optional) channel id
|
||||
/// @param source description of source function, rplugin name, TCP addr, etc
|
||||
///
|
||||
/// @return Channel id (> 0), on success. 0, on error.
|
||||
uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
|
||||
|
||||
static Channel *find_rpc_channel(uint64_t id)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeProc, id, proc->events,
|
||||
source);
|
||||
incref(channel); // process channels are only closed by the exit_cb
|
||||
channel->data.proc = proc;
|
||||
|
||||
wstream_init(proc->in, 0);
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, receive_msgpack, channel);
|
||||
|
||||
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
|
||||
proc->out);
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// Creates an API channel from a tcp/pipe socket connection
|
||||
///
|
||||
/// @param watcher The SocketWatcher ready to accept the connection
|
||||
void channel_from_connection(SocketWatcher *watcher)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL,
|
||||
watcher->addr);
|
||||
socket_watcher_accept(watcher, &channel->data.stream);
|
||||
incref(channel); // close channel only after the stream is closed
|
||||
channel->data.stream.internal_close_cb = close_cb;
|
||||
channel->data.stream.internal_data = channel;
|
||||
wstream_init(&channel->data.stream, 0);
|
||||
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.stream, receive_msgpack, channel);
|
||||
|
||||
DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
|
||||
&channel->data.stream);
|
||||
}
|
||||
|
||||
/// @param source description of source function, rplugin name, TCP addr, etc
|
||||
uint64_t channel_connect(bool tcp, const char *address, int timeout,
|
||||
char *source, const char **error)
|
||||
{
|
||||
if (!tcp) {
|
||||
char *path = fix_fname(address);
|
||||
if (server_owns_pipe_address(path)) {
|
||||
// avoid deadlock
|
||||
xfree(path);
|
||||
return channel_create_internal();
|
||||
}
|
||||
xfree(path);
|
||||
Channel *chan = find_channel(id);
|
||||
if (!chan || !chan->is_rpc || chan->rpc.closed) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source);
|
||||
if (!socket_connect(&main_loop, &channel->data.stream,
|
||||
tcp, address, timeout, error)) {
|
||||
decref(channel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
incref(channel); // close channel only after the stream is closed
|
||||
channel->data.stream.internal_close_cb = close_cb;
|
||||
channel->data.stream.internal_data = channel;
|
||||
wstream_init(&channel->data.stream, 0);
|
||||
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.stream, receive_msgpack, channel);
|
||||
return channel->id;
|
||||
return chan;
|
||||
}
|
||||
|
||||
/// Publishes an event to a channel.
|
||||
@@ -192,12 +90,11 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout,
|
||||
/// @param name Event name (application-defined)
|
||||
/// @param args Array of event arguments
|
||||
/// @return True if the event was sent successfully, false otherwise.
|
||||
bool channel_send_event(uint64_t id, const char *name, Array args)
|
||||
bool rpc_send_event(uint64_t id, const char *name, Array args)
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (id && (!(channel = pmap_get(uint64_t)(channels, id))
|
||||
|| channel->closed)) {
|
||||
if (id && (!(channel = find_rpc_channel(id)))) {
|
||||
api_free_array(args);
|
||||
return false;
|
||||
}
|
||||
@@ -218,29 +115,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
|
||||
/// @param args Array with method arguments
|
||||
/// @param[out] error True if the return value is an error
|
||||
/// @return Whatever the remote method returned
|
||||
Object channel_send_call(uint64_t id,
|
||||
const char *method_name,
|
||||
Array args,
|
||||
Error *err)
|
||||
Object rpc_send_call(uint64_t id,
|
||||
const char *method_name,
|
||||
Array args,
|
||||
Error *err)
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
|
||||
api_free_array(args);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
incref(channel);
|
||||
uint64_t request_id = channel->next_request_id++;
|
||||
channel_incref(channel);
|
||||
RpcState *rpc = &channel->rpc;
|
||||
uint64_t request_id = rpc->next_request_id++;
|
||||
// Send the msgpack-rpc request
|
||||
send_request(channel, request_id, method_name, args);
|
||||
|
||||
// Push the frame
|
||||
ChannelCallFrame frame = { request_id, false, false, NIL };
|
||||
kv_push(channel->call_stack, &frame);
|
||||
kv_push(rpc->call_stack, &frame);
|
||||
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
|
||||
(void)kv_pop(channel->call_stack);
|
||||
(void)kv_pop(rpc->call_stack);
|
||||
|
||||
if (frame.errored) {
|
||||
if (frame.result.type == kObjectTypeString) {
|
||||
@@ -265,7 +163,7 @@ Object channel_send_call(uint64_t id,
|
||||
api_free_object(frame.result);
|
||||
}
|
||||
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
|
||||
return frame.errored ? NIL : frame.result;
|
||||
}
|
||||
@@ -274,11 +172,11 @@ Object channel_send_call(uint64_t id,
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void channel_subscribe(uint64_t id, char *event)
|
||||
void rpc_subscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
abort();
|
||||
}
|
||||
|
||||
@@ -289,81 +187,32 @@ void channel_subscribe(uint64_t id, char *event)
|
||||
pmap_put(cstr_t)(event_strings, event_string, event_string);
|
||||
}
|
||||
|
||||
pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
|
||||
pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
|
||||
}
|
||||
|
||||
/// Unsubscribes to event broadcasts
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void channel_unsubscribe(uint64_t id, char *event)
|
||||
void rpc_unsubscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
abort();
|
||||
}
|
||||
|
||||
unsubscribe(channel, event);
|
||||
}
|
||||
|
||||
/// Closes a channel
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @return true if successful, false otherwise
|
||||
bool channel_close(uint64_t id)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
close_channel(channel);
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Creates an API channel from stdin/stdout. Used to embed Nvim.
|
||||
void channel_from_stdio(void)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL);
|
||||
incref(channel); // stdio channels are only closed on exit
|
||||
// read stream
|
||||
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.std.in, receive_msgpack, channel);
|
||||
// write stream
|
||||
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
|
||||
|
||||
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
|
||||
&channel->data.std.in, &channel->data.std.out);
|
||||
}
|
||||
|
||||
/// Creates a loopback channel. This is used to avoid deadlock
|
||||
/// when an instance connects to its own named pipe.
|
||||
uint64_t channel_create_internal(void)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL);
|
||||
incref(channel); // internal channel lives until process exit
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
void channel_process_exit(uint64_t id, int status)
|
||||
{
|
||||
Channel *channel = pmap_get(uint64_t)(channels, id);
|
||||
|
||||
channel->closed = true;
|
||||
decref(channel);
|
||||
}
|
||||
|
||||
// rstream.c:read_event() invokes this as stream->read_cb().
|
||||
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *channel = data;
|
||||
incref(channel);
|
||||
channel_incref(channel);
|
||||
|
||||
if (eof) {
|
||||
close_channel(channel);
|
||||
channel_close(channel->id, kChannelPartRpc, NULL);
|
||||
char buf[256];
|
||||
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
|
||||
channel->id);
|
||||
@@ -371,30 +220,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
|
||||
goto end;
|
||||
}
|
||||
|
||||
if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
|
||||
|| (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
|
||||
char buf[256];
|
||||
snprintf(buf, sizeof(buf),
|
||||
"ch %" PRIu64 ": stream closed unexpectedly. "
|
||||
"closing channel",
|
||||
channel->id);
|
||||
call_set_error(channel, buf, WARN_LOG_LEVEL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
size_t count = rbuffer_size(rbuf);
|
||||
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
|
||||
DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
|
||||
channel->id, count, stream);
|
||||
|
||||
// Feed the unpacker with data
|
||||
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
|
||||
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
|
||||
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
|
||||
msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
|
||||
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
|
||||
msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
|
||||
|
||||
parse_msgpack(channel);
|
||||
|
||||
end:
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
}
|
||||
|
||||
static void parse_msgpack(Channel *channel)
|
||||
@@ -404,8 +242,8 @@ static void parse_msgpack(Channel *channel)
|
||||
msgpack_unpack_return result;
|
||||
|
||||
// Deserialize everything we can.
|
||||
while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
|
||||
MSGPACK_UNPACK_SUCCESS) {
|
||||
while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
|
||||
MSGPACK_UNPACK_SUCCESS) {
|
||||
bool is_response = is_rpc_response(&unpacked.data);
|
||||
log_client_msg(channel->id, !is_response, unpacked.data);
|
||||
|
||||
@@ -431,7 +269,7 @@ static void parse_msgpack(Channel *channel)
|
||||
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
|
||||
mch_errmsg(e_outofmem);
|
||||
mch_errmsg("\n");
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
preserve_exit();
|
||||
}
|
||||
|
||||
@@ -496,7 +334,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
|
||||
evdata->handler = handler;
|
||||
evdata->args = args;
|
||||
evdata->request_id = request_id;
|
||||
incref(channel);
|
||||
channel_incref(channel);
|
||||
if (handler.async) {
|
||||
bool is_get_mode = handler.fn == handle_nvim_get_mode;
|
||||
|
||||
@@ -534,66 +372,30 @@ static void on_request_event(void **argv)
|
||||
api_free_object(result);
|
||||
}
|
||||
api_free_array(args);
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
xfree(e);
|
||||
api_clear_error(&error);
|
||||
}
|
||||
|
||||
/// Returns the Stream that a Channel writes to.
|
||||
static Stream *chan_wstream(Channel *chan)
|
||||
{
|
||||
switch (chan->type) {
|
||||
case kChannelTypeSocket:
|
||||
return &chan->data.stream;
|
||||
case kChannelTypeProc:
|
||||
return chan->data.proc->in;
|
||||
case kChannelTypeStdio:
|
||||
return &chan->data.std.out;
|
||||
case kChannelTypeInternal:
|
||||
return NULL;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
/// Returns the Stream that a Channel reads from.
|
||||
static Stream *chan_rstream(Channel *chan)
|
||||
{
|
||||
switch (chan->type) {
|
||||
case kChannelTypeSocket:
|
||||
return &chan->data.stream;
|
||||
case kChannelTypeProc:
|
||||
return chan->data.proc->out;
|
||||
case kChannelTypeStdio:
|
||||
return &chan->data.std.in;
|
||||
case kChannelTypeInternal:
|
||||
return NULL;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
|
||||
static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
{
|
||||
bool success = false;
|
||||
bool success;
|
||||
|
||||
if (channel->closed) {
|
||||
if (channel->rpc.closed) {
|
||||
wstream_release_wbuffer(buffer);
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (channel->type) {
|
||||
case kChannelTypeSocket:
|
||||
case kChannelTypeProc:
|
||||
case kChannelTypeStdio:
|
||||
success = wstream_write(chan_wstream(channel), buffer);
|
||||
break;
|
||||
case kChannelTypeInternal:
|
||||
incref(channel);
|
||||
CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
|
||||
success = true;
|
||||
break;
|
||||
if (channel->streamtype == kChannelStreamInternal) {
|
||||
channel_incref(channel);
|
||||
CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
|
||||
success = true;
|
||||
} else {
|
||||
Stream *in = channel_instream(channel);
|
||||
success = wstream_write(in, buffer);
|
||||
}
|
||||
|
||||
|
||||
if (!success) {
|
||||
// If the write failed for any reason, close the channel
|
||||
char buf[256];
|
||||
@@ -613,14 +415,14 @@ static void internal_read_event(void **argv)
|
||||
Channel *channel = argv[0];
|
||||
WBuffer *buffer = argv[1];
|
||||
|
||||
msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
|
||||
memcpy(msgpack_unpacker_buffer(channel->unpacker),
|
||||
msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
|
||||
memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
|
||||
buffer->data, buffer->size);
|
||||
msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
|
||||
msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
|
||||
|
||||
parse_msgpack(channel);
|
||||
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
wstream_release_wbuffer(buffer);
|
||||
}
|
||||
|
||||
@@ -669,7 +471,8 @@ static void broadcast_event(const char *name, Array args)
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
|
||||
if (channel->is_rpc
|
||||
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
|
||||
kv_push(subscribed, channel);
|
||||
}
|
||||
});
|
||||
@@ -699,10 +502,11 @@ end:
|
||||
static void unsubscribe(Channel *channel, char *event)
|
||||
{
|
||||
char *event_string = pmap_get(cstr_t)(event_strings, event);
|
||||
pmap_del(cstr_t)(channel->subscribed_events, event_string);
|
||||
pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
|
||||
if (channel->is_rpc
|
||||
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
|
||||
return;
|
||||
}
|
||||
});
|
||||
@@ -712,98 +516,43 @@ static void unsubscribe(Channel *channel, char *event)
|
||||
xfree(event_string);
|
||||
}
|
||||
|
||||
/// Close the channel streams/process and free the channel resources.
|
||||
static void close_channel(Channel *channel)
|
||||
|
||||
/// Mark rpc state as closed, and release its reference to the channel.
|
||||
/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
|
||||
void rpc_close(Channel *channel)
|
||||
{
|
||||
if (channel->closed) {
|
||||
if (channel->rpc.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
channel->closed = true;
|
||||
channel->rpc.closed = true;
|
||||
channel_decref(channel);
|
||||
|
||||
switch (channel->type) {
|
||||
case kChannelTypeSocket:
|
||||
stream_close(&channel->data.stream, NULL, NULL);
|
||||
break;
|
||||
case kChannelTypeProc:
|
||||
// Only close the rpc channel part,
|
||||
// there could be an error message on the stderr stream
|
||||
process_close_in(channel->data.proc);
|
||||
process_close_out(channel->data.proc);
|
||||
break;
|
||||
case kChannelTypeStdio:
|
||||
stream_close(&channel->data.std.in, NULL, NULL);
|
||||
stream_close(&channel->data.std.out, NULL, NULL);
|
||||
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
|
||||
return;
|
||||
case kChannelTypeInternal:
|
||||
// nothing to free.
|
||||
break;
|
||||
if (channel->streamtype == kChannelStreamStdio) {
|
||||
multiqueue_put(main_loop.fast_events, exit_event, 0);
|
||||
}
|
||||
|
||||
decref(channel);
|
||||
}
|
||||
|
||||
static void exit_event(void **argv)
|
||||
{
|
||||
decref(argv[0]);
|
||||
|
||||
if (!exiting) {
|
||||
mch_exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_channel(Channel *channel)
|
||||
void rpc_free(Channel *channel)
|
||||
{
|
||||
remote_ui_disconnect(channel->id);
|
||||
pmap_del(uint64_t)(channels, channel->id);
|
||||
msgpack_unpacker_free(channel->unpacker);
|
||||
msgpack_unpacker_free(channel->rpc.unpacker);
|
||||
|
||||
// Unsubscribe from all events
|
||||
char *event_string;
|
||||
map_foreach_value(channel->subscribed_events, event_string, {
|
||||
map_foreach_value(channel->rpc.subscribed_events, event_string, {
|
||||
unsubscribe(channel, event_string);
|
||||
});
|
||||
|
||||
pmap_free(cstr_t)(channel->subscribed_events);
|
||||
kv_destroy(channel->call_stack);
|
||||
if (channel->type != kChannelTypeProc) {
|
||||
multiqueue_free(channel->events);
|
||||
}
|
||||
xfree(channel);
|
||||
}
|
||||
|
||||
static void close_cb(Stream *stream, void *data)
|
||||
{
|
||||
decref(data);
|
||||
}
|
||||
|
||||
/// @param source description of source function, rplugin name, TCP addr, etc
|
||||
static Channel *register_channel(ChannelType type, uint64_t id,
|
||||
MultiQueue *events, char *source)
|
||||
{
|
||||
// Jobs and channels share the same id namespace.
|
||||
assert(id == 0 || !pmap_get(uint64_t)(channels, id));
|
||||
Channel *rv = xmalloc(sizeof(Channel));
|
||||
rv->events = events ? events : multiqueue_new_child(main_loop.events);
|
||||
rv->type = type;
|
||||
rv->refcount = 1;
|
||||
rv->closed = false;
|
||||
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||
rv->id = id > 0 ? id : next_chan_id++;
|
||||
rv->subscribed_events = pmap_new(cstr_t)();
|
||||
rv->next_request_id = 1;
|
||||
kv_init(rv->call_stack);
|
||||
pmap_put(uint64_t)(channels, rv->id, rv);
|
||||
|
||||
ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
|
||||
(type == kChannelTypeProc ? "proc"
|
||||
: (type == kChannelTypeSocket ? "socket"
|
||||
: (type == kChannelTypeStdio ? "stdio"
|
||||
: (type == kChannelTypeInternal ? "internal" : "?")))),
|
||||
(source ? source : "?"));
|
||||
|
||||
return rv;
|
||||
pmap_free(cstr_t)(channel->rpc.subscribed_events);
|
||||
kv_destroy(channel->rpc.call_stack);
|
||||
}
|
||||
|
||||
static bool is_rpc_response(msgpack_object *obj)
|
||||
@@ -818,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj)
|
||||
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
uint64_t response_id = obj->via.array.ptr[1].via.u64;
|
||||
if (kv_size(channel->rpc.call_stack) == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Must be equal to the frame at the stack's bottom
|
||||
return kv_size(channel->call_stack) && response_id
|
||||
== kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id;
|
||||
ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
|
||||
return response_id == frame->request_id;
|
||||
}
|
||||
|
||||
static void complete_call(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
ChannelCallFrame *frame = kv_A(channel->call_stack,
|
||||
kv_size(channel->call_stack) - 1);
|
||||
ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
|
||||
frame->returned = true;
|
||||
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
|
||||
|
||||
@@ -840,15 +592,15 @@ static void complete_call(msgpack_object *obj, Channel *channel)
|
||||
static void call_set_error(Channel *channel, char *msg, int loglevel)
|
||||
{
|
||||
LOG(loglevel, "RPC: %s", msg);
|
||||
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
|
||||
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
|
||||
for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
|
||||
ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
|
||||
frame->returned = true;
|
||||
frame->errored = true;
|
||||
api_free_object(frame->result);
|
||||
frame->result = STRING_OBJ(cstr_to_string(msg));
|
||||
}
|
||||
|
||||
close_channel(channel);
|
||||
channel_close(channel->id, kChannelPartRpc, NULL);
|
||||
}
|
||||
|
||||
static WBuffer *serialize_request(uint64_t channel_id,
|
||||
@@ -890,18 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
|
||||
return rv;
|
||||
}
|
||||
|
||||
static void incref(Channel *channel)
|
||||
{
|
||||
channel->refcount++;
|
||||
}
|
||||
|
||||
static void decref(Channel *channel)
|
||||
{
|
||||
if (!(--channel->refcount)) {
|
||||
free_channel(channel);
|
||||
}
|
||||
}
|
||||
|
||||
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
|
||||
#define REQ "[request] "
|
||||
#define RES "[response] "
|
||||
|
@@ -8,6 +8,7 @@
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/event/process.h"
|
||||
#include "nvim/vim.h"
|
||||
#include "nvim/channel.h"
|
||||
|
||||
#define METHOD_MAXLEN 512
|
||||
|
||||
@@ -16,6 +17,7 @@
|
||||
/// of os_inchar(), so they are processed "just-in-time".
|
||||
MultiQueue *ch_before_blocking_events;
|
||||
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/channel.h.generated.h"
|
||||
#endif
|
||||
|
36
src/nvim/msgpack_rpc/channel_defs.h
Normal file
36
src/nvim/msgpack_rpc/channel_defs.h
Normal file
@@ -0,0 +1,36 @@
|
||||
#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
|
||||
#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <uv.h>
|
||||
#include <msgpack.h>
|
||||
|
||||
#include "nvim/api/private/defs.h"
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/event/process.h"
|
||||
#include "nvim/vim.h"
|
||||
|
||||
typedef struct Channel Channel;
|
||||
|
||||
typedef struct {
|
||||
uint64_t request_id;
|
||||
bool returned, errored;
|
||||
Object result;
|
||||
} ChannelCallFrame;
|
||||
|
||||
typedef struct {
|
||||
Channel *channel;
|
||||
MsgpackRpcRequestHandler handler;
|
||||
Array args;
|
||||
uint64_t request_id;
|
||||
} RequestEvent;
|
||||
|
||||
typedef struct {
|
||||
PMap(cstr_t) *subscribed_events;
|
||||
bool closed;
|
||||
msgpack_unpacker *unpacker;
|
||||
uint64_t next_request_id;
|
||||
kvec_t(ChannelCallFrame *) call_stack;
|
||||
} RpcState;
|
||||
|
||||
#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
|
@@ -115,6 +115,7 @@ static int p_bomb;
|
||||
static char_u *p_bh;
|
||||
static char_u *p_bt;
|
||||
static int p_bl;
|
||||
static long p_channel;
|
||||
static int p_ci;
|
||||
static int p_cin;
|
||||
static char_u *p_cink;
|
||||
@@ -4193,6 +4194,9 @@ static char *set_num_option(int opt_idx, char_u *varp, long value,
|
||||
curbuf->b_p_imsearch = B_IMODE_NONE;
|
||||
}
|
||||
p_imsearch = curbuf->b_p_imsearch;
|
||||
} else if (pp == &p_channel || pp == &curbuf->b_p_channel) {
|
||||
errmsg = e_invarg;
|
||||
*pp = old_value;
|
||||
}
|
||||
/* if 'titlelen' has changed, redraw the title */
|
||||
else if (pp == &p_titlelen) {
|
||||
@@ -5472,6 +5476,7 @@ static char_u *get_varp(vimoption_T *p)
|
||||
case PV_BH: return (char_u *)&(curbuf->b_p_bh);
|
||||
case PV_BT: return (char_u *)&(curbuf->b_p_bt);
|
||||
case PV_BL: return (char_u *)&(curbuf->b_p_bl);
|
||||
case PV_CHANNEL:return (char_u *)&(curbuf->b_p_channel);
|
||||
case PV_CI: return (char_u *)&(curbuf->b_p_ci);
|
||||
case PV_CIN: return (char_u *)&(curbuf->b_p_cin);
|
||||
case PV_CINK: return (char_u *)&(curbuf->b_p_cink);
|
||||
@@ -5773,6 +5778,7 @@ void buf_copy_options(buf_T *buf, int flags)
|
||||
buf->b_p_nf = vim_strsave(p_nf);
|
||||
buf->b_p_mps = vim_strsave(p_mps);
|
||||
buf->b_p_si = p_si;
|
||||
buf->b_p_channel = 0;
|
||||
buf->b_p_ci = p_ci;
|
||||
buf->b_p_cin = p_cin;
|
||||
buf->b_p_cink = vim_strsave(p_cink);
|
||||
|
@@ -695,6 +695,7 @@ enum {
|
||||
, BV_BIN
|
||||
, BV_BL
|
||||
, BV_BOMB
|
||||
, BV_CHANNEL
|
||||
, BV_CI
|
||||
, BV_CIN
|
||||
, BV_CINK
|
||||
|
@@ -294,6 +294,14 @@ return {
|
||||
varname='p_cedit',
|
||||
defaults={if_true={vi="", vim=macros('CTRL_F_STR')}}
|
||||
},
|
||||
{
|
||||
full_name='channel',
|
||||
type='number', scope={'buffer'},
|
||||
no_mkrc=true,
|
||||
nodefault=true,
|
||||
varname='p_channel',
|
||||
defaults={if_true={vi=0}}
|
||||
},
|
||||
{
|
||||
full_name='charconvert', abbreviation='ccv',
|
||||
type='string', scope={'global'},
|
||||
|
@@ -37,7 +37,7 @@ typedef enum {
|
||||
static Stream read_stream = {.closed = true};
|
||||
static RBuffer *input_buffer = NULL;
|
||||
static bool input_eof = false;
|
||||
static int global_fd = 0;
|
||||
static int global_fd = -1;
|
||||
static int events_enabled = 0;
|
||||
static bool blocking = false;
|
||||
|
||||
|
@@ -36,23 +36,36 @@
|
||||
# include "os/pty_process_unix.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// termios saved at startup (for TUI) or initialized by pty_process_spawn().
|
||||
static struct termios termios_default;
|
||||
|
||||
/// Saves the termios properties associated with `tty_fd`.
|
||||
///
|
||||
/// @param tty_fd TTY file descriptor, or -1 if not in a terminal.
|
||||
void pty_process_save_termios(int tty_fd)
|
||||
{
|
||||
if (tty_fd == -1 || tcgetattr(tty_fd, &termios_default) != 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/// @returns zero on success, or negative error code
|
||||
int pty_process_spawn(PtyProcess *ptyproc)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
static struct termios termios;
|
||||
if (!termios.c_cflag) {
|
||||
init_termios(&termios);
|
||||
if (!termios_default.c_cflag) {
|
||||
// TODO(jkeyes): We could pass NULL to forkpty() instead ...
|
||||
init_termios(&termios_default);
|
||||
}
|
||||
|
||||
int status = 0; // zero or negative error code (libuv convention)
|
||||
Process *proc = (Process *)ptyproc;
|
||||
assert(!proc->err);
|
||||
assert(proc->err.closed);
|
||||
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
|
||||
ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 };
|
||||
uv_disable_stdio_inheritance();
|
||||
int master;
|
||||
int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
|
||||
int pid = forkpty(&master, NULL, &termios_default, &ptyproc->winsize);
|
||||
|
||||
if (pid < 0) {
|
||||
status = -errno;
|
||||
@@ -83,12 +96,12 @@ int pty_process_spawn(PtyProcess *ptyproc)
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (proc->in
|
||||
&& (status = set_duplicating_descriptor(master, &proc->in->uv.pipe))) {
|
||||
if (!proc->in.closed
|
||||
&& (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) {
|
||||
goto error;
|
||||
}
|
||||
if (proc->out
|
||||
&& (status = set_duplicating_descriptor(master, &proc->out->uv.pipe))) {
|
||||
if (!proc->out.closed
|
||||
&& (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
|
@@ -44,7 +44,7 @@ int pty_process_spawn(PtyProcess *ptyproc)
|
||||
wchar_t *cwd = NULL;
|
||||
const char *emsg = NULL;
|
||||
|
||||
assert(!proc->err);
|
||||
assert(proc->err.closed);
|
||||
|
||||
cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err);
|
||||
if (cfg == NULL) {
|
||||
@@ -71,20 +71,20 @@ int pty_process_spawn(PtyProcess *ptyproc)
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (proc->in != NULL) {
|
||||
if (!proc->in.closed) {
|
||||
in_req = xmalloc(sizeof(uv_connect_t));
|
||||
uv_pipe_connect(
|
||||
in_req,
|
||||
&proc->in->uv.pipe,
|
||||
&proc->in.uv.pipe,
|
||||
in_name,
|
||||
pty_process_connect_cb);
|
||||
}
|
||||
|
||||
if (proc->out != NULL) {
|
||||
if (!proc->out.closed) {
|
||||
out_req = xmalloc(sizeof(uv_connect_t));
|
||||
uv_pipe_connect(
|
||||
out_req,
|
||||
&proc->out->uv.pipe,
|
||||
&proc->out.uv.pipe,
|
||||
out_name,
|
||||
pty_process_connect_cb);
|
||||
}
|
||||
@@ -228,7 +228,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer)
|
||||
PtyProcess *ptyproc = wait_eof_timer->data;
|
||||
Process *proc = (Process *)ptyproc;
|
||||
|
||||
if (!proc->out || !uv_is_readable(proc->out->uvstream)) {
|
||||
if (proc->out.closed || !uv_is_readable(proc->out.uvstream)) {
|
||||
uv_timer_stop(&ptyproc->wait_eof_timer);
|
||||
pty_process_finish2(ptyproc);
|
||||
}
|
||||
|
@@ -207,16 +207,12 @@ static int do_os_system(char **argv,
|
||||
char prog[MAXPATHL];
|
||||
xstrlcpy(prog, argv[0], MAXPATHL);
|
||||
|
||||
Stream in, out, err;
|
||||
LibuvProcess uvproc = libuv_process_init(&main_loop, &buf);
|
||||
Process *proc = &uvproc.process;
|
||||
MultiQueue *events = multiqueue_new_child(main_loop.events);
|
||||
proc->events = events;
|
||||
proc->argv = argv;
|
||||
proc->in = input != NULL ? &in : NULL;
|
||||
proc->out = &out;
|
||||
proc->err = &err;
|
||||
int status = process_spawn(proc);
|
||||
int status = process_spawn(proc, input != NULL, true, true);
|
||||
if (status) {
|
||||
loop_poll_events(&main_loop, 0);
|
||||
// Failed, probably 'shell' is not executable.
|
||||
@@ -231,32 +227,29 @@ static int do_os_system(char **argv,
|
||||
return -1;
|
||||
}
|
||||
|
||||
// We want to deal with stream events as fast a possible while queueing
|
||||
// process events, so reset everything to NULL. It prevents closing the
|
||||
// Note: unlike process events, stream events are not queued, as we want to
|
||||
// deal with stream events as fast a possible. It prevents closing the
|
||||
// streams while there's still data in the OS buffer (due to the process
|
||||
// exiting before all data is read).
|
||||
if (input != NULL) {
|
||||
proc->in->events = NULL;
|
||||
wstream_init(proc->in, 0);
|
||||
wstream_init(&proc->in, 0);
|
||||
}
|
||||
proc->out->events = NULL;
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, data_cb, &buf);
|
||||
proc->err->events = NULL;
|
||||
rstream_init(proc->err, 0);
|
||||
rstream_start(proc->err, data_cb, &buf);
|
||||
rstream_init(&proc->out, 0);
|
||||
rstream_start(&proc->out, data_cb, &buf);
|
||||
rstream_init(&proc->err, 0);
|
||||
rstream_start(&proc->err, data_cb, &buf);
|
||||
|
||||
// write the input, if any
|
||||
if (input) {
|
||||
WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL);
|
||||
|
||||
if (!wstream_write(&in, input_buffer)) {
|
||||
if (!wstream_write(&proc->in, input_buffer)) {
|
||||
// couldn't write, stop the process and tell the user about it
|
||||
process_stop(proc);
|
||||
return -1;
|
||||
}
|
||||
// close the input stream after everything is written
|
||||
wstream_set_write_cb(&in, shell_write_cb, NULL);
|
||||
wstream_set_write_cb(&proc->in, shell_write_cb, NULL);
|
||||
}
|
||||
|
||||
// Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the
|
||||
@@ -684,10 +677,6 @@ static void shell_write_cb(Stream *stream, void *data, int status)
|
||||
msg_schedule_emsgf(_("E5677: Error writing input to shell-command: %s"),
|
||||
uv_err_name(status));
|
||||
}
|
||||
if (stream->closed) { // Process may have exited before this write.
|
||||
WLOG("stream was already closed");
|
||||
return;
|
||||
}
|
||||
stream_close(stream, NULL, NULL);
|
||||
}
|
||||
|
||||
|
@@ -144,7 +144,9 @@ void mch_exit(int r) FUNC_ATTR_NORETURN
|
||||
if (!event_teardown() && r == 0) {
|
||||
r = 1; // Exit with error if main_loop did not teardown gracefully.
|
||||
}
|
||||
stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
|
||||
if (input_global_fd() >= 0) {
|
||||
stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
|
||||
}
|
||||
|
||||
#ifdef EXITFREE
|
||||
free_all_mem();
|
||||
|
@@ -121,7 +121,7 @@ char *rbuffer_read_ptr(RBuffer *buf, size_t *read_count) FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (!buf->size) {
|
||||
*read_count = 0;
|
||||
return NULL;
|
||||
return buf->read_ptr;
|
||||
}
|
||||
|
||||
if (buf->read_ptr < buf->write_ptr) {
|
||||
|
@@ -1094,11 +1094,12 @@ static void refresh_terminal(Terminal *term)
|
||||
// Calls refresh_terminal() on all invalidated_terminals.
|
||||
static void refresh_timer_cb(TimeWatcher *watcher, void *data)
|
||||
{
|
||||
refresh_pending = false;
|
||||
if (exiting // Cannot redraw (requires event loop) during teardown/exit.
|
||||
// WM_LIST (^D) is not redrawn, unlike the normal wildmenu. So we must
|
||||
// skip redraws to keep it visible.
|
||||
|| wild_menu_showing == WM_LIST) {
|
||||
goto end;
|
||||
return;
|
||||
}
|
||||
Terminal *term;
|
||||
void *stub; (void)(stub);
|
||||
@@ -1113,8 +1114,6 @@ static void refresh_timer_cb(TimeWatcher *watcher, void *data)
|
||||
if (any_visible) {
|
||||
redraw(true);
|
||||
}
|
||||
end:
|
||||
refresh_pending = false;
|
||||
}
|
||||
|
||||
static void refresh_size(Terminal *term, buf_T *buf)
|
||||
|
Reference in New Issue
Block a user