mirror of
https://github.com/neovim/neovim.git
synced 2025-10-08 10:56:31 +00:00
msgpack-rpc: Create subdirectory for msgpack-rpc modules
Create the msgpack_rpc subdirectory and move all modules that deal with msgpack-rpc to it. Also merge msgpack_rpc.c into msgpack_rpc/helpers.c
This commit is contained in:
597
src/nvim/msgpack_rpc/channel.c
Normal file
597
src/nvim/msgpack_rpc/channel.c
Normal file
@@ -0,0 +1,597 @@
|
||||
#include <stdbool.h>
|
||||
#include <string.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <uv.h>
|
||||
#include <msgpack.h>
|
||||
|
||||
#include "nvim/api/private/helpers.h"
|
||||
#include "nvim/api/vim.h"
|
||||
#include "nvim/msgpack_rpc/channel.h"
|
||||
#include "nvim/os/event.h"
|
||||
#include "nvim/os/rstream.h"
|
||||
#include "nvim/os/rstream_defs.h"
|
||||
#include "nvim/os/wstream.h"
|
||||
#include "nvim/os/wstream_defs.h"
|
||||
#include "nvim/os/job.h"
|
||||
#include "nvim/os/job_defs.h"
|
||||
#include "nvim/msgpack_rpc/helpers.h"
|
||||
#include "nvim/vim.h"
|
||||
#include "nvim/ascii.h"
|
||||
#include "nvim/memory.h"
|
||||
#include "nvim/os_unix.h"
|
||||
#include "nvim/message.h"
|
||||
#include "nvim/term.h"
|
||||
#include "nvim/map.h"
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/misc1.h"
|
||||
#include "nvim/lib/kvec.h"
|
||||
|
||||
#define CHANNEL_BUFFER_SIZE 0xffff
|
||||
|
||||
typedef struct {
|
||||
uint64_t request_id;
|
||||
bool errored;
|
||||
Object result;
|
||||
} ChannelCallFrame;
|
||||
|
||||
typedef struct {
|
||||
uint64_t id;
|
||||
PMap(cstr_t) *subscribed_events;
|
||||
bool is_job, enabled;
|
||||
msgpack_unpacker *unpacker;
|
||||
union {
|
||||
Job *job;
|
||||
struct {
|
||||
RStream *read;
|
||||
WStream *write;
|
||||
uv_stream_t *uv;
|
||||
} streams;
|
||||
} data;
|
||||
uint64_t next_request_id;
|
||||
kvec_t(ChannelCallFrame *) call_stack;
|
||||
size_t rpc_call_level;
|
||||
} Channel;
|
||||
|
||||
static uint64_t next_id = 1;
|
||||
static PMap(uint64_t) *channels = NULL;
|
||||
static PMap(cstr_t) *event_strings = NULL;
|
||||
static msgpack_sbuffer out_buffer;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/channel.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// Initializes the module
|
||||
void channel_init(void)
|
||||
{
|
||||
channels = pmap_new(uint64_t)();
|
||||
event_strings = pmap_new(cstr_t)();
|
||||
msgpack_sbuffer_init(&out_buffer);
|
||||
|
||||
if (embedded_mode) {
|
||||
channel_from_stdio();
|
||||
}
|
||||
}
|
||||
|
||||
/// Teardown the module
|
||||
void channel_teardown(void)
|
||||
{
|
||||
if (!channels) {
|
||||
return;
|
||||
}
|
||||
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
close_channel(channel);
|
||||
});
|
||||
}
|
||||
|
||||
/// Creates an API channel by starting a job and connecting to its
|
||||
/// stdin/stdout. stderr is forwarded to the editor error stream.
|
||||
///
|
||||
/// @param argv The argument vector for the process
|
||||
/// @return The channel id
|
||||
uint64_t channel_from_job(char **argv)
|
||||
{
|
||||
Channel *channel = register_channel();
|
||||
channel->is_job = true;
|
||||
|
||||
int status;
|
||||
channel->data.job = job_start(argv,
|
||||
channel,
|
||||
job_out,
|
||||
job_err,
|
||||
NULL,
|
||||
0,
|
||||
&status);
|
||||
|
||||
if (status <= 0) {
|
||||
close_channel(channel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
||||
/// Creates an API channel from a libuv stream representing a tcp or
|
||||
/// pipe/socket client connection
|
||||
///
|
||||
/// @param stream The established connection
|
||||
void channel_from_stream(uv_stream_t *stream)
|
||||
{
|
||||
Channel *channel = register_channel();
|
||||
stream->data = NULL;
|
||||
channel->is_job = false;
|
||||
// read stream
|
||||
channel->data.streams.read = rstream_new(parse_msgpack,
|
||||
rbuffer_new(CHANNEL_BUFFER_SIZE),
|
||||
channel,
|
||||
NULL);
|
||||
rstream_set_stream(channel->data.streams.read, stream);
|
||||
rstream_start(channel->data.streams.read);
|
||||
// write stream
|
||||
channel->data.streams.write = wstream_new(0);
|
||||
wstream_set_stream(channel->data.streams.write, stream);
|
||||
channel->data.streams.uv = stream;
|
||||
}
|
||||
|
||||
bool channel_exists(uint64_t id)
|
||||
{
|
||||
Channel *channel;
|
||||
return (channel = pmap_get(uint64_t)(channels, id)) != NULL
|
||||
&& channel->enabled;
|
||||
}
|
||||
|
||||
/// Sends event/arguments to channel
|
||||
///
|
||||
/// @param id The channel id. If 0, the event will be sent to all
|
||||
/// channels that have subscribed to the event type
|
||||
/// @param name The event name, an arbitrary string
|
||||
/// @param args Array with event arguments
|
||||
/// @return True if the event was sent successfully, false otherwise.
|
||||
bool channel_send_event(uint64_t id, char *name, Array args)
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (id > 0) {
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
api_free_array(args);
|
||||
return false;
|
||||
}
|
||||
send_event(channel, name, args);
|
||||
} else {
|
||||
broadcast_event(name, args);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Sends a method call to a channel
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param method_name The method name, an arbitrary string
|
||||
/// @param args Array with method arguments
|
||||
/// @param[out] error True if the return value is an error
|
||||
/// @return Whatever the remote method returned
|
||||
Object channel_send_call(uint64_t id,
|
||||
char *method_name,
|
||||
Array args,
|
||||
Error *err)
|
||||
{
|
||||
Channel *channel = NULL;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
|
||||
api_free_array(args);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
if (kv_size(channel->call_stack) > 20) {
|
||||
// 20 stack depth is more than anyone should ever need for RPC calls
|
||||
api_set_error(err,
|
||||
Exception,
|
||||
_("Channel %" PRIu64 " crossed maximum stack depth"),
|
||||
channel->id);
|
||||
api_free_array(args);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
uint64_t request_id = channel->next_request_id++;
|
||||
// Send the msgpack-rpc request
|
||||
send_request(channel, request_id, method_name, args);
|
||||
|
||||
EventSource channel_source = channel->is_job
|
||||
? job_event_source(channel->data.job)
|
||||
: rstream_event_source(channel->data.streams.read);
|
||||
EventSource sources[] = {channel_source, NULL};
|
||||
|
||||
// Push the frame
|
||||
ChannelCallFrame frame = {request_id, false, NIL};
|
||||
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
|
||||
size_t size = kv_size(channel->call_stack);
|
||||
|
||||
do {
|
||||
event_poll(-1, sources);
|
||||
} while (
|
||||
// Continue running if ...
|
||||
channel->enabled && // the channel is still enabled
|
||||
kv_size(channel->call_stack) >= size); // the call didn't return
|
||||
|
||||
if (frame.errored) {
|
||||
api_set_error(err, Exception, "%s", frame.result.data.string.data);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
return frame.result;
|
||||
}
|
||||
|
||||
/// Subscribes to event broadcasts
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void channel_subscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
abort();
|
||||
}
|
||||
|
||||
char *event_string = pmap_get(cstr_t)(event_strings, event);
|
||||
|
||||
if (!event_string) {
|
||||
event_string = xstrdup(event);
|
||||
pmap_put(cstr_t)(event_strings, event_string, event_string);
|
||||
}
|
||||
|
||||
pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
|
||||
}
|
||||
|
||||
/// Unsubscribes to event broadcasts
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void channel_unsubscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
abort();
|
||||
}
|
||||
|
||||
unsubscribe(channel, event);
|
||||
}
|
||||
|
||||
/// Closes a channel
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @return true if successful, false otherwise
|
||||
bool channel_close(uint64_t id)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
|
||||
return false;
|
||||
}
|
||||
|
||||
channel_kill(channel);
|
||||
channel->enabled = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Creates an API channel from stdin/stdout. This is used when embedding
|
||||
/// Neovim
|
||||
static void channel_from_stdio(void)
|
||||
{
|
||||
Channel *channel = register_channel();
|
||||
channel->is_job = false;
|
||||
// read stream
|
||||
channel->data.streams.read = rstream_new(parse_msgpack,
|
||||
rbuffer_new(CHANNEL_BUFFER_SIZE),
|
||||
channel,
|
||||
NULL);
|
||||
rstream_set_file(channel->data.streams.read, 0);
|
||||
rstream_start(channel->data.streams.read);
|
||||
// write stream
|
||||
channel->data.streams.write = wstream_new(0);
|
||||
wstream_set_file(channel->data.streams.write, 1);
|
||||
channel->data.streams.uv = NULL;
|
||||
}
|
||||
|
||||
static void job_out(RStream *rstream, void *data, bool eof)
|
||||
{
|
||||
Job *job = data;
|
||||
parse_msgpack(rstream, job_data(job), eof);
|
||||
}
|
||||
|
||||
static void job_err(RStream *rstream, void *data, bool eof)
|
||||
{
|
||||
size_t count;
|
||||
char buf[256];
|
||||
Channel *channel = job_data(data);
|
||||
|
||||
while ((count = rstream_pending(rstream))) {
|
||||
size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
|
||||
buf[read] = NUL;
|
||||
ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
|
||||
}
|
||||
}
|
||||
|
||||
static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
||||
{
|
||||
Channel *channel = data;
|
||||
channel->rpc_call_level++;
|
||||
|
||||
if (eof) {
|
||||
char buf[256];
|
||||
snprintf(buf,
|
||||
sizeof(buf),
|
||||
"Before returning from a RPC call, channel %" PRIu64 " was "
|
||||
"closed by the client",
|
||||
channel->id);
|
||||
call_set_error(channel, buf);
|
||||
goto end;
|
||||
}
|
||||
|
||||
uint32_t count = rstream_pending(rstream);
|
||||
DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
|
||||
count,
|
||||
rstream);
|
||||
|
||||
// Feed the unpacker with data
|
||||
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
|
||||
rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
|
||||
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
|
||||
|
||||
msgpack_unpacked unpacked;
|
||||
msgpack_unpacked_init(&unpacked);
|
||||
msgpack_unpack_return result;
|
||||
|
||||
// Deserialize everything we can.
|
||||
while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
|
||||
MSGPACK_UNPACK_SUCCESS) {
|
||||
if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
|
||||
if (is_valid_rpc_response(&unpacked.data, channel)) {
|
||||
call_stack_pop(&unpacked.data, channel);
|
||||
} else {
|
||||
char buf[256];
|
||||
snprintf(buf,
|
||||
sizeof(buf),
|
||||
"Channel %" PRIu64 " returned a response that doesn't have "
|
||||
" a matching id for the current RPC call. Ensure the client "
|
||||
" is properly synchronized",
|
||||
channel->id);
|
||||
call_set_error(channel, buf);
|
||||
}
|
||||
msgpack_unpacked_destroy(&unpacked);
|
||||
// Bail out from this event loop iteration
|
||||
goto end;
|
||||
}
|
||||
|
||||
// Perform the call
|
||||
WBuffer *resp = msgpack_rpc_call(channel->id, &unpacked.data, &out_buffer);
|
||||
// write the response
|
||||
if (!channel_write(channel, resp)) {
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
|
||||
OUT_STR(e_outofmem);
|
||||
out_char('\n');
|
||||
preserve_exit();
|
||||
}
|
||||
|
||||
if (result == MSGPACK_UNPACK_PARSE_ERROR) {
|
||||
// See src/msgpack/unpack_template.h in msgpack source tree for
|
||||
// causes for this error(search for 'goto _failed')
|
||||
//
|
||||
// A not so uncommon cause for this might be deserializing objects with
|
||||
// a high nesting level: msgpack will break when it's internal parse stack
|
||||
// size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
|
||||
send_error(channel, 0, "Invalid msgpack payload. "
|
||||
"This error can also happen when deserializing "
|
||||
"an object with high level of nesting");
|
||||
}
|
||||
|
||||
end:
|
||||
channel->rpc_call_level--;
|
||||
if (!channel->enabled && !kv_size(channel->call_stack)) {
|
||||
// Now it's safe to destroy the channel
|
||||
close_channel(channel);
|
||||
}
|
||||
}
|
||||
|
||||
static bool channel_write(Channel *channel, WBuffer *buffer)
|
||||
{
|
||||
bool success;
|
||||
|
||||
if (channel->is_job) {
|
||||
success = job_write(channel->data.job, buffer);
|
||||
} else {
|
||||
success = wstream_write(channel->data.streams.write, buffer);
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
// If the write failed for any reason, close the channel
|
||||
char buf[256];
|
||||
snprintf(buf,
|
||||
sizeof(buf),
|
||||
"Before returning from a RPC call, channel %" PRIu64 " was "
|
||||
"closed due to a failed write",
|
||||
channel->id);
|
||||
call_set_error(channel, buf);
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
static void send_error(Channel *channel, uint64_t id, char *err)
|
||||
{
|
||||
Error e = ERROR_INIT;
|
||||
api_set_error(&e, Exception, "%s", err);
|
||||
channel_write(channel, serialize_response(id, &e, NIL, &out_buffer));
|
||||
}
|
||||
|
||||
static void send_request(Channel *channel,
|
||||
uint64_t id,
|
||||
char *name,
|
||||
Array args)
|
||||
{
|
||||
String method = {.size = strlen(name), .data = name};
|
||||
channel_write(channel, serialize_request(id, method, args, &out_buffer, 1));
|
||||
}
|
||||
|
||||
static void send_event(Channel *channel,
|
||||
char *name,
|
||||
Array args)
|
||||
{
|
||||
String method = {.size = strlen(name), .data = name};
|
||||
channel_write(channel, serialize_request(0, method, args, &out_buffer, 1));
|
||||
}
|
||||
|
||||
static void broadcast_event(char *name, Array args)
|
||||
{
|
||||
kvec_t(Channel *) subscribed;
|
||||
kv_init(subscribed);
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
|
||||
kv_push(Channel *, subscribed, channel);
|
||||
}
|
||||
});
|
||||
|
||||
if (!kv_size(subscribed)) {
|
||||
api_free_array(args);
|
||||
goto end;
|
||||
}
|
||||
|
||||
String method = {.size = strlen(name), .data = name};
|
||||
WBuffer *buffer = serialize_request(0,
|
||||
method,
|
||||
args,
|
||||
&out_buffer,
|
||||
kv_size(subscribed));
|
||||
|
||||
for (size_t i = 0; i < kv_size(subscribed); i++) {
|
||||
channel_write(kv_A(subscribed, i), buffer);
|
||||
}
|
||||
|
||||
end:
|
||||
kv_destroy(subscribed);
|
||||
}
|
||||
|
||||
static void unsubscribe(Channel *channel, char *event)
|
||||
{
|
||||
char *event_string = pmap_get(cstr_t)(event_strings, event);
|
||||
pmap_del(cstr_t)(channel->subscribed_events, event_string);
|
||||
|
||||
map_foreach_value(channels, channel, {
|
||||
if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
|
||||
return;
|
||||
}
|
||||
});
|
||||
|
||||
// Since the string is no longer used by other channels, release it's memory
|
||||
pmap_del(cstr_t)(event_strings, event_string);
|
||||
free(event_string);
|
||||
}
|
||||
|
||||
static void close_channel(Channel *channel)
|
||||
{
|
||||
pmap_del(uint64_t)(channels, channel->id);
|
||||
msgpack_unpacker_free(channel->unpacker);
|
||||
|
||||
// Unsubscribe from all events
|
||||
char *event_string;
|
||||
map_foreach_value(channel->subscribed_events, event_string, {
|
||||
unsubscribe(channel, event_string);
|
||||
});
|
||||
|
||||
pmap_free(cstr_t)(channel->subscribed_events);
|
||||
kv_destroy(channel->call_stack);
|
||||
channel_kill(channel);
|
||||
|
||||
free(channel);
|
||||
}
|
||||
|
||||
static void channel_kill(Channel *channel)
|
||||
{
|
||||
if (channel->is_job) {
|
||||
if (channel->data.job) {
|
||||
job_stop(channel->data.job);
|
||||
}
|
||||
} else {
|
||||
rstream_free(channel->data.streams.read);
|
||||
wstream_free(channel->data.streams.write);
|
||||
if (channel->data.streams.uv) {
|
||||
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
|
||||
} else {
|
||||
// When the stdin channel closes, it's time to go
|
||||
mch_exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void close_cb(uv_handle_t *handle)
|
||||
{
|
||||
free(handle->data);
|
||||
free(handle);
|
||||
}
|
||||
|
||||
static Channel *register_channel(void)
|
||||
{
|
||||
Channel *rv = xmalloc(sizeof(Channel));
|
||||
rv->enabled = true;
|
||||
rv->rpc_call_level = 0;
|
||||
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||
rv->id = next_id++;
|
||||
rv->subscribed_events = pmap_new(cstr_t)();
|
||||
rv->next_request_id = 1;
|
||||
kv_init(rv->call_stack);
|
||||
pmap_put(uint64_t)(channels, rv->id, rv);
|
||||
return rv;
|
||||
}
|
||||
|
||||
static bool is_rpc_response(msgpack_object *obj)
|
||||
{
|
||||
return obj->type == MSGPACK_OBJECT_ARRAY
|
||||
&& obj->via.array.size == 4
|
||||
&& obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
|
||||
&& obj->via.array.ptr[0].via.u64 == 1
|
||||
&& obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
|
||||
}
|
||||
|
||||
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
uint64_t response_id = obj->via.array.ptr[1].via.u64;
|
||||
// Must be equal to the frame at the stack's bottom
|
||||
return response_id == kv_A(channel->call_stack,
|
||||
kv_size(channel->call_stack) - 1)->request_id;
|
||||
}
|
||||
|
||||
static void call_stack_pop(msgpack_object *obj, Channel *channel)
|
||||
{
|
||||
ChannelCallFrame *frame = kv_pop(channel->call_stack);
|
||||
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
|
||||
|
||||
if (frame->errored) {
|
||||
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
|
||||
} else {
|
||||
msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
|
||||
}
|
||||
}
|
||||
|
||||
static void call_set_error(Channel *channel, char *msg)
|
||||
{
|
||||
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
|
||||
ChannelCallFrame *frame = kv_pop(channel->call_stack);
|
||||
frame->errored = true;
|
||||
frame->result = STRING_OBJ(cstr_to_string(msg));
|
||||
}
|
||||
|
||||
channel->enabled = false;
|
||||
}
|
15
src/nvim/msgpack_rpc/channel.h
Normal file
15
src/nvim/msgpack_rpc/channel.h
Normal file
@@ -0,0 +1,15 @@
|
||||
#ifndef NVIM_MSGPACK_RPC_CHANNEL_H
|
||||
#define NVIM_MSGPACK_RPC_CHANNEL_H
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <uv.h>
|
||||
|
||||
#include "nvim/api/private/defs.h"
|
||||
#include "nvim/vim.h"
|
||||
|
||||
#define METHOD_MAXLEN 512
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/channel.h.generated.h"
|
||||
#endif
|
||||
#endif // NVIM_MSGPACK_RPC_CHANNEL_H
|
34
src/nvim/msgpack_rpc/defs.h
Normal file
34
src/nvim/msgpack_rpc/defs.h
Normal file
@@ -0,0 +1,34 @@
|
||||
#ifndef NVIM_MSGPACK_RPC_DEFS_H
|
||||
#define NVIM_MSGPACK_RPC_DEFS_H
|
||||
|
||||
#include <msgpack.h>
|
||||
|
||||
|
||||
/// The rpc_method_handlers table, used in msgpack_rpc_dispatch(), stores
|
||||
/// functions of this type.
|
||||
typedef Object (*rpc_method_handler_fn)(uint64_t channel_id,
|
||||
msgpack_object *req,
|
||||
Error *error);
|
||||
|
||||
/// Initializes the msgpack-rpc method table
|
||||
void msgpack_rpc_init_method_table(void);
|
||||
|
||||
void msgpack_rpc_init_function_metadata(Dictionary *metadata);
|
||||
|
||||
/// Dispatches to the actual API function after basic payload validation by
|
||||
/// `msgpack_rpc_call`. It is responsible for validating/converting arguments
|
||||
/// to C types, and converting the return value back to msgpack types.
|
||||
/// The implementation is generated at compile time with metadata extracted
|
||||
/// from the api/*.h headers,
|
||||
///
|
||||
/// @param channel_id The channel id
|
||||
/// @param method_id The method id
|
||||
/// @param req The parsed request object
|
||||
/// @param error Pointer to error structure
|
||||
/// @return Some object
|
||||
Object msgpack_rpc_dispatch(uint64_t channel_id,
|
||||
msgpack_object *req,
|
||||
Error *error)
|
||||
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(3);
|
||||
|
||||
#endif // NVIM_MSGPACK_RPC_DEFS_H
|
463
src/nvim/msgpack_rpc/helpers.c
Normal file
463
src/nvim/msgpack_rpc/helpers.c
Normal file
@@ -0,0 +1,463 @@
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
#include <msgpack.h>
|
||||
|
||||
#include "nvim/api/private/helpers.h"
|
||||
#include "nvim/msgpack_rpc/helpers.h"
|
||||
#include "nvim/msgpack_rpc/defs.h"
|
||||
#include "nvim/vim.h"
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/memory.h"
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/helpers.c.generated.h"
|
||||
#endif
|
||||
|
||||
static msgpack_zone zone;
|
||||
static msgpack_sbuffer sbuffer;
|
||||
|
||||
#define HANDLE_TYPE_CONVERSION_IMPL(t, lt) \
|
||||
bool msgpack_rpc_to_##lt(msgpack_object *obj, t *arg) \
|
||||
FUNC_ATTR_NONNULL_ALL \
|
||||
{ \
|
||||
if (obj->type != MSGPACK_OBJECT_EXT \
|
||||
|| obj->via.ext.type != kObjectType##t) { \
|
||||
return false; \
|
||||
} \
|
||||
\
|
||||
msgpack_object data; \
|
||||
msgpack_unpack_return ret = msgpack_unpack(obj->via.ext.ptr, \
|
||||
obj->via.ext.size, \
|
||||
NULL, \
|
||||
&zone, \
|
||||
&data); \
|
||||
\
|
||||
if (ret != MSGPACK_UNPACK_SUCCESS) { \
|
||||
return false; \
|
||||
} \
|
||||
\
|
||||
*arg = data.via.u64; \
|
||||
return true; \
|
||||
} \
|
||||
\
|
||||
void msgpack_rpc_from_##lt(t o, msgpack_packer *res) \
|
||||
FUNC_ATTR_NONNULL_ARG(2) \
|
||||
{ \
|
||||
msgpack_packer pac; \
|
||||
msgpack_packer_init(&pac, &sbuffer, msgpack_sbuffer_write); \
|
||||
msgpack_pack_uint64(&pac, o); \
|
||||
msgpack_pack_ext(res, sbuffer.size, kObjectType##t); \
|
||||
msgpack_pack_ext_body(res, sbuffer.data, sbuffer.size); \
|
||||
msgpack_sbuffer_clear(&sbuffer); \
|
||||
}
|
||||
|
||||
void msgpack_rpc_helpers_init(void)
|
||||
{
|
||||
msgpack_zone_init(&zone, 0xfff);
|
||||
msgpack_sbuffer_init(&sbuffer);
|
||||
}
|
||||
|
||||
HANDLE_TYPE_CONVERSION_IMPL(Buffer, buffer)
|
||||
HANDLE_TYPE_CONVERSION_IMPL(Window, window)
|
||||
HANDLE_TYPE_CONVERSION_IMPL(Tabpage, tabpage)
|
||||
|
||||
bool msgpack_rpc_to_boolean(msgpack_object *obj, Boolean *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
*arg = obj->via.boolean;
|
||||
return obj->type == MSGPACK_OBJECT_BOOLEAN;
|
||||
}
|
||||
|
||||
bool msgpack_rpc_to_integer(msgpack_object *obj, Integer *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER
|
||||
&& obj->via.u64 <= INT64_MAX) {
|
||||
*arg = (int64_t)obj->via.u64;
|
||||
return true;
|
||||
}
|
||||
|
||||
*arg = obj->via.i64;
|
||||
return obj->type == MSGPACK_OBJECT_NEGATIVE_INTEGER;
|
||||
}
|
||||
|
||||
bool msgpack_rpc_to_float(msgpack_object *obj, Float *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
*arg = obj->via.dec;
|
||||
return obj->type == MSGPACK_OBJECT_DOUBLE;
|
||||
}
|
||||
|
||||
bool msgpack_rpc_to_string(msgpack_object *obj, String *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (obj->type == MSGPACK_OBJECT_BIN || obj->type == MSGPACK_OBJECT_STR) {
|
||||
arg->data = xmemdupz(obj->via.bin.ptr, obj->via.bin.size);
|
||||
arg->size = obj->via.bin.size;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool msgpack_rpc_to_object(msgpack_object *obj, Object *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
switch (obj->type) {
|
||||
case MSGPACK_OBJECT_NIL:
|
||||
arg->type = kObjectTypeNil;
|
||||
return true;
|
||||
|
||||
case MSGPACK_OBJECT_BOOLEAN:
|
||||
arg->type = kObjectTypeBoolean;
|
||||
return msgpack_rpc_to_boolean(obj, &arg->data.boolean);
|
||||
|
||||
case MSGPACK_OBJECT_POSITIVE_INTEGER:
|
||||
case MSGPACK_OBJECT_NEGATIVE_INTEGER:
|
||||
arg->type = kObjectTypeInteger;
|
||||
return msgpack_rpc_to_integer(obj, &arg->data.integer);
|
||||
|
||||
case MSGPACK_OBJECT_DOUBLE:
|
||||
arg->type = kObjectTypeFloat;
|
||||
return msgpack_rpc_to_float(obj, &arg->data.floating);
|
||||
|
||||
case MSGPACK_OBJECT_BIN:
|
||||
case MSGPACK_OBJECT_STR:
|
||||
arg->type = kObjectTypeString;
|
||||
return msgpack_rpc_to_string(obj, &arg->data.string);
|
||||
|
||||
case MSGPACK_OBJECT_ARRAY:
|
||||
arg->type = kObjectTypeArray;
|
||||
return msgpack_rpc_to_array(obj, &arg->data.array);
|
||||
|
||||
case MSGPACK_OBJECT_MAP:
|
||||
arg->type = kObjectTypeDictionary;
|
||||
return msgpack_rpc_to_dictionary(obj, &arg->data.dictionary);
|
||||
|
||||
case MSGPACK_OBJECT_EXT:
|
||||
switch (obj->via.ext.type) {
|
||||
case kObjectTypeBuffer:
|
||||
return msgpack_rpc_to_buffer(obj, &arg->data.buffer);
|
||||
case kObjectTypeWindow:
|
||||
return msgpack_rpc_to_window(obj, &arg->data.window);
|
||||
case kObjectTypeTabpage:
|
||||
return msgpack_rpc_to_tabpage(obj, &arg->data.tabpage);
|
||||
}
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool msgpack_rpc_to_array(msgpack_object *obj, Array *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (obj->type != MSGPACK_OBJECT_ARRAY) {
|
||||
return false;
|
||||
}
|
||||
|
||||
arg->size = obj->via.array.size;
|
||||
arg->items = xcalloc(obj->via.array.size, sizeof(Object));
|
||||
|
||||
for (uint32_t i = 0; i < obj->via.array.size; i++) {
|
||||
if (!msgpack_rpc_to_object(obj->via.array.ptr + i, &arg->items[i])) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool msgpack_rpc_to_dictionary(msgpack_object *obj, Dictionary *arg)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
if (obj->type != MSGPACK_OBJECT_MAP) {
|
||||
return false;
|
||||
}
|
||||
|
||||
arg->size = obj->via.array.size;
|
||||
arg->items = xcalloc(obj->via.map.size, sizeof(KeyValuePair));
|
||||
|
||||
|
||||
for (uint32_t i = 0; i < obj->via.map.size; i++) {
|
||||
if (!msgpack_rpc_to_string(&obj->via.map.ptr[i].key,
|
||||
&arg->items[i].key)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!msgpack_rpc_to_object(&obj->via.map.ptr[i].val,
|
||||
&arg->items[i].value)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_boolean(Boolean result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
if (result) {
|
||||
msgpack_pack_true(res);
|
||||
} else {
|
||||
msgpack_pack_false(res);
|
||||
}
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_integer(Integer result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
msgpack_pack_int64(res, result);
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_float(Float result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
msgpack_pack_double(res, result);
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_string(String result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
msgpack_pack_bin(res, result.size);
|
||||
msgpack_pack_bin_body(res, result.data, result.size);
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_object(Object result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
switch (result.type) {
|
||||
case kObjectTypeNil:
|
||||
msgpack_pack_nil(res);
|
||||
break;
|
||||
|
||||
case kObjectTypeBoolean:
|
||||
msgpack_rpc_from_boolean(result.data.boolean, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeInteger:
|
||||
msgpack_rpc_from_integer(result.data.integer, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeFloat:
|
||||
msgpack_rpc_from_float(result.data.floating, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeString:
|
||||
msgpack_rpc_from_string(result.data.string, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeArray:
|
||||
msgpack_rpc_from_array(result.data.array, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeBuffer:
|
||||
msgpack_rpc_from_buffer(result.data.buffer, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeWindow:
|
||||
msgpack_rpc_from_window(result.data.window, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeTabpage:
|
||||
msgpack_rpc_from_tabpage(result.data.tabpage, res);
|
||||
break;
|
||||
|
||||
case kObjectTypeDictionary:
|
||||
msgpack_rpc_from_dictionary(result.data.dictionary, res);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_array(Array result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
msgpack_pack_array(res, result.size);
|
||||
|
||||
for (size_t i = 0; i < result.size; i++) {
|
||||
msgpack_rpc_from_object(result.items[i], res);
|
||||
}
|
||||
}
|
||||
|
||||
void msgpack_rpc_from_dictionary(Dictionary result, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
msgpack_pack_map(res, result.size);
|
||||
|
||||
for (size_t i = 0; i < result.size; i++) {
|
||||
msgpack_rpc_from_string(result.items[i].key, res);
|
||||
msgpack_rpc_from_object(result.items[i].value, res);
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates the basic structure of the msgpack-rpc call and fills `res`
|
||||
/// with the basic response structure.
|
||||
///
|
||||
/// @param channel_id The channel id
|
||||
/// @param req The parsed request object
|
||||
/// @param res A packer that contains the response
|
||||
WBuffer *msgpack_rpc_call(uint64_t channel_id,
|
||||
msgpack_object *req,
|
||||
msgpack_sbuffer *sbuffer)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
FUNC_ATTR_NONNULL_ARG(3)
|
||||
{
|
||||
uint64_t response_id;
|
||||
Error error = ERROR_INIT;
|
||||
msgpack_rpc_validate(&response_id, req, &error);
|
||||
|
||||
if (error.set) {
|
||||
return serialize_response(response_id, &error, NIL, sbuffer);
|
||||
}
|
||||
|
||||
// dispatch the call
|
||||
Object rv = msgpack_rpc_dispatch(channel_id, req, &error);
|
||||
// send the response
|
||||
msgpack_packer response;
|
||||
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
|
||||
|
||||
if (error.set) {
|
||||
ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
|
||||
error.msg,
|
||||
response_id);
|
||||
return serialize_response(response_id, &error, NIL, sbuffer);
|
||||
}
|
||||
|
||||
DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
|
||||
response_id);
|
||||
return serialize_response(response_id, &error, rv, sbuffer);
|
||||
}
|
||||
|
||||
/// Finishes the msgpack-rpc call with an error message.
|
||||
///
|
||||
/// @param msg The error message
|
||||
/// @param res A packer that contains the response
|
||||
void msgpack_rpc_error(char *msg, msgpack_packer *res)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
size_t len = strlen(msg);
|
||||
|
||||
// error message
|
||||
msgpack_pack_bin(res, len);
|
||||
msgpack_pack_bin_body(res, msg, len);
|
||||
// Nil result
|
||||
msgpack_pack_nil(res);
|
||||
}
|
||||
|
||||
/// Handler executed when an invalid method name is passed
|
||||
Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
|
||||
msgpack_object *req,
|
||||
Error *error)
|
||||
{
|
||||
snprintf(error->msg, sizeof(error->msg), "Invalid method name");
|
||||
error->set = true;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/// Serializes a msgpack-rpc request or notification(id == 0)
|
||||
WBuffer *serialize_request(uint64_t request_id,
|
||||
String method,
|
||||
Array args,
|
||||
msgpack_sbuffer *sbuffer,
|
||||
size_t refcount)
|
||||
FUNC_ATTR_NONNULL_ARG(4)
|
||||
{
|
||||
msgpack_packer pac;
|
||||
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
|
||||
msgpack_pack_array(&pac, request_id ? 4 : 3);
|
||||
msgpack_pack_int(&pac, request_id ? 0 : 2);
|
||||
|
||||
if (request_id) {
|
||||
msgpack_pack_uint64(&pac, request_id);
|
||||
}
|
||||
|
||||
msgpack_pack_bin(&pac, method.size);
|
||||
msgpack_pack_bin_body(&pac, method.data, method.size);
|
||||
msgpack_rpc_from_array(args, &pac);
|
||||
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
||||
sbuffer->size,
|
||||
refcount,
|
||||
free);
|
||||
api_free_array(args);
|
||||
msgpack_sbuffer_clear(sbuffer);
|
||||
return rv;
|
||||
}
|
||||
|
||||
/// Serializes a msgpack-rpc response
|
||||
WBuffer *serialize_response(uint64_t response_id,
|
||||
Error *err,
|
||||
Object arg,
|
||||
msgpack_sbuffer *sbuffer)
|
||||
FUNC_ATTR_NONNULL_ARG(2, 4)
|
||||
{
|
||||
msgpack_packer pac;
|
||||
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
|
||||
msgpack_pack_array(&pac, 4);
|
||||
msgpack_pack_int(&pac, 1);
|
||||
msgpack_pack_uint64(&pac, response_id);
|
||||
|
||||
if (err->set) {
|
||||
// error represented by a [type, message] array
|
||||
msgpack_pack_array(&pac, 2);
|
||||
msgpack_rpc_from_integer(err->type, &pac);
|
||||
msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
|
||||
// Nil result
|
||||
msgpack_pack_nil(&pac);
|
||||
} else {
|
||||
// Nil error
|
||||
msgpack_pack_nil(&pac);
|
||||
// Return value
|
||||
msgpack_rpc_from_object(arg, &pac);
|
||||
}
|
||||
|
||||
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
||||
sbuffer->size,
|
||||
1, // responses only go though 1 channel
|
||||
free);
|
||||
api_free_object(arg);
|
||||
msgpack_sbuffer_clear(sbuffer);
|
||||
return rv;
|
||||
}
|
||||
|
||||
void msgpack_rpc_validate(uint64_t *response_id,
|
||||
msgpack_object *req,
|
||||
Error *err)
|
||||
{
|
||||
// response id not known yet
|
||||
|
||||
*response_id = 0;
|
||||
// Validate the basic structure of the msgpack-rpc payload
|
||||
if (req->type != MSGPACK_OBJECT_ARRAY) {
|
||||
api_set_error(err, Validation, _("Request is not an array"));
|
||||
}
|
||||
|
||||
if (req->via.array.size != 4) {
|
||||
api_set_error(err, Validation, _("Request array size should be 4"));
|
||||
}
|
||||
|
||||
if (req->via.array.ptr[1].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
|
||||
api_set_error(err, Validation, _("Id must be a positive integer"));
|
||||
}
|
||||
|
||||
// Set the response id, which is the same as the request
|
||||
*response_id = req->via.array.ptr[1].via.u64;
|
||||
|
||||
if (req->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
|
||||
api_set_error(err, Validation, _("Message type must be an integer"));
|
||||
}
|
||||
|
||||
if (req->via.array.ptr[0].via.u64 != 0) {
|
||||
api_set_error(err, Validation, _("Message type must be 0"));
|
||||
}
|
||||
|
||||
if (req->via.array.ptr[2].type != MSGPACK_OBJECT_BIN
|
||||
&& req->via.array.ptr[2].type != MSGPACK_OBJECT_STR) {
|
||||
api_set_error(err, Validation, _("Method must be a string"));
|
||||
}
|
||||
|
||||
if (req->via.array.ptr[3].type != MSGPACK_OBJECT_ARRAY) {
|
||||
api_set_error(err, Validation, _("Paremeters must be an array"));
|
||||
}
|
||||
}
|
17
src/nvim/msgpack_rpc/helpers.h
Normal file
17
src/nvim/msgpack_rpc/helpers.h
Normal file
@@ -0,0 +1,17 @@
|
||||
#ifndef NVIM_MSGPACK_RPC_HELPERS_H
|
||||
#define NVIM_MSGPACK_RPC_HELPERS_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
#include <msgpack.h>
|
||||
|
||||
#include "nvim/os/wstream.h"
|
||||
#include "nvim/api/private/defs.h"
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/helpers.h.generated.h"
|
||||
#endif
|
||||
|
||||
#endif // NVIM_MSGPACK_RPC_HELPERS_H
|
||||
|
273
src/nvim/msgpack_rpc/server.c
Normal file
273
src/nvim/msgpack_rpc/server.c
Normal file
@@ -0,0 +1,273 @@
|
||||
#include <assert.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <uv.h>
|
||||
|
||||
#include "nvim/msgpack_rpc/channel.h"
|
||||
#include "nvim/msgpack_rpc/server.h"
|
||||
#include "nvim/os/os.h"
|
||||
#include "nvim/ascii.h"
|
||||
#include "nvim/vim.h"
|
||||
#include "nvim/memory.h"
|
||||
#include "nvim/message.h"
|
||||
#include "nvim/tempfile.h"
|
||||
#include "nvim/map.h"
|
||||
#include "nvim/path.h"
|
||||
|
||||
#define MAX_CONNECTIONS 32
|
||||
#define ADDRESS_MAX_SIZE 256
|
||||
#define NEOVIM_DEFAULT_TCP_PORT 7450
|
||||
#define LISTEN_ADDRESS_ENV_VAR "NVIM_LISTEN_ADDRESS"
|
||||
|
||||
typedef enum {
|
||||
kServerTypeTcp,
|
||||
kServerTypePipe
|
||||
} ServerType;
|
||||
|
||||
typedef struct {
|
||||
// Type of the union below
|
||||
ServerType type;
|
||||
|
||||
// This is either a tcp server or unix socket(named pipe on windows)
|
||||
union {
|
||||
struct {
|
||||
uv_tcp_t handle;
|
||||
struct sockaddr_in addr;
|
||||
} tcp;
|
||||
struct {
|
||||
uv_pipe_t handle;
|
||||
char addr[ADDRESS_MAX_SIZE];
|
||||
} pipe;
|
||||
} socket;
|
||||
} Server;
|
||||
|
||||
static PMap(cstr_t) *servers = NULL;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/server.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// Initializes the module
|
||||
bool server_init(void)
|
||||
{
|
||||
servers = pmap_new(cstr_t)();
|
||||
|
||||
if (!os_getenv(LISTEN_ADDRESS_ENV_VAR)) {
|
||||
char *listen_address = (char *)vim_tempname();
|
||||
os_setenv(LISTEN_ADDRESS_ENV_VAR, listen_address, 1);
|
||||
free(listen_address);
|
||||
}
|
||||
|
||||
return server_start((char *)os_getenv(LISTEN_ADDRESS_ENV_VAR)) == 0;
|
||||
}
|
||||
|
||||
/// Teardown the server module
|
||||
void server_teardown(void)
|
||||
{
|
||||
if (!servers) {
|
||||
return;
|
||||
}
|
||||
|
||||
Server *server;
|
||||
|
||||
map_foreach_value(servers, server, {
|
||||
if (server->type == kServerTypeTcp) {
|
||||
uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
|
||||
} else {
|
||||
uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Starts listening on arbitrary tcp/unix addresses specified by
|
||||
/// `endpoint` for API calls. The type of socket used(tcp or unix/pipe) will
|
||||
/// be determined by parsing `endpoint`: If it's a valid tcp address in the
|
||||
/// 'ip[:port]' format, then it will be tcp socket. The port is optional
|
||||
/// and if omitted will default to NEOVIM_DEFAULT_TCP_PORT. Otherwise it will
|
||||
/// be a unix socket or named pipe.
|
||||
///
|
||||
/// @param endpoint Address of the server. Either a 'ip[:port]' string or an
|
||||
/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or
|
||||
/// named pipe.
|
||||
/// @returns zero if successful, one on a regular error, and negative errno
|
||||
/// on failure to bind or connect.
|
||||
int server_start(const char *endpoint)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
{
|
||||
char addr[ADDRESS_MAX_SIZE];
|
||||
|
||||
// Trim to `ADDRESS_MAX_SIZE`
|
||||
if (xstrlcpy(addr, endpoint, sizeof(addr)) >= sizeof(addr)) {
|
||||
// TODO(aktau): since this is not what the user wanted, perhaps we
|
||||
// should return an error here
|
||||
EMSG2("Address was too long, truncated to %s", addr);
|
||||
}
|
||||
|
||||
// Check if the server already exists
|
||||
if (pmap_has(cstr_t)(servers, addr)) {
|
||||
EMSG2("Already listening on %s", addr);
|
||||
return 1;
|
||||
}
|
||||
|
||||
ServerType server_type = kServerTypeTcp;
|
||||
Server *server = xmalloc(sizeof(Server));
|
||||
char ip[16], *ip_end = strrchr(addr, ':');
|
||||
|
||||
if (!ip_end) {
|
||||
ip_end = strchr(addr, NUL);
|
||||
}
|
||||
|
||||
uint32_t addr_len = ip_end - addr;
|
||||
|
||||
if (addr_len > sizeof(ip) - 1) {
|
||||
// Maximum length of an IP address buffer is 15(eg: 255.255.255.255)
|
||||
addr_len = sizeof(ip) - 1;
|
||||
}
|
||||
|
||||
// Extract the address part
|
||||
xstrlcpy(ip, addr, addr_len + 1);
|
||||
|
||||
int port = NEOVIM_DEFAULT_TCP_PORT;
|
||||
|
||||
if (*ip_end == ':') {
|
||||
// Extract the port
|
||||
long lport = strtol(ip_end + 1, NULL, 10); // NOLINT
|
||||
if (lport <= 0 || lport > 0xffff) {
|
||||
// Invalid port, treat as named pipe or unix socket
|
||||
server_type = kServerTypePipe;
|
||||
} else {
|
||||
port = (int) lport;
|
||||
}
|
||||
}
|
||||
|
||||
if (server_type == kServerTypeTcp) {
|
||||
// Try to parse ip address
|
||||
if (uv_ip4_addr(ip, port, &server->socket.tcp.addr)) {
|
||||
// Invalid address, treat as named pipe or unix socket
|
||||
server_type = kServerTypePipe;
|
||||
}
|
||||
}
|
||||
|
||||
int result;
|
||||
|
||||
if (server_type == kServerTypeTcp) {
|
||||
// Listen on tcp address/port
|
||||
uv_tcp_init(uv_default_loop(), &server->socket.tcp.handle);
|
||||
server->socket.tcp.handle.data = server;
|
||||
result = uv_tcp_bind(&server->socket.tcp.handle,
|
||||
(const struct sockaddr *)&server->socket.tcp.addr,
|
||||
0);
|
||||
if (result == 0) {
|
||||
result = uv_listen((uv_stream_t *)&server->socket.tcp.handle,
|
||||
MAX_CONNECTIONS,
|
||||
connection_cb);
|
||||
if (result) {
|
||||
uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Listen on named pipe or unix socket
|
||||
xstrlcpy(server->socket.pipe.addr, addr, sizeof(server->socket.pipe.addr));
|
||||
uv_pipe_init(uv_default_loop(), &server->socket.pipe.handle, 0);
|
||||
server->socket.pipe.handle.data = server;
|
||||
result = uv_pipe_bind(&server->socket.pipe.handle,
|
||||
server->socket.pipe.addr);
|
||||
if (result == 0) {
|
||||
result = uv_listen((uv_stream_t *)&server->socket.pipe.handle,
|
||||
MAX_CONNECTIONS,
|
||||
connection_cb);
|
||||
|
||||
if (result) {
|
||||
uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert(result <= 0); // libuv should have returned -errno or zero.
|
||||
if (result < 0) {
|
||||
if (result == -EACCES) {
|
||||
// Libuv converts ENOENT to EACCES for Windows compatibility, but if
|
||||
// the parent directory does not exist, ENOENT would be more accurate.
|
||||
*path_tail((char_u *) addr) = NUL;
|
||||
if (!os_file_exists((char_u *) addr)) {
|
||||
result = -ENOENT;
|
||||
}
|
||||
}
|
||||
EMSG2("Failed to start server: %s", uv_strerror(result));
|
||||
free(server);
|
||||
return result;
|
||||
}
|
||||
|
||||
server->type = server_type;
|
||||
|
||||
// Add the server to the hash table
|
||||
pmap_put(cstr_t)(servers, addr, server);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Stops listening on the address specified by `endpoint`.
|
||||
///
|
||||
/// @param endpoint Address of the server.
|
||||
void server_stop(char *endpoint)
|
||||
{
|
||||
Server *server;
|
||||
char addr[ADDRESS_MAX_SIZE];
|
||||
|
||||
// Trim to `ADDRESS_MAX_SIZE`
|
||||
xstrlcpy(addr, endpoint, sizeof(addr));
|
||||
|
||||
if ((server = pmap_get(cstr_t)(servers, addr)) == NULL) {
|
||||
EMSG2("Not listening on %s", addr);
|
||||
return;
|
||||
}
|
||||
|
||||
if (server->type == kServerTypeTcp) {
|
||||
uv_close((uv_handle_t *)&server->socket.tcp.handle, free_server);
|
||||
} else {
|
||||
uv_close((uv_handle_t *)&server->socket.pipe.handle, free_server);
|
||||
}
|
||||
|
||||
pmap_del(cstr_t)(servers, addr);
|
||||
}
|
||||
|
||||
static void connection_cb(uv_stream_t *server, int status)
|
||||
{
|
||||
int result;
|
||||
uv_stream_t *client;
|
||||
Server *srv = server->data;
|
||||
|
||||
if (status < 0) {
|
||||
abort();
|
||||
}
|
||||
|
||||
if (srv->type == kServerTypeTcp) {
|
||||
client = xmalloc(sizeof(uv_tcp_t));
|
||||
uv_tcp_init(uv_default_loop(), (uv_tcp_t *)client);
|
||||
} else {
|
||||
client = xmalloc(sizeof(uv_pipe_t));
|
||||
uv_pipe_init(uv_default_loop(), (uv_pipe_t *)client, 0);
|
||||
}
|
||||
|
||||
result = uv_accept(server, client);
|
||||
|
||||
if (result) {
|
||||
EMSG2("Failed to accept connection: %s", uv_strerror(result));
|
||||
uv_close((uv_handle_t *)client, free_client);
|
||||
return;
|
||||
}
|
||||
|
||||
channel_from_stream(client);
|
||||
}
|
||||
|
||||
static void free_client(uv_handle_t *handle)
|
||||
{
|
||||
free(handle);
|
||||
}
|
||||
|
||||
static void free_server(uv_handle_t *handle)
|
||||
{
|
||||
free(handle->data);
|
||||
}
|
7
src/nvim/msgpack_rpc/server.h
Normal file
7
src/nvim/msgpack_rpc/server.h
Normal file
@@ -0,0 +1,7 @@
|
||||
#ifndef NVIM_MSGPACK_RPC_SERVER_H
|
||||
#define NVIM_MSGPACK_RPC_SERVER_H
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/server.h.generated.h"
|
||||
#endif
|
||||
#endif // NVIM_MSGPACK_RPC_SERVER_H
|
Reference in New Issue
Block a user