mirror of
https://github.com/neovim/neovim.git
synced 2025-09-24 20:18:32 +00:00
channel: Implement the 'channel_send_call' function
This function is used to send RPC calls to clients. In contrast to `channel_send_event`, this function will block until the client sends a response(But it will continue processing requests from that client). The RPC call stack has a maximum depth of 20.
This commit is contained in:
@@ -341,7 +341,7 @@ String cstr_to_string(const char *str)
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool object_to_vim(Object obj, typval_T *tv, Error *err)
|
bool object_to_vim(Object obj, typval_T *tv, Error *err)
|
||||||
{
|
{
|
||||||
tv->v_type = VAR_UNKNOWN;
|
tv->v_type = VAR_UNKNOWN;
|
||||||
tv->v_lock = 0;
|
tv->v_lock = 0;
|
||||||
|
@@ -71,6 +71,7 @@
|
|||||||
#include "nvim/os/time.h"
|
#include "nvim/os/time.h"
|
||||||
#include "nvim/os/channel.h"
|
#include "nvim/os/channel.h"
|
||||||
#include "nvim/api/private/helpers.h"
|
#include "nvim/api/private/helpers.h"
|
||||||
|
#include "nvim/os/msgpack_rpc.h"
|
||||||
|
|
||||||
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
|
#define DICT_MAXNEST 100 /* maximum nesting of lists and dicts */
|
||||||
|
|
||||||
@@ -6453,6 +6454,7 @@ static struct fst {
|
|||||||
{"searchpair", 3, 7, f_searchpair},
|
{"searchpair", 3, 7, f_searchpair},
|
||||||
{"searchpairpos", 3, 7, f_searchpairpos},
|
{"searchpairpos", 3, 7, f_searchpairpos},
|
||||||
{"searchpos", 1, 4, f_searchpos},
|
{"searchpos", 1, 4, f_searchpos},
|
||||||
|
{"send_call", 3, 3, f_send_call},
|
||||||
{"send_event", 3, 3, f_send_event},
|
{"send_event", 3, 3, f_send_event},
|
||||||
{"setbufvar", 3, 3, f_setbufvar},
|
{"setbufvar", 3, 3, f_setbufvar},
|
||||||
{"setcmdpos", 1, 1, f_setcmdpos},
|
{"setcmdpos", 1, 1, f_setcmdpos},
|
||||||
@@ -12525,6 +12527,47 @@ do_searchpair (
|
|||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// "send_call()" function
|
||||||
|
static void f_send_call(typval_T *argvars, typval_T *rettv)
|
||||||
|
{
|
||||||
|
rettv->v_type = VAR_NUMBER;
|
||||||
|
rettv->vval.v_number = 0;
|
||||||
|
|
||||||
|
if (check_restricted() || check_secure()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) {
|
||||||
|
EMSG2(_(e_invarg2), "Channel id must be a positive integer");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argvars[1].v_type != VAR_STRING) {
|
||||||
|
EMSG2(_(e_invarg2), "Method name must be a string");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool errored;
|
||||||
|
Object result;
|
||||||
|
if (!channel_send_call((uint64_t)argvars[0].vval.v_number,
|
||||||
|
(char *)argvars[1].vval.v_string,
|
||||||
|
vim_to_object(&argvars[2]),
|
||||||
|
&result,
|
||||||
|
&errored)) {
|
||||||
|
EMSG2(_(e_invarg2), "Channel doesn't exist");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Error conversion_error = {.set = false};
|
||||||
|
if (errored || !object_to_vim(result, rettv, &conversion_error)) {
|
||||||
|
EMSG(errored ?
|
||||||
|
result.data.string.data :
|
||||||
|
_("Error converting the call result"));
|
||||||
|
}
|
||||||
|
|
||||||
|
msgpack_rpc_free_object(result);
|
||||||
|
}
|
||||||
|
|
||||||
// "send_event()" function
|
// "send_event()" function
|
||||||
static void f_send_event(typval_T *argvars, typval_T *rettv)
|
static void f_send_event(typval_T *argvars, typval_T *rettv)
|
||||||
{
|
{
|
||||||
|
@@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
#include "nvim/api/private/helpers.h"
|
#include "nvim/api/private/helpers.h"
|
||||||
#include "nvim/os/channel.h"
|
#include "nvim/os/channel.h"
|
||||||
|
#include "nvim/os/event.h"
|
||||||
#include "nvim/os/rstream.h"
|
#include "nvim/os/rstream.h"
|
||||||
#include "nvim/os/rstream_defs.h"
|
#include "nvim/os/rstream_defs.h"
|
||||||
#include "nvim/os/wstream.h"
|
#include "nvim/os/wstream.h"
|
||||||
@@ -18,10 +19,16 @@
|
|||||||
#include "nvim/map.h"
|
#include "nvim/map.h"
|
||||||
#include "nvim/lib/kvec.h"
|
#include "nvim/lib/kvec.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t request_id;
|
||||||
|
bool errored;
|
||||||
|
Object result;
|
||||||
|
} ChannelCallFrame;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
PMap(cstr_t) *subscribed_events;
|
PMap(cstr_t) *subscribed_events;
|
||||||
bool is_job, is_alive;
|
bool is_job, enabled;
|
||||||
msgpack_unpacker *unpacker;
|
msgpack_unpacker *unpacker;
|
||||||
msgpack_sbuffer *sbuffer;
|
msgpack_sbuffer *sbuffer;
|
||||||
union {
|
union {
|
||||||
@@ -32,6 +39,9 @@ typedef struct {
|
|||||||
uv_stream_t *uv;
|
uv_stream_t *uv;
|
||||||
} streams;
|
} streams;
|
||||||
} data;
|
} data;
|
||||||
|
uint64_t next_request_id;
|
||||||
|
kvec_t(ChannelCallFrame *) call_stack;
|
||||||
|
size_t rpc_call_level;
|
||||||
} Channel;
|
} Channel;
|
||||||
|
|
||||||
static uint64_t next_id = 1;
|
static uint64_t next_id = 1;
|
||||||
@@ -135,6 +145,78 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool channel_send_call(uint64_t id,
|
||||||
|
char *name,
|
||||||
|
Object arg,
|
||||||
|
Object *result,
|
||||||
|
bool *errored)
|
||||||
|
{
|
||||||
|
Channel *channel = NULL;
|
||||||
|
|
||||||
|
if (!(channel = pmap_get(uint64_t)(channels, id))) {
|
||||||
|
msgpack_rpc_free_object(arg);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (kv_size(channel->call_stack) > 20) {
|
||||||
|
// 20 stack depth is more than anyone should ever need for RPC calls
|
||||||
|
*errored = true;
|
||||||
|
char buf[256];
|
||||||
|
snprintf(buf,
|
||||||
|
sizeof(buf),
|
||||||
|
"Channel %" PRIu64 " was closed due to a high stack depth "
|
||||||
|
"while processing a RPC call",
|
||||||
|
channel->id);
|
||||||
|
*result = STRING_OBJ(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t request_id = channel->next_request_id++;
|
||||||
|
// Send the msgpack-rpc request
|
||||||
|
channel_write(channel, serialize_message(0, request_id, name, arg));
|
||||||
|
|
||||||
|
if (!kv_size(channel->call_stack)) {
|
||||||
|
// This is the first frame, we must disable event deferral for this
|
||||||
|
// channel because we won't be returning until the client sends a
|
||||||
|
// response
|
||||||
|
if (channel->is_job) {
|
||||||
|
job_set_defer(channel->data.job, false);
|
||||||
|
} else {
|
||||||
|
rstream_set_defer(channel->data.streams.read, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push the frame
|
||||||
|
ChannelCallFrame frame = {request_id, false, NIL};
|
||||||
|
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
|
||||||
|
size_t size = kv_size(channel->call_stack);
|
||||||
|
|
||||||
|
do {
|
||||||
|
event_poll(-1);
|
||||||
|
} while (
|
||||||
|
// Continue running if ...
|
||||||
|
channel->enabled && // the channel is still enabled
|
||||||
|
kv_size(channel->call_stack) >= size); // the call didn't return
|
||||||
|
|
||||||
|
if (!kv_size(channel->call_stack)) {
|
||||||
|
// Popped last frame, restore event deferral
|
||||||
|
if (channel->is_job) {
|
||||||
|
job_set_defer(channel->data.job, true);
|
||||||
|
} else {
|
||||||
|
rstream_set_defer(channel->data.streams.read, true);
|
||||||
|
}
|
||||||
|
if (!channel->enabled && !channel->rpc_call_level) {
|
||||||
|
// Close the channel if it has been disabled and we have not been called
|
||||||
|
// by `parse_msgpack`(It would be unsafe to close the channel otherwise)
|
||||||
|
close_channel(channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*errored = frame.errored;
|
||||||
|
*result = frame.result;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/// Subscribes to event broadcasts
|
/// Subscribes to event broadcasts
|
||||||
///
|
///
|
||||||
/// @param id The channel id
|
/// @param id The channel id
|
||||||
@@ -193,10 +275,17 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
|||||||
Channel *channel = data;
|
Channel *channel = data;
|
||||||
|
|
||||||
if (eof) {
|
if (eof) {
|
||||||
close_channel(channel);
|
char buf[256];
|
||||||
|
snprintf(buf,
|
||||||
|
sizeof(buf),
|
||||||
|
"Before returning from a RPC call, channel %" PRIu64 " was "
|
||||||
|
"closed by the client",
|
||||||
|
channel->id);
|
||||||
|
disable_channel(channel, buf);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
channel->rpc_call_level++;
|
||||||
uint32_t count = rstream_available(rstream);
|
uint32_t count = rstream_available(rstream);
|
||||||
|
|
||||||
// Feed the unpacker with data
|
// Feed the unpacker with data
|
||||||
@@ -211,6 +300,24 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
|||||||
// Deserialize everything we can.
|
// Deserialize everything we can.
|
||||||
while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
|
while ((result = msgpack_rpc_unpack(channel->unpacker, &unpacked))
|
||||||
== kUnpackResultOk) {
|
== kUnpackResultOk) {
|
||||||
|
if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
|
||||||
|
if (is_valid_rpc_response(&unpacked.data, channel)) {
|
||||||
|
call_stack_pop(&unpacked.data, channel);
|
||||||
|
} else {
|
||||||
|
char buf[256];
|
||||||
|
snprintf(buf,
|
||||||
|
sizeof(buf),
|
||||||
|
"Channel %" PRIu64 " returned a response that doesn't have "
|
||||||
|
" a matching id for the current RPC call. Ensure the client "
|
||||||
|
" is properly synchronized",
|
||||||
|
channel->id);
|
||||||
|
call_stack_unwind(channel, buf, 1);
|
||||||
|
}
|
||||||
|
msgpack_unpacked_destroy(&unpacked);
|
||||||
|
// Bail out from this event loop iteration
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
// Each object is a new msgpack-rpc request and requires an empty response
|
// Each object is a new msgpack-rpc request and requires an empty response
|
||||||
msgpack_packer response;
|
msgpack_packer response;
|
||||||
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
|
msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
|
||||||
@@ -221,7 +328,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
|||||||
channel->sbuffer->size,
|
channel->sbuffer->size,
|
||||||
free);
|
free);
|
||||||
if (!channel_write(channel, buffer)) {
|
if (!channel_write(channel, buffer)) {
|
||||||
return;
|
goto end;
|
||||||
}
|
}
|
||||||
// Clear the buffer for future calls
|
// Clear the buffer for future calls
|
||||||
msgpack_sbuffer_clear(channel->sbuffer);
|
msgpack_sbuffer_clear(channel->sbuffer);
|
||||||
@@ -238,6 +345,13 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
|||||||
"This error can also happen when deserializing "
|
"This error can also happen when deserializing "
|
||||||
"an object with high level of nesting");
|
"an object with high level of nesting");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
channel->rpc_call_level--;
|
||||||
|
if (!channel->enabled && !kv_size(channel->call_stack)) {
|
||||||
|
// Now it's safe to destroy the channel
|
||||||
|
close_channel(channel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void send_error(Channel *channel, char *msg)
|
static void send_error(Channel *channel, char *msg)
|
||||||
@@ -276,9 +390,14 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!success) {
|
if (!success) {
|
||||||
// If the write failed for whatever reason, mark the channel as not alive so
|
// If the write failed for any reason, close the channel
|
||||||
// it can be freed later
|
char buf[256];
|
||||||
channel->is_alive = false;
|
snprintf(buf,
|
||||||
|
sizeof(buf),
|
||||||
|
"Before returning from a RPC call, channel %" PRIu64 " was "
|
||||||
|
"closed due to a failed write",
|
||||||
|
channel->id);
|
||||||
|
disable_channel(channel, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
return success;
|
return success;
|
||||||
@@ -359,6 +478,7 @@ static void close_channel(Channel *channel)
|
|||||||
});
|
});
|
||||||
|
|
||||||
pmap_free(cstr_t)(channel->subscribed_events);
|
pmap_free(cstr_t)(channel->subscribed_events);
|
||||||
|
kv_destroy(channel->call_stack);
|
||||||
free(channel);
|
free(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -390,12 +510,67 @@ static WBuffer *serialize_message(int type,
|
|||||||
static Channel *register_channel()
|
static Channel *register_channel()
|
||||||
{
|
{
|
||||||
Channel *rv = xmalloc(sizeof(Channel));
|
Channel *rv = xmalloc(sizeof(Channel));
|
||||||
rv->is_alive = true;
|
rv->enabled = true;
|
||||||
|
rv->rpc_call_level = 0;
|
||||||
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||||
rv->sbuffer = msgpack_sbuffer_new();
|
rv->sbuffer = msgpack_sbuffer_new();
|
||||||
rv->id = next_id++;
|
rv->id = next_id++;
|
||||||
rv->subscribed_events = pmap_new(cstr_t)();
|
rv->subscribed_events = pmap_new(cstr_t)();
|
||||||
|
rv->next_request_id = 1;
|
||||||
|
kv_init(rv->call_stack);
|
||||||
pmap_put(uint64_t)(channels, rv->id, rv);
|
pmap_put(uint64_t)(channels, rv->id, rv);
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool is_rpc_response(msgpack_object *obj)
|
||||||
|
{
|
||||||
|
return obj->type == MSGPACK_OBJECT_ARRAY
|
||||||
|
&& obj->via.array.size == 4
|
||||||
|
&& obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
|
||||||
|
&& obj->via.array.ptr[0].via.u64 == 1
|
||||||
|
&& obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
|
||||||
|
{
|
||||||
|
uint64_t response_id = obj->via.array.ptr[1].via.u64;
|
||||||
|
// Must be equal to the frame at the stack's bottom
|
||||||
|
return response_id == kv_A(channel->call_stack,
|
||||||
|
kv_size(channel->call_stack) - 1)->request_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void call_stack_pop(msgpack_object *obj, Channel *channel)
|
||||||
|
{
|
||||||
|
ChannelCallFrame *frame = kv_A(channel->call_stack,
|
||||||
|
kv_size(channel->call_stack) - 1);
|
||||||
|
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
|
||||||
|
(void)kv_pop(channel->call_stack);
|
||||||
|
|
||||||
|
if (frame->errored) {
|
||||||
|
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
|
||||||
|
} else {
|
||||||
|
msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void call_stack_unwind(Channel *channel, char *msg, int count)
|
||||||
|
{
|
||||||
|
while (kv_size(channel->call_stack) && count--) {
|
||||||
|
ChannelCallFrame *frame = kv_pop(channel->call_stack);
|
||||||
|
frame->errored = true;
|
||||||
|
frame->result = STRING_OBJ(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void disable_channel(Channel *channel, char *msg)
|
||||||
|
{
|
||||||
|
if (kv_size(channel->call_stack)) {
|
||||||
|
// Channel is currently in the middle of a call, remove all frames and mark
|
||||||
|
// it as "dead"
|
||||||
|
channel->enabled = false;
|
||||||
|
call_stack_unwind(channel, msg, -1);
|
||||||
|
} else {
|
||||||
|
// Safe to close it now
|
||||||
|
close_channel(channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user