Merge 'Refactor WStream to enable writing the same buffer to multiple targets'

This commit is contained in:
Thiago de Arruda
2014-05-28 09:05:13 -03:00
15 changed files with 357 additions and 87 deletions

View File

@@ -62,6 +62,12 @@ for i = 1, #arg - 1 do
api.functions[#api.functions + 1] = tmp[i]
local fn_id = #api.functions
local fn = api.functions[fn_id]
if #fn.parameters ~= 0 and fn.parameters[1][2] == 'channel_id' then
-- this function should receive the channel id
fn.receives_channel_id = true
-- remove the parameter since it won't be passed by the api client
table.remove(fn.parameters, 1)
end
if #fn.parameters ~= 0 and fn.parameters[#fn.parameters][1] == 'error' then
-- function can fail if the last parameter type is 'Error'
fn.can_fail = true
@@ -111,7 +117,7 @@ end
output:write([[
};
void msgpack_rpc_dispatch(uint64_t id, msgpack_object *req, msgpack_packer *res)
void msgpack_rpc_dispatch(uint64_t channel_id, msgpack_object *req, msgpack_packer *res)
{
Error error = { .set = false };
uint64_t method_id = (uint32_t)req->via.array.ptr[2].via.u64;
@@ -121,7 +127,7 @@ void msgpack_rpc_dispatch(uint64_t id, msgpack_object *req, msgpack_packer *res)
msgpack_pack_nil(res);
// The result is the [channel_id, metadata] array
msgpack_pack_array(res, 2);
msgpack_pack_uint64(res, id);
msgpack_pack_uint64(res, channel_id);
msgpack_pack_raw(res, sizeof(msgpack_metadata));
msgpack_pack_raw_body(res, msgpack_metadata, sizeof(msgpack_metadata));
return;
@@ -169,8 +175,19 @@ for i = 1, #api.functions do
output:write(fn.return_type..' rv = ')
end
-- write the call without the closing parenthesis
output:write(fn.name..'('..call_args)
-- write the function name and the opening parenthesis
output:write(fn.name..'(')
if fn.receives_channel_id then
-- if the function receives the channel id, pass it as first argument
if #args > 0 then
output:write('channel_id, '..call_args)
else
output:write('channel_id)')
end
else
output:write(call_args)
end
if fn.can_fail then
-- if the function can fail, also pass a pointer to the local error object

View File

@@ -28,9 +28,9 @@ send {
for group in split(groups)
exe 'augroup '.group
autocmd!
augroup NONE
exe 'augroup! '.group
augroup END
endfor
autocmd!
tabnew
let curbufnum = eval(bufnr('%'))
redir => buflist

View File

@@ -121,6 +121,7 @@ end:
}
free(rv.items);
rv.items = NULL;
}
return rv;

View File

@@ -7,6 +7,7 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/defs.h"
#include "nvim/api/buffer.h"
#include "nvim/os/channel.h"
#include "nvim/vim.h"
#include "nvim/buffer.h"
#include "nvim/window.h"
@@ -327,6 +328,24 @@ void vim_set_current_tabpage(Tabpage tabpage, Error *err)
try_end(err);
}
void vim_subscribe(uint64_t channel_id, String event)
{
size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN);
char e[EVENT_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_subscribe(channel_id, e);
}
void vim_unsubscribe(uint64_t channel_id, String event)
{
size_t length = (event.size < EVENT_MAXLEN ? event.size : EVENT_MAXLEN);
char e[EVENT_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_unsubscribe(channel_id, e);
}
static void write_msg(String message, bool to_err)
{
static int pos = 0;

View File

@@ -155,5 +155,17 @@ Tabpage vim_get_current_tabpage(void);
/// @param[out] err Details of an error that may have occurred
void vim_set_current_tabpage(Tabpage tabpage, Error *err);
/// Subscribes to event broadcasts
///
/// @param channel_id The channel id(passed automatically by the dispatcher)
/// @param event The event type string
void vim_subscribe(uint64_t channel_id, String event);
/// Unsubscribes to event broadcasts
///
/// @param channel_id The channel id(passed automatically by the dispatcher)
/// @param event The event type string
void vim_unsubscribe(uint64_t channel_id, String event);
#endif // NVIM_API_VIM_H

View File

@@ -13069,7 +13069,7 @@ static void f_send_event(typval_T *argvars, typval_T *rettv)
return;
}
if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number <= 0) {
if (argvars[0].v_type != VAR_NUMBER || argvars[0].vval.v_number < 0) {
EMSG2(_(e_invarg2), "Channel id must be a positive integer");
return;
}

91
src/nvim/lib/kvec.h Normal file
View File

@@ -0,0 +1,91 @@
/* The MIT License
Copyright (c) 2008, by Attractive Chaos <attractor@live.co.uk>
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
An example:
#include "kvec.h"
int main() {
kvec_t(int) array;
kv_init(array);
kv_push(int, array, 10); // append
kv_a(int, array, 20) = 5; // dynamic
kv_A(array, 20) = 4; // static
kv_destroy(array);
return 0;
}
*/
/*
2008-09-22 (0.1.0):
* The initial version.
*/
#ifndef AC_KVEC_H
#define AC_KVEC_H
#include <stdlib.h>
#include "nvim/memory.h"
#define kv_roundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
#define kvec_t(type) struct { size_t n, m; type *a; }
#define kv_init(v) ((v).n = (v).m = 0, (v).a = 0)
#define kv_destroy(v) free((v).a)
#define kv_A(v, i) ((v).a[(i)])
#define kv_pop(v) ((v).a[--(v).n])
#define kv_size(v) ((v).n)
#define kv_max(v) ((v).m)
#define kv_resize(type, v, s) ((v).m = (s), (v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m))
#define kv_copy(type, v1, v0) do { \
if ((v1).m < (v0).n) kv_resize(type, v1, (v0).n); \
(v1).n = (v0).n; \
memcpy((v1).a, (v0).a, sizeof(type) * (v0).n); \
} while (0) \
#define kv_push(type, v, x) do { \
if ((v).n == (v).m) { \
(v).m = (v).m? (v).m<<1 : 2; \
(v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m); \
} \
(v).a[(v).n++] = (x); \
} while (0)
#define kv_pushp(type, v) (((v).n == (v).m)? \
((v).m = ((v).m? (v).m<<1 : 2), \
(v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \
: 0), ((v).a + ((v).n++))
#define kv_a(type, v, i) (((v).m <= (size_t)(i)? \
((v).m = (v).n = (i) + 1, kv_roundup32((v).m), \
(v).a = (type*)xrealloc((v).a, sizeof(type) * (v).m), 0) \
: (v).n <= (size_t)(i)? (v).n = (i) + 1 \
: 0), (v).a[(i)])
#endif

View File

@@ -15,9 +15,11 @@
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/map.h"
#include "nvim/lib/kvec.h"
typedef struct {
uint64_t id;
Map(cstr_t) *subscribed_events;
bool is_job;
msgpack_unpacker *unpacker;
msgpack_sbuffer *sbuffer;
@@ -33,17 +35,24 @@ typedef struct {
static uint64_t next_id = 1;
static Map(uint64_t) *channels = NULL;
static Map(cstr_t) *event_strings = NULL;
static msgpack_sbuffer msgpack_event_buffer;
static void on_job_stdout(RStream *rstream, void *data, bool eof);
static void on_job_stderr(RStream *rstream, void *data, bool eof);
static void job_out(RStream *rstream, void *data, bool eof);
static void job_err(RStream *rstream, void *data, bool eof);
static void parse_msgpack(RStream *rstream, void *data, bool eof);
static void send_event(Channel *channel, char *type, typval_T *data);
static void broadcast_event(char *type, typval_T *data);
static void unsubscribe(Channel *channel, char *event);
static void close_channel(Channel *channel);
static void close_cb(uv_handle_t *handle);
static WBuffer *serialize_event(char *type, typval_T *data);
static Channel *register_channel(void);
void channel_init()
{
channels = map_new(uint64_t)();
event_strings = map_new(cstr_t)();
msgpack_sbuffer_init(&msgpack_event_buffer);
}
@@ -62,71 +71,78 @@ void channel_teardown()
void channel_from_job(char **argv)
{
Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = on_job_stdout;
channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->sbuffer = msgpack_sbuffer_new();
channel->id = next_id++;
Channel *channel = register_channel();
channel->is_job = true;
channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
map_put(uint64_t)(channels, channel->id, channel);
channel->data.job_id = job_start(argv, channel, job_out, job_err, NULL);
}
void channel_from_stream(uv_stream_t *stream)
{
Channel *channel = xmalloc(sizeof(Channel));
rstream_cb rcb = parse_msgpack;
channel->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
channel->sbuffer = msgpack_sbuffer_new();
Channel *channel = register_channel();
stream->data = NULL;
channel->id = next_id++;
channel->is_job = false;
// read stream
channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
channel->data.streams.write = wstream_new(1024 * 1024);
wstream_set_stream(channel->data.streams.write, stream);
channel->data.streams.uv = stream;
map_put(uint64_t)(channels, channel->id, channel);
}
bool channel_send_event(uint64_t id, char *type, typval_T *data)
{
Channel *channel = map_get(uint64_t)(channels, id);
Channel *channel = NULL;
if (!channel) {
return false;
if (id > 0) {
if (!(channel = map_get(uint64_t)(channels, id))) {
return false;
}
send_event(channel, type, data);
} else {
broadcast_event(type, data);
}
String event_type = {.size = strnlen(type, 1024), .data = type};
Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(event_type, event_data, &packer);
char *bytes = xmemdup(msgpack_event_buffer.data, msgpack_event_buffer.size);
wstream_write(channel->data.streams.write,
bytes,
msgpack_event_buffer.size,
true);
msgpack_rpc_free_object(event_data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return true;
}
static void on_job_stdout(RStream *rstream, void *data, bool eof)
void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = map_get(uint64_t)(channels, id))) {
return;
}
char *event_string = map_get(cstr_t)(event_strings, event);
if (!event_string) {
event_string = xstrdup(event);
map_put(cstr_t)(event_strings, event_string, event_string);
}
map_put(cstr_t)(channel->subscribed_events, event_string, event_string);
}
void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = map_get(uint64_t)(channels, id))) {
return;
}
unsubscribe(channel, event);
}
static void job_out(RStream *rstream, void *data, bool eof)
{
Job *job = data;
parse_msgpack(rstream, job_data(job), eof);
}
static void on_job_stderr(RStream *rstream, void *data, bool eof)
static void job_err(RStream *rstream, void *data, bool eof)
{
// TODO(tarruda): plugin error messages should be sent to the error buffer
}
@@ -158,15 +174,62 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// Perform the call
msgpack_rpc_call(channel->id, &unpacked.data, &response);
wstream_write(channel->data.streams.write,
xmemdup(channel->sbuffer->data, channel->sbuffer->size),
channel->sbuffer->size,
true);
wstream_new_buffer(channel->sbuffer->data,
channel->sbuffer->size,
true));
// Clear the buffer for future calls
msgpack_sbuffer_clear(channel->sbuffer);
}
}
static void send_event(Channel *channel, char *type, typval_T *data)
{
wstream_write(channel->data.streams.write, serialize_event(type, data));
}
static void broadcast_event(char *type, typval_T *data)
{
kvec_t(Channel *) subscribed;
kv_init(subscribed);
Channel *channel;
map_foreach_value(channels, channel, {
if (map_has(cstr_t)(channel->subscribed_events, type)) {
kv_push(Channel *, subscribed, channel);
}
});
if (!kv_size(subscribed)) {
goto end;
}
WBuffer *buffer = serialize_event(type, data);
for (size_t i = 0; i < kv_size(subscribed); i++) {
wstream_write(kv_A(subscribed, i)->data.streams.write, buffer);
}
end:
kv_destroy(subscribed);
}
static void unsubscribe(Channel *channel, char *event)
{
char *event_string = map_get(cstr_t)(event_strings, event);
map_del(cstr_t)(channel->subscribed_events, event_string);
map_foreach_value(channels, channel, {
if (map_has(cstr_t)(channel->subscribed_events, event_string)) {
return;
}
});
// Since the string is no longer used by other channels, release it's memory
map_del(cstr_t)(event_strings, event_string);
free(event_string);
}
static void close_channel(Channel *channel)
{
map_del(uint64_t)(channels, channel->id);
@@ -181,6 +244,13 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
map_free(cstr_t)(channel->subscribed_events);
free(channel);
}
@@ -190,3 +260,29 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
static WBuffer *serialize_event(char *type, typval_T *data)
{
String event_type = {.size = strnlen(type, EVENT_MAXLEN), .data = type};
Object event_data = vim_to_object(data);
msgpack_packer packer;
msgpack_packer_init(&packer, &msgpack_event_buffer, msgpack_sbuffer_write);
msgpack_rpc_notification(event_type, event_data, &packer);
WBuffer *rv = wstream_new_buffer(msgpack_event_buffer.data,
msgpack_event_buffer.size,
true);
msgpack_rpc_free_object(event_data);
msgpack_sbuffer_clear(&msgpack_event_buffer);
return rv;
}
static Channel *register_channel()
{
Channel *rv = xmalloc(sizeof(Channel));
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->sbuffer = msgpack_sbuffer_new();
rv->id = next_id++;
rv->subscribed_events = map_new(cstr_t)();
map_put(uint64_t)(channels, rv->id, rv);
return rv;
}

View File

@@ -5,6 +5,8 @@
#include "nvim/vim.h"
#define EVENT_MAXLEN 512
/// Initializes the module
void channel_init(void);
@@ -25,11 +27,24 @@ void channel_from_job(char **argv);
/// Sends event/data to channel
///
/// @param id The channel id
/// @param id The channel id. If 0, the event will be sent to all
/// channels that have subscribed to the event type
/// @param type The event type, an arbitrary string
/// @param obj The event data
/// @return True if the data was sent successfully, false otherwise.
bool channel_send_event(uint64_t id, char *type, typval_T *data);
/// Subscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
void channel_subscribe(uint64_t id, char *event);
/// Unsubscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
void channel_unsubscribe(uint64_t id, char *event);
#endif // NVIM_OS_CHANNEL_H

View File

@@ -234,7 +234,7 @@ bool job_write(int id, char *data, uint32_t len)
return false;
}
if (!wstream_write(job->in, data, len, true)) {
if (!wstream_write(job->in, wstream_new_buffer(data, len, false))) {
job_stop(job->id);
return false;
}

View File

@@ -33,7 +33,7 @@ static void close_cb(uv_handle_t *handle);
static void emit_read_event(RStream *rstream, bool eof);
RStream * rstream_new(rstream_cb cb,
uint32_t buffer_size,
size_t buffer_size,
void *data,
bool async)
{
@@ -133,7 +133,7 @@ void rstream_stop(RStream *rstream)
}
}
size_t rstream_read(RStream *rstream, char *buf, uint32_t count)
size_t rstream_read(RStream *rstream, char *buf, size_t count)
{
size_t read_count = rstream->wpos - rstream->rpos;

View File

@@ -21,7 +21,7 @@
/// this to false
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
uint32_t buffer_size,
size_t buffer_size,
void *data,
bool async);
@@ -65,7 +65,7 @@ void rstream_stop(RStream *rstream);
/// @param buffer The buffer which will receive the data
/// @param count Number of bytes that `buffer` can accept
/// @return The number of bytes copied into `buffer`
size_t rstream_read(RStream *rstream, char *buffer, uint32_t count);
size_t rstream_read(RStream *rstream, char *buffer, size_t count);
/// Returns the number of bytes available for reading from `rstream`
///

View File

@@ -12,27 +12,27 @@
struct wstream {
uv_stream_t *stream;
// Memory currently used by pending buffers
uint32_t curmem;
size_t curmem;
// Maximum memory used by this instance
uint32_t maxmem;
size_t maxmem;
// Number of pending requests
uint32_t pending_reqs;
size_t pending_reqs;
bool freed;
};
struct wbuffer {
size_t refcount, size;
char *data;
};
typedef struct {
WStream *wstream;
// Buffer containing data to be written
char *buffer;
// Size of the buffer
uint32_t length;
// If it's our responsibility to free the buffer
bool free;
WBuffer *buffer;
} WriteData;
static void write_cb(uv_write_t *req, int status);
WStream * wstream_new(uint32_t maxmem)
WStream * wstream_new(size_t maxmem)
{
WStream *rv = xmalloc(sizeof(WStream));
rv->maxmem = maxmem;
@@ -59,51 +59,60 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream)
wstream->stream = stream;
}
bool wstream_write(WStream *wstream, char *buffer, uint32_t length, bool free)
bool wstream_write(WStream *wstream, WBuffer *buffer)
{
WriteData *data;
uv_buf_t uvbuf;
uv_write_t *req;
if (wstream->freed) {
// Don't accept write requests after the WStream instance was freed
// This should not be called after a wstream was freed
assert(!wstream->freed);
if (wstream->curmem + buffer->size > wstream->maxmem) {
return false;
}
if (wstream->curmem + length > wstream->maxmem) {
return false;
}
if (free) {
// We should only account for buffers that are ours to free
wstream->curmem += length;
}
buffer->refcount++;
wstream->curmem += buffer->size;
data = xmalloc(sizeof(WriteData));
data->wstream = wstream;
data->buffer = buffer;
data->length = length;
data->free = free;
req = xmalloc(sizeof(uv_write_t));
req->data = data;
uvbuf.base = buffer;
uvbuf.len = length;
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
wstream->pending_reqs++;
uv_write(req, wstream->stream, &uvbuf, 1, write_cb);
return true;
}
WBuffer *wstream_new_buffer(char *data, size_t size, bool copy)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
rv->refcount = 0;
if (copy) {
rv->data = xmemdup(data, size);
} else {
rv->data = data;
}
return rv;
}
static void write_cb(uv_write_t *req, int status)
{
WriteData *data = req->data;
free(req);
data->wstream->curmem -= data->buffer->size;
if (data->free) {
if (!--data->buffer->refcount) {
// Free the data written to the stream
free(data->buffer->data);
free(data->buffer);
data->wstream->curmem -= data->length;
}
data->wstream->pending_reqs--;

View File

@@ -12,7 +12,7 @@
///
/// @param maxmem Maximum amount memory used by this `WStream` instance.
/// @return The newly-allocated `WStream` instance
WStream * wstream_new(uint32_t maxmem);
WStream * wstream_new(size_t maxmem);
/// Frees all memory allocated for a WStream instance
///
@@ -31,10 +31,19 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream);
///
/// @param wstream The `WStream` instance
/// @param buffer The buffer which contains data to be written
/// @param length Number of bytes that should be written from `buffer`
/// @param free If true, `buffer` will be freed after the write is complete
/// @return true if the data was successfully queued, false otherwise.
bool wstream_write(WStream *wstream, char *buffer, uint32_t length, bool free);
/// @return false if the write failed
bool wstream_write(WStream *wstream, WBuffer *buffer);
/// Creates a WBuffer object for holding output data. Instances of this
/// object can be reused across WStream instances, and the memory is freed
/// automatically when no longer needed(it tracks the number of references
/// internally)
///
/// @param data Data stored by the WBuffer
/// @param size The size of the data array
/// @param copy If true, the data will be copied into the WBuffer
/// @return The allocated WBuffer instance
WBuffer *wstream_new_buffer(char *data, size_t size, bool copy);
#endif // NVIM_OS_WSTREAM_H

View File

@@ -1,6 +1,7 @@
#ifndef NVIM_OS_WSTREAM_DEFS_H
#define NVIM_OS_WSTREAM_DEFS_H
typedef struct wbuffer WBuffer;
typedef struct wstream WStream;
#endif // NVIM_OS_WSTREAM_DEFS_H