Refactor: Remove support for multiple protocols

This removes the boilerplate code supporting more than one RPC protocol as it
was becoming hard to maintain and we probably won't ever need it.
This commit is contained in:
Thiago de Arruda
2014-05-27 10:16:40 -03:00
parent 277554a9eb
commit b8e563f516
5 changed files with 35 additions and 107 deletions

View File

@@ -5,7 +5,6 @@
#include "nvim/api/private/helpers.h" #include "nvim/api/private/helpers.h"
#include "nvim/os/channel.h" #include "nvim/os/channel.h"
#include "nvim/os/channel_defs.h"
#include "nvim/os/rstream.h" #include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h" #include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h" #include "nvim/os/wstream.h"
@@ -19,14 +18,9 @@
typedef struct { typedef struct {
uint64_t id; uint64_t id;
ChannelProtocol protocol;
bool is_job; bool is_job;
union { msgpack_unpacker *unpacker;
struct { msgpack_sbuffer *sbuffer;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
} msgpack;
} proto;
union { union {
int job_id; int job_id;
struct { struct {
@@ -44,7 +38,6 @@ static msgpack_sbuffer msgpack_event_buffer;
static void on_job_stdout(RStream *rstream, void *data, bool eof); static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof); static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof); static void parse_msgpack(RStream *rstream, void *data, bool eof);
static void send_msgpack(Channel *channel, String type, Object data);
static void close_channel(Channel *channel); static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle); static void close_cb(uv_handle_t *handle);
@@ -67,48 +60,28 @@ void channel_teardown()
}); });
} }
void channel_from_job(char **argv, ChannelProtocol prot) void channel_from_job(char **argv)
{ {
Channel *channel = xmalloc(sizeof(Channel)); Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = NULL; rstream_cb rcb = on_job_stdout;
channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
switch (prot) { channel->sbuffer = msgpack_sbuffer_new();
case kChannelProtocolMsgpack:
rcb = on_job_stdout;
channel->proto.msgpack.unpacker =
msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
break;
default:
abort();
}
channel->id = next_id++; channel->id = next_id++;
channel->protocol = prot;
channel->is_job = true; channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
map_put(uint64_t)(channels, channel->id, channel); map_put(uint64_t)(channels, channel->id, channel);
} }
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) void channel_from_stream(uv_stream_t *stream)
{ {
Channel *channel = xmalloc(sizeof(Channel)); Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = NULL; rstream_cb rcb = parse_msgpack;
channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
switch (prot) { channel->sbuffer = msgpack_sbuffer_new();
case kChannelProtocolMsgpack:
rcb = parse_msgpack;
channel->proto.msgpack.unpacker =
msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
break;
default:
abort();
}
stream->data = NULL; stream->data = NULL;
channel->id = next_id++; channel->id = next_id++;
channel->protocol = prot;
channel->is_job = false; channel->is_job = false;
// read stream // read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true); channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
@@ -131,16 +104,18 @@ bool channel_send_event(uint64_t id, char *type, typval_T *data)
String event_type = {.size = strnlen(type, 1024), .data = type}; String event_type = {.size = strnlen(type, 1024), .data = type};
Object event_data = vim_to_object(data); Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(event_type, event_data, &packer);
char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
switch (channel->protocol) { wstream_write(channel->data.streams.write,
case kChannelProtocolMsgpack: bytes,
send_msgpack(channel, event_type, event_data); msgpack_event_buffer.size,
break; true);
default:
abort();
}
msgpack_rpc_free_object(event_data); msgpack_rpc_free_object(event_data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return true; return true;
} }
@@ -168,62 +143,35 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
uint32_t count = rstream_available(rstream); uint32_t count = rstream_available(rstream);
// Feed the unpacker with data // Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count); msgpack_unpacker_reserve_buffer(channel->unpacker, count);
rstream_read(rstream, rstream_read(rstream, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer(channel->proto.msgpack.unpacker), msgpack_unpacker_buffer_consumed(channel->unpacker, count);
count);
msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
msgpack_unpacked unpacked; msgpack_unpacked unpacked;
msgpack_unpacked_init(&unpacked); msgpack_unpacked_init(&unpacked);
// Deserialize everything we can. // Deserialize everything we can.
while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) { while (msgpack_unpacker_next(channel->unpacker, &unpacked)) {
// Each object is a new msgpack-rpc request and requires an empty response // Each object is a new msgpack-rpc request and requires an empty response
msgpack_packer response; msgpack_packer response;
msgpack_packer_init(&response, msgpack_packer_init(&response, channel->sbuffer, msgpack_sbuffer_write);
channel->proto.msgpack.sbuffer,
msgpack_sbuffer_write);
// Perform the call // Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response); msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write, wstream_write(channel->data.streams.write,
xmemdup(channel->proto.msgpack.sbuffer->data, xmemdup(channel->sbuffer->data, channel->sbuffer->size),
channel->proto.msgpack.sbuffer->size), channel->sbuffer->size,
channel->proto.msgpack.sbuffer->size,
true); true);
// Clear the buffer for future calls // Clear the buffer for future calls
msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer); msgpack_sbuffer_clear(channel->sbuffer);
} }
} }
static void send_msgpack(Channel *channel, String type, Object data)
{
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(type, data, &packer);
char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
wstream_write(channel->data.streams.write,
bytes,
msgpack_event_buffer.size,
true);
msgpack_sbuffer_clear(&msgpack_event_buffer);
}
static void close_channel(Channel *channel) static void close_channel(Channel *channel)
{ {
map_del(uint64_t)(channels, channel->id); map_del(uint64_t)(channels, channel->id);
msgpack_sbuffer_free(channel->sbuffer);
switch (channel->protocol) { msgpack_unpacker_free(channel->unpacker);
case kChannelProtocolMsgpack:
msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
msgpack_unpacker_free(channel->proto.msgpack.unpacker);
break;
default:
abort();
}
if (channel->is_job) { if (channel->is_job) {
job_stop(channel->data.job_id); job_stop(channel->data.job_id);

View File

@@ -4,7 +4,6 @@
#include <uv.h> #include <uv.h>
#include "nvim/vim.h" #include "nvim/vim.h"
#include "nvim/os/channel_defs.h"
/// Initializes the module /// Initializes the module
void channel_init(void); void channel_init(void);
@@ -16,15 +15,13 @@ void channel_teardown(void);
/// pipe/socket client connection /// pipe/socket client connection
/// ///
/// @param stream The established connection /// @param stream The established connection
/// @param prot The rpc protocol used void channel_from_stream(uv_stream_t *stream);
void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot);
/// Creates an API channel by starting a job and connecting to its /// Creates an API channel by starting a job and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream. /// stdin/stdout. stderr is forwarded to the editor error stream.
/// ///
/// @param argv The argument vector for the process /// @param argv The argument vector for the process
/// @param prot The rpc protocol used void channel_from_job(char **argv);
void channel_from_job(char **argv, ChannelProtocol prot);
/// Sends event/data to channel /// Sends event/data to channel
/// ///

View File

@@ -1,8 +0,0 @@
#ifndef NVIM_OS_CHANNEL_DEFS_H
#define NVIM_OS_CHANNEL_DEFS_H
typedef enum {
kChannelProtocolMsgpack
} ChannelProtocol;
#endif // NVIM_OS_CHANNEL_DEFS_H

View File

@@ -5,7 +5,6 @@
#include <uv.h> #include <uv.h>
#include "nvim/os/channel_defs.h"
#include "nvim/os/channel.h" #include "nvim/os/channel.h"
#include "nvim/os/server.h" #include "nvim/os/server.h"
#include "nvim/os/os.h" #include "nvim/os/os.h"
@@ -25,8 +24,6 @@ typedef enum {
} ServerType; } ServerType;
typedef struct { typedef struct {
// Protocol for channels established through this server
ChannelProtocol protocol;
// Type of the union below // Type of the union below
ServerType type; ServerType type;
@@ -59,8 +56,7 @@ void server_init()
free(listen_address); free(listen_address);
} }
server_start((char *)os_getenv("NEOVIM_LISTEN_ADDRESS"), server_start((char *)os_getenv("NEOVIM_LISTEN_ADDRESS"));
kChannelProtocolMsgpack);
} }
void server_teardown() void server_teardown()
@@ -80,7 +76,7 @@ void server_teardown()
}); });
} }
void server_start(char *endpoint, ChannelProtocol prot) void server_start(char *endpoint)
{ {
char addr[ADDRESS_MAX_SIZE]; char addr[ADDRESS_MAX_SIZE];
@@ -101,8 +97,6 @@ void server_start(char *endpoint, ChannelProtocol prot)
Server *server = xmalloc(sizeof(Server)); Server *server = xmalloc(sizeof(Server));
char ip[16], *ip_end = strrchr(addr, ':'); char ip[16], *ip_end = strrchr(addr, ':');
server->protocol = prot;
if (!ip_end) { if (!ip_end) {
ip_end = strchr(addr, NUL); ip_end = strchr(addr, NUL);
} }
@@ -229,7 +223,7 @@ static void connection_cb(uv_stream_t *server, int status)
return; return;
} }
channel_from_stream(client, srv->protocol); channel_from_stream(client);
} }
static void free_client(uv_handle_t *handle) static void free_client(uv_handle_t *handle)

View File

@@ -1,8 +1,6 @@
#ifndef NVIM_OS_SERVER_H #ifndef NVIM_OS_SERVER_H
#define NVIM_OS_SERVER_H #define NVIM_OS_SERVER_H
#include "nvim/os/channel_defs.h"
/// Initializes the module /// Initializes the module
void server_init(); void server_init();
@@ -18,8 +16,7 @@ void server_teardown();
/// @param endpoint Address of the server. Either a 'ip:port' string or an /// @param endpoint Address of the server. Either a 'ip:port' string or an
/// arbitrary identifier(trimmed to 256 bytes) for the unix socket or /// arbitrary identifier(trimmed to 256 bytes) for the unix socket or
/// named pipe. /// named pipe.
/// @param prot The rpc protocol to be used void server_start(char *endpoint);
void server_start(char *endpoint, ChannelProtocol prot);
/// Stops listening on the address specified by `endpoint`. /// Stops listening on the address specified by `endpoint`.
/// ///