mirror of
https://github.com/neovim/neovim.git
synced 2025-09-30 23:18:33 +00:00
channels: refactor
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
#include "nvim/api/private/helpers.h"
|
||||
#include "nvim/api/vim.h"
|
||||
#include "nvim/api/ui.h"
|
||||
#include "nvim/channel.h"
|
||||
#include "nvim/msgpack_rpc/channel.h"
|
||||
#include "nvim/msgpack_rpc/server.h"
|
||||
#include "nvim/event/loop.h"
|
||||
@@ -40,47 +41,6 @@
|
||||
#define log_server_msg(...)
|
||||
#endif
|
||||
|
||||
typedef enum {
|
||||
kChannelTypeSocket,
|
||||
kChannelTypeProc,
|
||||
kChannelTypeStdio,
|
||||
kChannelTypeInternal
|
||||
} ChannelType;
|
||||
|
||||
typedef struct {
|
||||
uint64_t request_id;
|
||||
bool returned, errored;
|
||||
Object result;
|
||||
} ChannelCallFrame;
|
||||
|
||||
typedef struct {
|
||||
uint64_t id;
|
||||
size_t refcount;
|
||||
PMap(cstr_t) *subscribed_events;
|
||||
bool closed;
|
||||
ChannelType type;
|
||||
msgpack_unpacker *unpacker;
|
||||
union {
|
||||
Stream stream; // bidirectional (socket)
|
||||
Process *proc;
|
||||
struct {
|
||||
Stream in;
|
||||
Stream out;
|
||||
} std;
|
||||
} data;
|
||||
uint64_t next_request_id;
|
||||
kvec_t(ChannelCallFrame *) call_stack;
|
||||
MultiQueue *events;
|
||||
} Channel;
|
||||
|
||||
typedef struct {
|
||||
Channel *channel;
|
||||
MsgpackRpcRequestHandler handler;
|
||||
Array args;
|
||||
uint64_t request_id;
|
||||
} RequestEvent;
|
||||
|
||||
static PMap(uint64_t) *channels = NULL;
|
||||
static PMap(cstr_t) *event_strings = NULL;
|
||||
static msgpack_sbuffer out_buffer;
|
||||
|
||||
@@ -88,50 +48,32 @@ static msgpack_sbuffer out_buffer;
|
||||
# include "msgpack_rpc/channel.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// Initializes the module
|
||||
void channel_init(void)
|
||||
void rpc_init(void)
|
||||
{
|
||||
ch_before_blocking_events = multiqueue_new_child(main_loop.events);
|
||||
channels = pmap_new(uint64_t)();
|
||||
event_strings = pmap_new(cstr_t)();
|
||||
msgpack_sbuffer_init(&out_buffer);
|
||||
remote_ui_init();
|
||||
}
|
||||
|
||||
/// Teardown the module
|
||||
void channel_teardown(void)
|
||||
|
||||
void rpc_start(Channel *channel)
|
||||
{
|
||||
if (!channels) {
|
||||
return;
|
||||
}
|
||||
channel->is_rpc = true;
|
||||
RpcState *rpc = &channel->rpc;
|
||||
rpc->closed = false;
|
||||
rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||
rpc->subscribed_events = pmap_new(cstr_t)();
|
||||
rpc->next_request_id = 1;
|
||||
kv_init(rpc->call_stack);
|
||||
|
||||
Channel *channel;
|
||||
Stream *in = channel_instream(channel);
|
||||
Stream *out = channel_outstream(channel);
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
close_channel(channel);
|
||||
});
|
||||
}
|
||||
DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out);
|
||||
|
||||
/// Creates an API channel by starting a process and connecting to its
|
||||
/// stdin/stdout. stderr is handled by the job infrastructure.
|
||||
///
|
||||
/// @param argv The argument vector for the process. [consumed]
|
||||
/// @return The channel id (> 0), on success.
|
||||
/// 0, on error.
|
||||
uint64_t channel_from_process(Process *proc, uint64_t id)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
|
||||
incref(channel); // process channels are only closed by the exit_cb
|
||||
channel->data.proc = proc;
|
||||
|
||||
wstream_init(proc->in, 0);
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, receive_msgpack, channel);
|
||||
|
||||
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
|
||||
proc->out);
|
||||
|
||||
return channel->id;
|
||||
wstream_init(in, 0);
|
||||
rstream_init(out, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(out, receive_msgpack, channel);
|
||||
}
|
||||
|
||||
/// Creates an API channel from a tcp/pipe socket connection
|
||||
@@ -139,19 +81,15 @@ uint64_t channel_from_process(Process *proc, uint64_t id)
|
||||
/// @param watcher The SocketWatcher ready to accept the connection
|
||||
void channel_from_connection(SocketWatcher *watcher)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
|
||||
socket_watcher_accept(watcher, &channel->data.stream);
|
||||
incref(channel); // close channel only after the stream is closed
|
||||
channel->data.stream.internal_close_cb = close_cb;
|
||||
channel->data.stream.internal_data = channel;
|
||||
wstream_init(&channel->data.stream, 0);
|
||||
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.stream, receive_msgpack, channel);
|
||||
|
||||
DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
|
||||
&channel->data.stream);
|
||||
Channel *channel = channel_alloc(kChannelStreamSocket);
|
||||
socket_watcher_accept(watcher, &channel->stream.socket);
|
||||
channel_incref(channel); // close channel only after the stream is closed
|
||||
channel->stream.socket.internal_close_cb = close_cb;
|
||||
channel->stream.socket.internal_data = channel;
|
||||
rpc_start(channel);
|
||||
}
|
||||
|
||||
/// TODO: move to eval.c, also support bytes
|
||||
uint64_t channel_connect(bool tcp, const char *address,
|
||||
int timeout, const char **error)
|
||||
{
|
||||
@@ -165,34 +103,40 @@ uint64_t channel_connect(bool tcp, const char *address,
|
||||
xfree(path);
|
||||
}
|
||||
|
||||
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
|
||||
if (!socket_connect(&main_loop, &channel->data.stream,
|
||||
Channel *channel = channel_alloc(kChannelStreamSocket);
|
||||
if (!socket_connect(&main_loop, &channel->stream.socket,
|
||||
tcp, address, timeout, error)) {
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
incref(channel); // close channel only after the stream is closed
|
||||
channel->data.stream.internal_close_cb = close_cb;
|
||||
channel->data.stream.internal_data = channel;
|
||||
wstream_init(&channel->data.stream, 0);
|
||||
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.stream, receive_msgpack, channel);
|
||||
channel_incref(channel); // close channel only after the stream is closed
|
||||
channel->stream.socket.internal_close_cb = close_cb;
|
||||
channel->stream.socket.internal_data = channel;
|
||||
rpc_start(channel);
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
static Channel *find_rpc_channel(uint64_t id)
|
||||
{
|
||||
Channel *chan = find_channel(id);
|
||||
if (!chan || !chan->is_rpc || chan->rpc.closed) {
|
||||
return NULL;
|
||||
}
|
||||
return chan;
|
||||
}
|
||||
|
||||
/// Publishes an event to a channel.
|
||||
///
|
||||
/// @param id Channel id. 0 means "broadcast to all subscribed channels"
|
||||
/// @param name Event name (application-defined)
|
||||
/// @param args Array of event arguments
|
||||
/// @return True if the event was sent successfully, false otherwise.
|
||||
bool channel_send_event(uint64_t id, const char *name, Array args)
|
||||
bool rpc_send_event(uint64_t id, const char *name, Array args)
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (id && (!(channel = pmap_get(uint64_t)(channels, id))
|
||||
|| channel->closed)) {
|
||||
if (id && (!(channel = find_rpc_channel(id)))) {
|
||||
api_free_array(args);
|
||||
return false;
|
||||
}
|
||||
@@ -213,29 +157,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
|
||||
/// @param args Array with method arguments
|
||||
/// @param[out] error True if the return value is an error
|
||||
/// @return Whatever the remote method returned
|
||||
Object channel_send_call(uint64_t id,
|
||||
const char *method_name,
|
||||
Array args,
|
||||
Error *err)
|
||||
Object rpc_send_call(uint64_t id,
|
||||
const char *method_name,
|
||||
Array args,
|
||||
Error *err)
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
|
||||
api_free_array(args);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
incref(channel);
|
||||
uint64_t request_id = channel->next_request_id++;
|
||||
channel_incref(channel);
|
||||
RpcState *rpc = &channel->rpc;
|
||||
uint64_t request_id = rpc->next_request_id++;
|
||||
// Send the msgpack-rpc request
|
||||
send_request(channel, request_id, method_name, args);
|
||||
|
||||
// Push the frame
|
||||
ChannelCallFrame frame = { request_id, false, false, NIL };
|
||||
kv_push(channel->call_stack, &frame);
|
||||
kv_push(rpc->call_stack, &frame);
|
||||
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
|
||||
(void)kv_pop(channel->call_stack);
|
||||
(void)kv_pop(rpc->call_stack);
|
||||
|
||||
if (frame.errored) {
|
||||
if (frame.result.type == kObjectTypeString) {
|
||||
@@ -260,7 +205,7 @@ Object channel_send_call(uint64_t id,
|
||||
api_free_object(frame.result);
|
||||
}
|
||||
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
|
||||
return frame.errored ? NIL : frame.result;
|
||||
}
|
||||
@@ -269,11 +214,11 @@ Object channel_send_call(uint64_t id,
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void channel_subscribe(uint64_t id, char *event)
|
||||
void rpc_subscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
abort();
|
||||
}
|
||||
|
||||
@@ -284,18 +229,18 @@ void channel_subscribe(uint64_t id, char *event)
|
||||
pmap_put(cstr_t)(event_strings, event_string, event_string);
|
||||
}
|
||||
|
||||
pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
|
||||
pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
|
||||
}
|
||||
|
||||
/// Unsubscribes to event broadcasts
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void channel_unsubscribe(uint64_t id, char *event)
|
||||
void rpc_unsubscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
abort();
|
||||
}
|
||||
|
||||
@@ -310,7 +255,7 @@ bool channel_close(uint64_t id)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -322,24 +267,22 @@ bool channel_close(uint64_t id)
|
||||
/// Neovim
|
||||
void channel_from_stdio(void)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
|
||||
incref(channel); // stdio channels are only closed on exit
|
||||
Channel *channel = channel_alloc(kChannelStreamStdio);
|
||||
channel_incref(channel); // stdio channels are only closed on exit
|
||||
// read stream
|
||||
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.std.in, receive_msgpack, channel);
|
||||
// write stream
|
||||
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
|
||||
rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE);
|
||||
wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0);
|
||||
|
||||
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
|
||||
&channel->data.std.in, &channel->data.std.out);
|
||||
rpc_start(channel);
|
||||
}
|
||||
|
||||
/// Creates a loopback channel. This is used to avoid deadlock
|
||||
/// when an instance connects to its own named pipe.
|
||||
uint64_t channel_create_internal(void)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeInternal, 0, NULL);
|
||||
incref(channel); // internal channel lives until process exit
|
||||
Channel *channel = channel_alloc(kChannelStreamInternal);
|
||||
channel_incref(channel); // internal channel lives until process exit
|
||||
rpc_start(channel);
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
@@ -347,8 +290,8 @@ void channel_process_exit(uint64_t id, int status)
|
||||
{
|
||||
Channel *channel = pmap_get(uint64_t)(channels, id);
|
||||
|
||||
channel->closed = true;
|
||||
decref(channel);
|
||||
// channel_decref(channel); remove??
|
||||
channel->rpc.closed = true;
|
||||
}
|
||||
|
||||
// rstream.c:read_event() invokes this as stream->read_cb().
|
||||
@@ -356,7 +299,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
|
||||
void *data, bool eof)
|
||||
{
|
||||
Channel *channel = data;
|
||||
incref(channel);
|
||||
channel_incref(channel);
|
||||
|
||||
if (eof) {
|
||||
close_channel(channel);
|
||||
@@ -367,30 +310,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
|
||||
goto end;
|
||||
}
|
||||
|
||||
if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
|
||||
|| (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
|
||||
char buf[256];
|
||||
snprintf(buf, sizeof(buf),
|
||||
"ch %" PRIu64 ": stream closed unexpectedly. "
|
||||
"closing channel",
|
||||
channel->id);
|
||||
call_set_error(channel, buf, WARN_LOG_LEVEL);
|
||||
goto end;
|
||||
}
|
||||
|
||||
size_t count = rbuffer_size(rbuf);
|
||||
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
|
||||
DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
|
||||
channel->id, count, stream);
|
||||
|
||||
// Feed the unpacker with data
|
||||
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
|
||||
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
|
||||
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
|
||||
msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
|
||||
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
|
||||
msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
|
||||
|
||||
parse_msgpack(channel);
|
||||
|
||||
end:
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
}
|
||||
|
||||
static void parse_msgpack(Channel *channel)
|
||||
@@ -400,8 +332,8 @@ static void parse_msgpack(Channel *channel)
|
||||
msgpack_unpack_return result;
|
||||
|
||||
// Deserialize everything we can.
|
||||
while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
|
||||
MSGPACK_UNPACK_SUCCESS) {
|
||||
while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
|
||||
MSGPACK_UNPACK_SUCCESS) {
|
||||
bool is_response = is_rpc_response(&unpacked.data);
|
||||
log_client_msg(channel->id, !is_response, unpacked.data);
|
||||
|
||||
@@ -427,7 +359,7 @@ static void parse_msgpack(Channel *channel)
|
||||
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
|
||||
mch_errmsg(e_outofmem);
|
||||
mch_errmsg("\n");
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
preserve_exit();
|
||||
}
|
||||
|
||||
@@ -492,7 +424,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
|
||||
evdata->handler = handler;
|
||||
evdata->args = args;
|
||||
evdata->request_id = request_id;
|
||||
incref(channel);
|
||||
channel_incref(channel);
|
||||
if (handler.async) {
|
||||
bool is_get_mode = handler.fn == handle_nvim_get_mode;
|
||||
|
||||
@@ -530,66 +462,30 @@ static void on_request_event(void **argv)
|
||||
api_free_object(result);
|
||||
}
|
||||
api_free_array(args);
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
xfree(e);
|
||||
api_clear_error(&error);
|
||||
}
|
||||
|
||||
/// Returns the Stream that a Channel writes to.
|
||||
static Stream *chan_wstream(Channel *chan)
|
||||
{
|
||||
switch (chan->type) {
|
||||
case kChannelTypeSocket:
|
||||
return &chan->data.stream;
|
||||
case kChannelTypeProc:
|
||||
return chan->data.proc->in;
|
||||
case kChannelTypeStdio:
|
||||
return &chan->data.std.out;
|
||||
case kChannelTypeInternal:
|
||||
return NULL;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
/// Returns the Stream that a Channel reads from.
|
||||
static Stream *chan_rstream(Channel *chan)
|
||||
{
|
||||
switch (chan->type) {
|
||||
case kChannelTypeSocket:
|
||||
return &chan->data.stream;
|
||||
case kChannelTypeProc:
|
||||
return chan->data.proc->out;
|
||||
case kChannelTypeStdio:
|
||||
return &chan->data.std.in;
|
||||
case kChannelTypeInternal:
|
||||
return NULL;
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
|
||||
static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
{
|
||||
bool success = false;
|
||||
bool success;
|
||||
|
||||
if (channel->closed) {
|
||||
if (channel->rpc.closed) {
|
||||
wstream_release_wbuffer(buffer);
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (channel->type) {
|
||||
case kChannelTypeSocket:
|
||||
case kChannelTypeProc:
|
||||
case kChannelTypeStdio:
|
||||
success = wstream_write(chan_wstream(channel), buffer);
|
||||
break;
|
||||
case kChannelTypeInternal:
|
||||
incref(channel);
|
||||
CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
|
||||
success = true;
|
||||
break;
|
||||
if (channel->streamtype == kChannelStreamInternal) {
|
||||
channel_incref(channel);
|
||||
CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
|
||||
success = true;
|
||||
} else {
|
||||
Stream *in = channel_instream(channel);
|
||||
success = wstream_write(in, buffer);
|
||||
}
|
||||
|
||||
|
||||
if (!success) {
|
||||
// If the write failed for any reason, close the channel
|
||||
char buf[256];
|
||||
@@ -609,14 +505,14 @@ static void internal_read_event(void **argv)
|
||||
Channel *channel = argv[0];
|
||||
WBuffer *buffer = argv[1];
|
||||
|
||||
msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
|
||||
memcpy(msgpack_unpacker_buffer(channel->unpacker),
|
||||
msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
|
||||
memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
|
||||
buffer->data, buffer->size);
|
||||
msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
|
||||
msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
|
||||
|
||||
parse_msgpack(channel);
|
||||
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
wstream_release_wbuffer(buffer);
|
||||
}
|
||||
|
||||
@@ -665,7 +561,8 @@ static void broadcast_event(const char *name, Array args)
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
|
||||
if (channel->is_rpc
|
||||
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
|
||||
kv_push(subscribed, channel);
|
||||
}
|
||||
});
|
||||
@@ -695,10 +592,11 @@ end:
|
||||
static void unsubscribe(Channel *channel, char *event)
|
||||
{
|
||||
char *event_string = pmap_get(cstr_t)(event_strings, event);
|
||||
pmap_del(cstr_t)(channel->subscribed_events, event_string);
|
||||
pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
|
||||
if (channel->is_rpc
|
||||
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
|
||||
return;
|
||||
}
|
||||
});
|
||||
@@ -709,86 +607,65 @@ static void unsubscribe(Channel *channel, char *event)
|
||||
}
|
||||
|
||||
/// Close the channel streams/process and free the channel resources.
|
||||
/// TODO: move to channel.h
|
||||
static void close_channel(Channel *channel)
|
||||
{
|
||||
if (channel->closed) {
|
||||
if (channel->rpc.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
channel->closed = true;
|
||||
channel->rpc.closed = true;
|
||||
|
||||
switch (channel->type) {
|
||||
case kChannelTypeSocket:
|
||||
stream_close(&channel->data.stream, NULL, NULL);
|
||||
switch (channel->streamtype) {
|
||||
case kChannelStreamSocket:
|
||||
stream_close(&channel->stream.socket, NULL, NULL);
|
||||
break;
|
||||
case kChannelTypeProc:
|
||||
case kChannelStreamProc:
|
||||
// Only close the rpc channel part,
|
||||
// there could be an error message on the stderr stream
|
||||
process_close_in(channel->data.proc);
|
||||
process_close_out(channel->data.proc);
|
||||
process_close_in(&channel->stream.proc);
|
||||
process_close_out(&channel->stream.proc);
|
||||
break;
|
||||
case kChannelTypeStdio:
|
||||
stream_close(&channel->data.std.in, NULL, NULL);
|
||||
stream_close(&channel->data.std.out, NULL, NULL);
|
||||
case kChannelStreamStdio:
|
||||
stream_close(&channel->stream.stdio.in, NULL, NULL);
|
||||
stream_close(&channel->stream.stdio.out, NULL, NULL);
|
||||
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
|
||||
return;
|
||||
case kChannelTypeInternal:
|
||||
case kChannelStreamInternal:
|
||||
// nothing to free.
|
||||
break;
|
||||
}
|
||||
|
||||
decref(channel);
|
||||
channel_decref(channel);
|
||||
}
|
||||
|
||||
static void exit_event(void **argv)
|
||||
{
|
||||
decref(argv[0]);
|
||||
channel_decref(argv[0]);
|
||||
|
||||
if (!exiting) {
|
||||
mch_exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_channel(Channel *channel)
|
||||
void rpc_free(Channel *channel)
|
||||
{
|
||||
remote_ui_disconnect(channel->id);
|
||||
pmap_del(uint64_t)(channels, channel->id);
|
||||
msgpack_unpacker_free(channel->unpacker);
|
||||
msgpack_unpacker_free(channel->rpc.unpacker);
|
||||
|
||||
// Unsubscribe from all events
|
||||
char *event_string;
|
||||
map_foreach_value(channel->subscribed_events, event_string, {
|
||||
map_foreach_value(channel->rpc.subscribed_events, event_string, {
|
||||
unsubscribe(channel, event_string);
|
||||
});
|
||||
|
||||
pmap_free(cstr_t)(channel->subscribed_events);
|
||||
kv_destroy(channel->call_stack);
|
||||
if (channel->type != kChannelTypeProc) {
|
||||
multiqueue_free(channel->events);
|
||||
}
|
||||
xfree(channel);
|
||||
pmap_free(cstr_t)(channel->rpc.subscribed_events);
|
||||
kv_destroy(channel->rpc.call_stack);
|
||||
}
|
||||
|
||||
static void close_cb(Stream *stream, void *data)
|
||||
{
|
||||
decref(data);
|
||||
}
|
||||
|
||||
static Channel *register_channel(ChannelType type, uint64_t id,
|
||||
MultiQueue *events)
|
||||
{
|
||||
Channel *rv = xmalloc(sizeof(Channel));
|
||||
rv->events = events ? events : multiqueue_new_child(main_loop.events);
|
||||
rv->type = type;
|
||||
rv->refcount = 1;
|
||||
rv->closed = false;
|
||||
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||
rv->id = id > 0 ? id : next_chan_id++;
|
||||
rv->subscribed_events = pmap_new(cstr_t)();
|
||||
rv->next_request_id = 1;
|
||||
kv_init(rv->call_stack);
|
||||
pmap_put(uint64_t)(channels, rv->id, rv);
|
||||
return rv;
|
||||
channel_decref(data);
|
||||
}
|
||||
|
||||
static bool is_rpc_response(msgpack_object *obj)
|
||||
@@ -803,15 +680,18 @@ static bool is_rpc_response(msgpack_object *obj)
|
||||
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
uint64_t response_id = obj->via.array.ptr[1].via.u64;
|
||||
if (kv_size(channel->rpc.call_stack) == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Must be equal to the frame at the stack's bottom
|
||||
return kv_size(channel->call_stack) && response_id
|
||||
== kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id;
|
||||
ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
|
||||
return response_id == frame->request_id;
|
||||
}
|
||||
|
||||
static void complete_call(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
ChannelCallFrame *frame = kv_A(channel->call_stack,
|
||||
kv_size(channel->call_stack) - 1);
|
||||
ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
|
||||
frame->returned = true;
|
||||
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
|
||||
|
||||
@@ -825,8 +705,8 @@ static void complete_call(msgpack_object *obj, Channel *channel)
|
||||
static void call_set_error(Channel *channel, char *msg, int loglevel)
|
||||
{
|
||||
LOG(loglevel, "RPC: %s", msg);
|
||||
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
|
||||
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
|
||||
for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
|
||||
ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
|
||||
frame->returned = true;
|
||||
frame->errored = true;
|
||||
api_free_object(frame->result);
|
||||
@@ -875,18 +755,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
|
||||
return rv;
|
||||
}
|
||||
|
||||
static void incref(Channel *channel)
|
||||
{
|
||||
channel->refcount++;
|
||||
}
|
||||
|
||||
static void decref(Channel *channel)
|
||||
{
|
||||
if (!(--channel->refcount)) {
|
||||
free_channel(channel);
|
||||
}
|
||||
}
|
||||
|
||||
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
|
||||
#define REQ "[request] "
|
||||
#define RES "[response] "
|
||||
|
@@ -8,6 +8,7 @@
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/event/process.h"
|
||||
#include "nvim/vim.h"
|
||||
#include "nvim/channel.h"
|
||||
|
||||
#define METHOD_MAXLEN 512
|
||||
|
||||
@@ -16,6 +17,7 @@
|
||||
/// of os_inchar(), so they are processed "just-in-time".
|
||||
MultiQueue *ch_before_blocking_events;
|
||||
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/channel.h.generated.h"
|
||||
#endif
|
||||
|
36
src/nvim/msgpack_rpc/channel_defs.h
Normal file
36
src/nvim/msgpack_rpc/channel_defs.h
Normal file
@@ -0,0 +1,36 @@
|
||||
#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
|
||||
#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <uv.h>
|
||||
#include <msgpack.h>
|
||||
|
||||
#include "nvim/api/private/defs.h"
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/event/process.h"
|
||||
#include "nvim/vim.h"
|
||||
|
||||
typedef struct Channel Channel;
|
||||
|
||||
typedef struct {
|
||||
uint64_t request_id;
|
||||
bool returned, errored;
|
||||
Object result;
|
||||
} ChannelCallFrame;
|
||||
|
||||
typedef struct {
|
||||
Channel *channel;
|
||||
MsgpackRpcRequestHandler handler;
|
||||
Array args;
|
||||
uint64_t request_id;
|
||||
} RequestEvent;
|
||||
|
||||
typedef struct {
|
||||
PMap(cstr_t) *subscribed_events;
|
||||
bool closed;
|
||||
msgpack_unpacker *unpacker;
|
||||
uint64_t next_request_id;
|
||||
kvec_t(ChannelCallFrame *) call_stack;
|
||||
} RpcState;
|
||||
|
||||
#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
|
Reference in New Issue
Block a user