rstream/wstream: Unify structures and simplify API

- Simplify RStream/WStream API and make it more consistent with libuv.
- Move into the event loop layer(event subdirectory)
- Remove uv_helpers module.
- Simplify job/process internal modules/API.
- Unify RStream and WStream into a single structure. This is necessary because
  libuv streams can be readable and writable at the same time(and because the
  uv_helpers.c hack to associate multiple streams with libuv handle was removed)
- Make struct definition public, allowing more flexible/simple memory
  management by users of the module.
- Adapt channel/job modules to cope with the changes.
This commit is contained in:
Thiago de Arruda
2015-07-16 23:10:15 -03:00
parent 991d3ec1e6
commit ac2bd02561
33 changed files with 765 additions and 991 deletions

View File

@@ -10,10 +10,8 @@
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/remote_ui.h"
#include "nvim/event/loop.h"
#include "nvim/os/rstream.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/wstream.h"
#include "nvim/os/wstream_defs.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/os/job.h"
#include "nvim/os/job_defs.h"
#include "nvim/msgpack_rpc/helpers.h"
@@ -34,6 +32,12 @@
#define log_server_msg(...)
#endif
typedef enum {
kChannelTypeSocket,
kChannelTypeJob,
kChannelTypeStdio
} ChannelType;
typedef struct {
uint64_t request_id;
bool returned, errored;
@@ -45,15 +49,16 @@ typedef struct {
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
bool is_job, closed;
bool closed;
ChannelType type;
msgpack_unpacker *unpacker;
union {
Job *job;
Stream stream;
struct {
RStream *read;
WStream *write;
uv_stream_t *uv;
} streams;
Stream in;
Stream out;
} std;
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
@@ -112,8 +117,7 @@ void channel_teardown(void)
/// 0, on error.
uint64_t channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
Channel *channel = register_channel(kChannelTypeJob);
incref(channel); // job channels are only closed by the exit_cb
int status;
@@ -140,21 +144,15 @@ uint64_t channel_from_job(char **argv)
/// pipe/socket client connection
///
/// @param stream The established connection
void channel_from_stream(uv_stream_t *stream)
void channel_from_stream(uv_stream_t *uvstream)
{
Channel *channel = register_channel();
stream->data = NULL;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
channel);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
Channel *channel = register_channel(kChannelTypeSocket);
stream_init(NULL, &channel->data.stream, -1, uvstream, channel);
// write stream
channel->data.streams.write = wstream_new(0);
wstream_set_stream(channel->data.streams.write, stream);
channel->data.streams.uv = stream;
wstream_init(&channel->data.stream, 0);
// read stream
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, parse_msgpack);
}
/// Sends event/arguments to channel
@@ -313,28 +311,23 @@ bool channel_close(uint64_t id)
/// Neovim
static void channel_from_stdio(void)
{
Channel *channel = register_channel();
Channel *channel = register_channel(kChannelTypeStdio);
incref(channel); // stdio channels are only closed on exit
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
channel);
rstream_set_file(channel->data.streams.read, 0);
rstream_start(channel->data.streams.read);
rstream_init_fd(&loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
channel);
rstream_start(&channel->data.std.in, parse_msgpack);
// write stream
channel->data.streams.write = wstream_new(0);
wstream_set_file(channel->data.streams.write, 1);
channel->data.streams.uv = NULL;
wstream_init_fd(&loop, &channel->data.std.out, 1, 0, NULL);
}
static void job_out(RStream *rstream, RBuffer *buf, void *data, bool eof)
static void job_out(Stream *stream, RBuffer *buf, void *data, bool eof)
{
Job *job = data;
parse_msgpack(rstream, buf, job_data(job), eof);
parse_msgpack(stream, buf, job_data(job), eof);
}
static void job_err(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
static void job_err(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
while (rbuffer_size(rbuf)) {
char buf[256];
@@ -350,7 +343,7 @@ static void job_exit(Job *job, int status, void *data)
decref(data);
}
static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
static void parse_msgpack(Stream *stream, RBuffer *rbuf, void *data, bool eof)
{
Channel *channel = data;
incref(channel);
@@ -362,9 +355,9 @@ static void parse_msgpack(RStream *rstream, RBuffer *rbuf, void *data, bool eof)
}
size_t count = rbuffer_size(rbuf);
DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
DLOG("Feeding the msgpack parser with %u bytes of data from Stream(%p)",
count,
rstream);
stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@@ -516,10 +509,18 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
return false;
}
if (channel->is_job) {
success = job_write(channel->data.job, buffer);
} else {
success = wstream_write(channel->data.streams.write, buffer);
switch (channel->type) {
case kChannelTypeSocket:
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeJob:
success = job_write(channel->data.job, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
break;
default:
abort();
}
if (!success) {
@@ -637,20 +638,23 @@ static void close_channel(Channel *channel)
channel->closed = true;
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
}
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv;
if (handle) {
uv_close(handle, close_cb);
} else {
switch (channel->type) {
case kChannelTypeSocket:
stream_close(&channel->data.stream, close_cb);
break;
case kChannelTypeJob:
if (channel->data.job) {
job_stop(channel->data.job);
}
break;
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL);
stream_close(&channel->data.std.out, NULL);
loop_push_event(&loop,
(Event) { .handler = on_stdio_close, .data = channel }, false);
}
break;
default:
abort();
}
decref(channel);
@@ -683,15 +687,15 @@ static void free_channel(Channel *channel)
xfree(channel);
}
static void close_cb(uv_handle_t *handle)
static void close_cb(Stream *stream, void *data)
{
xfree(handle->data);
xfree(handle);
xfree(data);
}
static Channel *register_channel(void)
static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);

View File

@@ -6,7 +6,7 @@
#include <msgpack.h>
#include "nvim/os/wstream.h"
#include "nvim/event/wstream.h"
#include "nvim/api/private/defs.h"
#ifdef INCLUDE_GENERATED_DECLARATIONS