channel: avoid references to non-rooted vimL list with output

likely fixes #7768 #7913

If multiple internal stream callbacks were recieved before vimL
callbacks got called, only invoke one vimL callback with all data.
This commit is contained in:
Björn Linse
2018-05-13 15:07:10 +02:00
parent e121b1dbe7
commit a676c658cc
2 changed files with 44 additions and 31 deletions

View File

@@ -25,7 +25,8 @@ typedef struct {
Channel *chan; Channel *chan;
Callback *callback; Callback *callback;
const char *type; const char *type;
list_T *received; // if reader is set, status is ignored.
CallbackReader *reader;
int status; int status;
} ChannelEvent; } ChannelEvent;
@@ -253,17 +254,12 @@ void channel_decref(Channel *chan)
void callback_reader_free(CallbackReader *reader) void callback_reader_free(CallbackReader *reader)
{ {
callback_free(&reader->cb); callback_free(&reader->cb);
if (reader->buffered) { ga_clear(&reader->buffer);
ga_clear(&reader->buffer);
}
} }
void callback_reader_start(CallbackReader *reader) void callback_reader_start(CallbackReader *reader)
{ {
if (reader->buffered) { ga_init(&reader->buffer, sizeof(char *), 32);
ga_init(&reader->buffer, sizeof(char *), 32);
ga_grow(&reader->buffer, 32);
}
} }
static void free_channel_event(void **argv) static void free_channel_event(void **argv)
@@ -533,29 +529,27 @@ err:
/// ///
/// @return [allocated] Converted list. /// @return [allocated] Converted list.
static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
{ {
list_T *const l = tv_list_alloc(kListLenMayKnow); list_T *const l = tv_list_alloc(kListLenMayKnow);
// Empty buffer should be represented by [''], encode_list_write() thinks // Empty buffer should be represented by [''], encode_list_write() thinks
// empty list is fine for the case. // empty list is fine for the case.
tv_list_append_string(l, "", 0); tv_list_append_string(l, "", 0);
encode_list_write(l, buf, len); if (len > 0) {
encode_list_write(l, buf, len);
}
return l; return l;
} }
// vimscript job callbacks must be executed on Nvim main loop // vimscript job callbacks must be executed on Nvim main loop
static inline void process_channel_event(Channel *chan, Callback *callback, static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type, char *buf, const char *type,
size_t count, int status) CallbackReader *reader, int status)
{ {
assert(callback); assert(callback);
ChannelEvent *event_data = xmalloc(sizeof(*event_data)); ChannelEvent *event_data = xmalloc(sizeof(*event_data));
event_data->received = NULL; event_data->reader = reader;
if (buf) { event_data->status = status;
event_data->received = buffer_to_tv_list(buf, count);
} else {
event_data->status = status;
}
channel_incref(chan); // Hold on ref to callback channel_incref(chan); // Hold on ref to callback
event_data->chan = chan; event_data->chan = chan;
event_data->callback = callback; event_data->callback = callback;
@@ -605,8 +599,7 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
if (eof) { if (eof) {
if (reader->buffered) { if (reader->buffered) {
if (reader->cb.type != kCallbackNone) { if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data, process_channel_event(chan, &reader->cb, type, reader, 0);
(size_t)reader->buffer.ga_len, 0);
} else if (reader->self) { } else if (reader->self) {
if (tv_dict_find(reader->self, type, -1) == NULL) { if (tv_dict_find(reader->self, type, -1) == NULL) {
list_T *data = buffer_to_tv_list(reader->buffer.ga_data, list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
@@ -617,12 +610,12 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
channel_incref(chan); channel_incref(chan);
multiqueue_put(chan->events, on_buffered_error, 2, chan, type); multiqueue_put(chan->events, on_buffered_error, 2, chan, type);
} }
ga_clear(&reader->buffer);
} else { } else {
abort(); abort();
} }
ga_clear(&reader->buffer);
} else if (reader->cb.type != kCallbackNone) { } else if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, ptr, 0, 0); process_channel_event(chan, &reader->cb, type, reader, 0);
} }
return; return;
} }
@@ -634,10 +627,12 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
} }
rbuffer_consumed(buf, count); rbuffer_consumed(buf, count);
if (reader->buffered) { // if buffer wasn't consumed, a pending callback is stalled. Aggregate the
ga_concat_len(&reader->buffer, ptr, count); // received data and avoid a "burst" of multiple callbacks.
} else if (callback_reader_set(*reader)) { bool buffer_set = reader->buffer.ga_len > 0;
process_channel_event(chan, &reader->cb, type, ptr, count, 0); ga_concat_len(&reader->buffer, ptr, count);
if (!reader->buffered && !buffer_set && callback_reader_set(*reader)) {
process_channel_event(chan, &reader->cb, type, reader, 0);
} }
} }
@@ -661,7 +656,7 @@ static void channel_process_exit_cb(Process *proc, int status, void *data)
// If process did not exit, we only closed the handle of a detached process. // If process did not exit, we only closed the handle of a detached process.
bool exited = (status >= 0); bool exited = (status >= 0);
if (exited) { if (exited) {
process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status); process_channel_event(chan, &chan->on_exit, "exit", NULL, status);
} }
channel_decref(chan); channel_decref(chan);
@@ -677,11 +672,13 @@ static void on_channel_event(void **args)
argv[0].v_lock = VAR_UNLOCKED; argv[0].v_lock = VAR_UNLOCKED;
argv[0].vval.v_number = (varnumber_T)ev->chan->id; argv[0].vval.v_number = (varnumber_T)ev->chan->id;
if (ev->received) { if (ev->reader) {
argv[1].v_type = VAR_LIST; argv[1].v_type = VAR_LIST;
argv[1].v_lock = VAR_UNLOCKED; argv[1].v_lock = VAR_UNLOCKED;
argv[1].vval.v_list = ev->received; argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data,
(size_t)ev->reader->buffer.ga_len);
tv_list_ref(argv[1].vval.v_list); tv_list_ref(argv[1].vval.v_list);
ga_clear(&ev->reader->buffer);
} else { } else {
argv[1].v_type = VAR_NUMBER; argv[1].v_type = VAR_NUMBER;
argv[1].v_lock = VAR_UNLOCKED; argv[1].v_lock = VAR_UNLOCKED;

View File

@@ -433,9 +433,25 @@ describe('jobs', function()
let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done'] let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done']
endif endif
call jobwait([jobstart(cmd, d)]) call jobwait([jobstart(cmd, d)])
call rpcnotify(g:channel, 'data', d.data)
]]) ]])
eq({'notification', 'data', {{{'1', ''}, {'2', ''}, {'3', ''}, {'4', ''}, {'5', ''}, {''}}}}, next_msg())
local expected = {'1', '2', '3', '4', '5', ''}
local chunks = eval('d.data')
local received = {''}
for i, chunk in ipairs(chunks) do
if i < #chunks then
-- if chunks got joined, a spurious [''] callback was not sent
neq({''}, chunk)
else
-- but EOF callback is still sent
eq({''}, chunk)
end
received[#received] = received[#received]..chunk[1]
for j = 2, #chunk do
received[#received+1] = chunk[j]
end
end
eq(expected, received)
end) end)
it('jobstart() works with partial functions', function() it('jobstart() works with partial functions', function()