channels: improvements to buffering

This commit is contained in:
Björn Linse
2017-09-17 16:23:39 +02:00
parent fee367a74f
commit a97cdff14d
3 changed files with 70 additions and 29 deletions

View File

@@ -257,7 +257,8 @@ void callback_reader_free(CallbackReader *reader)
void callback_reader_start(CallbackReader *reader)
{
if (reader->buffered) {
ga_init(&reader->buffer, sizeof(char *), 1);
ga_init(&reader->buffer, sizeof(char *), 32);
ga_grow(&reader->buffer, 32);
}
}
@@ -521,6 +522,34 @@ err:
return 0;
}
/// NB: mutates buf in place!
static list_T *buffer_to_tv_list(char *buf, size_t count)
{
list_T *ret = tv_list_alloc();
char *ptr = buf;
size_t remaining = count;
size_t off = 0;
while (off < remaining) {
// append the line
if (ptr[off] == NL) {
tv_list_append_string(ret, ptr, (ssize_t)off);
size_t skip = off + 1;
ptr += skip;
remaining -= skip;
off = 0;
continue;
}
if (ptr[off] == NUL) {
// Translate NUL to NL
ptr[off] = NL;
}
off++;
}
tv_list_append_string(ret, ptr, (ssize_t)off);
return ret;
}
// vimscript job callbacks must be executed on Nvim main loop
static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type, char *buf,
@@ -530,28 +559,7 @@ static inline void process_channel_event(Channel *chan, Callback *callback,
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
event_data->received = NULL;
if (buf) {
event_data->received = tv_list_alloc();
char *ptr = buf;
size_t remaining = count;
size_t off = 0;
while (off < remaining) {
// append the line
if (ptr[off] == NL) {
tv_list_append_string(event_data->received, ptr, (ssize_t)off);
size_t skip = off + 1;
ptr += skip;
remaining -= skip;
off = 0;
continue;
}
if (ptr[off] == NUL) {
// Translate NUL to NL
ptr[off] = NL;
}
off++;
}
tv_list_append_string(event_data->received, ptr, (ssize_t)off);
event_data->received = buffer_to_tv_list(buf, count);
} else {
event_data->status = status;
}
@@ -602,10 +610,18 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
if (eof) {
if (reader->buffered) {
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
(size_t)reader->buffer.ga_len, 0);
ga_clear(&reader->buffer);
} else if (callback_reader_set(*reader)) {
if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
(size_t)reader->buffer.ga_len, 0);
ga_clear(&reader->buffer);
} else if (reader->self) {
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
(size_t)reader->buffer.ga_len);
tv_dict_add_list(reader->self, type, strlen(type), data);
} else {
abort();
}
} else if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
}
return;