mirror of
https://github.com/neovim/neovim.git
synced 2025-10-05 17:36:29 +00:00
channels: allow bytes sockets and stdio, and buffered bytes output
This commit is contained in:
@@ -3,10 +3,25 @@
|
||||
|
||||
#include "nvim/api/ui.h"
|
||||
#include "nvim/channel.h"
|
||||
#include "nvim/eval.h"
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/msgpack_rpc/channel.h"
|
||||
#include "nvim/msgpack_rpc/server.h"
|
||||
#include "nvim/os/shell.h"
|
||||
#include "nvim/path.h"
|
||||
#include "nvim/ascii.h"
|
||||
|
||||
static bool did_stdio = false;
|
||||
PMap(uint64_t) *channels = NULL;
|
||||
|
||||
typedef struct {
|
||||
Channel *data;
|
||||
Callback *callback;
|
||||
const char *type;
|
||||
list_T *received;
|
||||
int status;
|
||||
} ChannelEvent;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "channel.c.generated.h"
|
||||
#endif
|
||||
@@ -32,6 +47,21 @@ void channel_init(void)
|
||||
remote_ui_init();
|
||||
}
|
||||
|
||||
/// Allocates a channel.
|
||||
///
|
||||
/// Channel is allocated with refcount 1, which should be decreased
|
||||
/// when the underlying stream closes.
|
||||
static Channel *channel_alloc(ChannelStreamType type)
|
||||
{
|
||||
Channel *chan = xcalloc(1, sizeof(*chan));
|
||||
chan->id = type == kChannelStreamStdio ? 1 : next_chan_id++;
|
||||
chan->events = multiqueue_new_child(main_loop.events);
|
||||
chan->refcount = 1;
|
||||
chan->streamtype = type;
|
||||
pmap_put(uint64_t)(channels, chan->id, chan);
|
||||
return chan;
|
||||
}
|
||||
|
||||
void channel_incref(Channel *channel)
|
||||
{
|
||||
channel->refcount++;
|
||||
@@ -44,6 +74,21 @@ void channel_decref(Channel *channel)
|
||||
}
|
||||
}
|
||||
|
||||
void callback_reader_free(CallbackReader *reader)
|
||||
{
|
||||
callback_free(&reader->cb);
|
||||
if (reader->buffered) {
|
||||
ga_clear(&reader->buffer);
|
||||
}
|
||||
}
|
||||
|
||||
void callback_reader_start(CallbackReader *reader)
|
||||
{
|
||||
if (reader->buffered) {
|
||||
ga_init(&reader->buffer, sizeof(char *), 1);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_channel_event(void **argv)
|
||||
{
|
||||
Channel *channel = argv[0];
|
||||
@@ -51,11 +96,379 @@ static void free_channel_event(void **argv)
|
||||
rpc_free(channel);
|
||||
}
|
||||
|
||||
callback_free(&channel->on_stdout);
|
||||
callback_free(&channel->on_stderr);
|
||||
callback_reader_free(&channel->on_stdout);
|
||||
callback_reader_free(&channel->on_stderr);
|
||||
callback_free(&channel->on_exit);
|
||||
|
||||
pmap_del(uint64_t)(channels, channel->id);
|
||||
multiqueue_free(channel->events);
|
||||
xfree(channel);
|
||||
}
|
||||
|
||||
static void channel_destroy_early(Channel *chan)
|
||||
{
|
||||
if ((chan->id != --next_chan_id)) {
|
||||
abort();
|
||||
}
|
||||
|
||||
if ((--chan->refcount != 0)) {
|
||||
abort();
|
||||
}
|
||||
|
||||
free_channel_event((void **)&chan);
|
||||
}
|
||||
|
||||
|
||||
static void close_cb(Stream *stream, void *data)
|
||||
{
|
||||
channel_decref(data);
|
||||
}
|
||||
|
||||
Channel *channel_job_start(char **argv, CallbackReader on_stdout,
|
||||
CallbackReader on_stderr, Callback on_exit,
|
||||
bool pty, bool rpc, bool detach, const char *cwd,
|
||||
uint16_t pty_width, uint16_t pty_height,
|
||||
char *term_name, varnumber_T *status_out)
|
||||
{
|
||||
Channel *chan = channel_alloc(kChannelStreamProc);
|
||||
chan->on_stdout = on_stdout;
|
||||
chan->on_stderr = on_stderr;
|
||||
chan->on_exit = on_exit;
|
||||
chan->is_rpc = rpc;
|
||||
|
||||
if (pty) {
|
||||
if (detach) {
|
||||
EMSG2(_(e_invarg2), "terminal/pty job cannot be detached");
|
||||
shell_free_argv(argv);
|
||||
xfree(term_name);
|
||||
channel_destroy_early(chan);
|
||||
*status_out = 0;
|
||||
return NULL;
|
||||
}
|
||||
chan->stream.pty = pty_process_init(&main_loop, chan);
|
||||
if (pty_width > 0) {
|
||||
chan->stream.pty.width = pty_width;
|
||||
}
|
||||
if (pty_height > 0) {
|
||||
chan->stream.pty.height = pty_height;
|
||||
}
|
||||
if (term_name) {
|
||||
chan->stream.pty.term_name = term_name;
|
||||
}
|
||||
} else {
|
||||
chan->stream.uv = libuv_process_init(&main_loop, chan);
|
||||
}
|
||||
|
||||
Process *proc = (Process *)&chan->stream.proc;
|
||||
proc->argv = argv;
|
||||
proc->cb = channel_process_exit_cb;
|
||||
proc->events = chan->events;
|
||||
proc->detach = detach;
|
||||
proc->cwd = cwd;
|
||||
|
||||
char *cmd = xstrdup(proc->argv[0]);
|
||||
bool has_out, has_err;
|
||||
if (proc->type == kProcessTypePty) {
|
||||
has_out = true;
|
||||
has_err = false;
|
||||
} else {
|
||||
has_out = chan->is_rpc || callback_reader_set(chan->on_stdout);
|
||||
has_err = callback_reader_set(chan->on_stderr);
|
||||
}
|
||||
int status = process_spawn(proc, true, has_out, has_err);
|
||||
if (has_err) {
|
||||
proc->err.events = chan->events;
|
||||
}
|
||||
if (status) {
|
||||
EMSG3(_(e_jobspawn), os_strerror(status), cmd);
|
||||
xfree(cmd);
|
||||
if (proc->type == kProcessTypePty) {
|
||||
xfree(chan->stream.pty.term_name);
|
||||
}
|
||||
channel_destroy_early(chan);
|
||||
*status_out = proc->status;
|
||||
return NULL;
|
||||
}
|
||||
xfree(cmd);
|
||||
|
||||
wstream_init(&proc->in, 0);
|
||||
if (has_out) {
|
||||
rstream_init(&proc->out, 0);
|
||||
}
|
||||
|
||||
if (chan->is_rpc) {
|
||||
// the rpc takes over the in and out streams
|
||||
rpc_start(chan);
|
||||
} else {
|
||||
proc->in.events = chan->events;
|
||||
if (has_out) {
|
||||
callback_reader_start(&chan->on_stdout);
|
||||
proc->out.events = chan->events;
|
||||
rstream_start(&proc->out, on_job_stdout, chan);
|
||||
}
|
||||
}
|
||||
|
||||
if (has_err) {
|
||||
callback_reader_start(&chan->on_stderr);
|
||||
rstream_init(&proc->err, 0);
|
||||
rstream_start(&proc->err, on_job_stderr, chan);
|
||||
}
|
||||
*status_out = (varnumber_T)chan->id;
|
||||
return chan;
|
||||
}
|
||||
|
||||
|
||||
uint64_t channel_connect(bool tcp, const char *address,
|
||||
bool rpc, CallbackReader on_output,
|
||||
int timeout, const char **error)
|
||||
{
|
||||
if (!tcp && rpc) {
|
||||
char *path = fix_fname(address);
|
||||
if (server_owns_pipe_address(path)) {
|
||||
// avoid deadlock
|
||||
xfree(path);
|
||||
return channel_create_internal_rpc();
|
||||
}
|
||||
xfree(path);
|
||||
}
|
||||
|
||||
Channel *channel = channel_alloc(kChannelStreamSocket);
|
||||
if (!socket_connect(&main_loop, &channel->stream.socket,
|
||||
tcp, address, timeout, error)) {
|
||||
channel_destroy_early(channel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
channel_incref(channel); // close channel only after the stream is closed
|
||||
channel->stream.socket.internal_close_cb = close_cb;
|
||||
channel->stream.socket.internal_data = channel;
|
||||
wstream_init(&channel->stream.socket, 0);
|
||||
rstream_init(&channel->stream.socket, 0);
|
||||
|
||||
if (rpc) {
|
||||
rpc_start(channel);
|
||||
} else {
|
||||
channel->on_stdout = on_output;
|
||||
callback_reader_start(&channel->on_stdout);
|
||||
channel->stream.socket.events = channel->events;
|
||||
rstream_start(&channel->stream.socket, on_socket_output, channel);
|
||||
}
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// Creates an RPC channel from a tcp/pipe socket connection
|
||||
///
|
||||
/// @param watcher The SocketWatcher ready to accept the connection
|
||||
void channel_from_connection(SocketWatcher *watcher)
|
||||
{
|
||||
Channel *channel = channel_alloc(kChannelStreamSocket);
|
||||
socket_watcher_accept(watcher, &channel->stream.socket);
|
||||
channel_incref(channel); // close channel only after the stream is closed
|
||||
channel->stream.socket.internal_close_cb = close_cb;
|
||||
channel->stream.socket.internal_data = channel;
|
||||
wstream_init(&channel->stream.socket, 0);
|
||||
rstream_init(&channel->stream.socket, 0);
|
||||
rpc_start(channel);
|
||||
}
|
||||
|
||||
/// Creates a loopback channel. This is used to avoid deadlock
|
||||
/// when an instance connects to its own named pipe.
|
||||
static uint64_t channel_create_internal_rpc(void)
|
||||
{
|
||||
Channel *channel = channel_alloc(kChannelStreamInternal);
|
||||
channel_incref(channel); // internal channel lives until process exit
|
||||
rpc_start(channel);
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// Creates an API channel from stdin/stdout. This is used when embedding
|
||||
/// Neovim
|
||||
uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
|
||||
const char **error)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (!headless_mode) {
|
||||
*error = _("can only be opened in headless mode");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (did_stdio) {
|
||||
*error = _("channel was already open");
|
||||
return 0;
|
||||
}
|
||||
did_stdio = true;
|
||||
|
||||
Channel *channel = channel_alloc(kChannelStreamStdio);
|
||||
|
||||
rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0);
|
||||
wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0);
|
||||
|
||||
if (rpc) {
|
||||
rpc_start(channel);
|
||||
} else {
|
||||
channel->on_stdout = on_output;
|
||||
callback_reader_start(&channel->on_stdout);
|
||||
channel->stream.stdio.in.events = channel->events;
|
||||
channel->stream.stdio.out.events = channel->events;
|
||||
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
|
||||
}
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
// vimscript job callbacks must be executed on Nvim main loop
|
||||
static inline void process_channel_event(Channel *chan, Callback *callback,
|
||||
const char *type, char *buf,
|
||||
size_t count, int status)
|
||||
{
|
||||
ChannelEvent event_data;
|
||||
event_data.received = NULL;
|
||||
if (buf) {
|
||||
event_data.received = tv_list_alloc();
|
||||
char *ptr = buf;
|
||||
size_t remaining = count;
|
||||
size_t off = 0;
|
||||
|
||||
while (off < remaining) {
|
||||
// append the line
|
||||
if (ptr[off] == NL) {
|
||||
tv_list_append_string(event_data.received, ptr, (ssize_t)off);
|
||||
size_t skip = off + 1;
|
||||
ptr += skip;
|
||||
remaining -= skip;
|
||||
off = 0;
|
||||
continue;
|
||||
}
|
||||
if (ptr[off] == NUL) {
|
||||
// Translate NUL to NL
|
||||
ptr[off] = NL;
|
||||
}
|
||||
off++;
|
||||
}
|
||||
tv_list_append_string(event_data.received, ptr, (ssize_t)off);
|
||||
} else {
|
||||
event_data.status = status;
|
||||
}
|
||||
event_data.data = chan;
|
||||
event_data.callback = callback;
|
||||
event_data.type = type;
|
||||
on_channel_event(&event_data);
|
||||
}
|
||||
|
||||
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
|
||||
}
|
||||
|
||||
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
|
||||
}
|
||||
|
||||
static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
|
||||
}
|
||||
|
||||
static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *chan = data;
|
||||
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
|
||||
}
|
||||
|
||||
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
|
||||
size_t count, bool eof, CallbackReader *reader,
|
||||
const char *type)
|
||||
{
|
||||
// stub variable, to keep reading consistent with the order of events, only
|
||||
// consider the count parameter.
|
||||
size_t r;
|
||||
char *ptr = rbuffer_read_ptr(buf, &r);
|
||||
|
||||
if (eof) {
|
||||
if (reader->buffered) {
|
||||
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
|
||||
(size_t)reader->buffer.ga_len, 0);
|
||||
ga_clear(&reader->buffer);
|
||||
} else if (callback_reader_set(*reader)) {
|
||||
process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// The order here matters, the terminal must receive the data first because
|
||||
// process_channel_event will modify the read buffer(convert NULs into NLs)
|
||||
if (chan->term) {
|
||||
terminal_receive(chan->term, ptr, count);
|
||||
}
|
||||
|
||||
rbuffer_consumed(buf, count);
|
||||
if (reader->buffered) {
|
||||
ga_concat_len(&reader->buffer, ptr, count);
|
||||
} else if (callback_reader_set(*reader)) {
|
||||
process_channel_event(chan, &reader->cb, type, ptr, count, 0);
|
||||
}
|
||||
}
|
||||
|
||||
static void channel_process_exit_cb(Process *proc, int status, void *data)
|
||||
{
|
||||
Channel *chan = data;
|
||||
if (chan->term && !chan->stream.proc.exited) {
|
||||
chan->stream.proc.exited = true;
|
||||
char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN];
|
||||
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
|
||||
terminal_close(chan->term, msg);
|
||||
}
|
||||
if (chan->is_rpc) {
|
||||
channel_process_exit(chan->id, status);
|
||||
}
|
||||
|
||||
if (chan->status_ptr) {
|
||||
*chan->status_ptr = status;
|
||||
}
|
||||
|
||||
process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
|
||||
|
||||
channel_decref(chan);
|
||||
}
|
||||
|
||||
static void on_channel_event(ChannelEvent *ev)
|
||||
{
|
||||
if (!ev->callback) {
|
||||
return;
|
||||
}
|
||||
|
||||
typval_T argv[4];
|
||||
|
||||
argv[0].v_type = VAR_NUMBER;
|
||||
argv[0].v_lock = VAR_UNLOCKED;
|
||||
argv[0].vval.v_number = (varnumber_T)ev->data->id;
|
||||
|
||||
if (ev->received) {
|
||||
argv[1].v_type = VAR_LIST;
|
||||
argv[1].v_lock = VAR_UNLOCKED;
|
||||
argv[1].vval.v_list = ev->received;
|
||||
argv[1].vval.v_list->lv_refcount++;
|
||||
} else {
|
||||
argv[1].v_type = VAR_NUMBER;
|
||||
argv[1].v_lock = VAR_UNLOCKED;
|
||||
argv[1].vval.v_number = ev->status;
|
||||
}
|
||||
|
||||
argv[2].v_type = VAR_STRING;
|
||||
argv[2].v_lock = VAR_UNLOCKED;
|
||||
argv[2].vval.v_string = (uint8_t *)ev->type;
|
||||
|
||||
typval_T rettv = TV_INITIAL_VALUE;
|
||||
callback_call(ev->callback, 3, argv, &rettv);
|
||||
tv_clear(&rettv);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user