tui: "backpressure": Drop messages to avoid flooding.

Closes #1234

multiqueue:
- Implement multiqueue_size()
- Rename MultiQueueItem.parent to MultiQueueItem.parent_item, to avoid confusion
  with MultiQueue.parent.
This commit is contained in:
Justin M. Keyes
2016-10-03 10:46:11 +02:00
parent 5082af415f
commit 043f85210a
11 changed files with 130 additions and 42 deletions

View File

@@ -92,6 +92,22 @@ void loop_close(Loop *loop, bool wait)
kl_destroy(WatcherPtr, loop->children); kl_destroy(WatcherPtr, loop->children);
} }
void loop_purge(Loop *loop)
{
uv_mutex_lock(&loop->mutex);
multiqueue_purge_events(loop->thread_events);
multiqueue_purge_events(loop->fast_events);
uv_mutex_unlock(&loop->mutex);
}
size_t loop_size(Loop *loop)
{
uv_mutex_lock(&loop->mutex);
size_t rv = multiqueue_size(loop->thread_events);
uv_mutex_unlock(&loop->mutex);
return rv;
}
static void async_cb(uv_async_t *handle) static void async_cb(uv_async_t *handle)
{ {
Loop *l = handle->loop->data; Loop *l = handle->loop->data;

View File

@@ -1,6 +1,7 @@
// Multi-level queue for selective async event processing. Multiqueue supports // Multi-level queue for selective async event processing.
// a parent-child relationship with the following properties: // Not threadsafe; access must be synchronized externally.
// //
// Multiqueue supports a parent-child relationship with these properties:
// - pushing a node to a child queue will push a corresponding link node to the // - pushing a node to a child queue will push a corresponding link node to the
// parent queue // parent queue
// - removing a link node from a parent queue will remove the next node // - removing a link node from a parent queue will remove the next node
@@ -14,8 +15,7 @@
// +----------------+ // +----------------+
// | Main loop | // | Main loop |
// +----------------+ // +----------------+
// ^ //
// |
// +----------------+ // +----------------+
// +-------------->| Event loop |<------------+ // +-------------->| Event loop |<------------+
// | +--+-------------+ | // | +--+-------------+ |
@@ -60,7 +60,7 @@ struct multiqueue_item {
MultiQueue *queue; MultiQueue *queue;
struct { struct {
Event event; Event event;
MultiQueueItem *parent; MultiQueueItem *parent_item;
} item; } item;
} data; } data;
bool link; // true: current item is just a link to a node in a child queue bool link; // true: current item is just a link to a node in a child queue
@@ -69,9 +69,10 @@ struct multiqueue_item {
struct multiqueue { struct multiqueue {
MultiQueue *parent; MultiQueue *parent;
QUEUE headtail; QUEUE headtail; // circularly-linked
put_callback put_cb; put_callback put_cb;
void *data; void *data;
size_t size;
}; };
#ifdef INCLUDE_GENERATED_DECLARATIONS #ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -88,7 +89,8 @@ MultiQueue *multiqueue_new_parent(put_callback put_cb, void *data)
MultiQueue *multiqueue_new_child(MultiQueue *parent) MultiQueue *multiqueue_new_child(MultiQueue *parent)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_NONNULL_ALL
{ {
assert(!parent->parent); assert(!parent->parent); // parent cannot have a parent, more like a "root"
parent->size++;
return multiqueue_new(parent, NULL, NULL); return multiqueue_new(parent, NULL, NULL);
} }
@@ -97,6 +99,7 @@ static MultiQueue *multiqueue_new(MultiQueue *parent, put_callback put_cb,
{ {
MultiQueue *rv = xmalloc(sizeof(MultiQueue)); MultiQueue *rv = xmalloc(sizeof(MultiQueue));
QUEUE_INIT(&rv->headtail); QUEUE_INIT(&rv->headtail);
rv->size = 0;
rv->parent = parent; rv->parent = parent;
rv->put_cb = put_cb; rv->put_cb = put_cb;
rv->data = data; rv->data = data;
@@ -110,8 +113,8 @@ void multiqueue_free(MultiQueue *this)
QUEUE *q = QUEUE_HEAD(&this->headtail); QUEUE *q = QUEUE_HEAD(&this->headtail);
MultiQueueItem *item = multiqueue_node_data(q); MultiQueueItem *item = multiqueue_node_data(q);
if (this->parent) { if (this->parent) {
QUEUE_REMOVE(&item->data.item.parent->node); QUEUE_REMOVE(&item->data.item.parent_item->node);
xfree(item->data.item.parent); xfree(item->data.item.parent_item);
} }
QUEUE_REMOVE(q); QUEUE_REMOVE(q);
xfree(item); xfree(item);
@@ -145,6 +148,15 @@ void multiqueue_process_events(MultiQueue *this)
} }
} }
/// Removes all events without processing them.
void multiqueue_purge_events(MultiQueue *this)
{
assert(this);
while (!multiqueue_empty(this)) {
(void)multiqueue_remove(this);
}
}
bool multiqueue_empty(MultiQueue *this) bool multiqueue_empty(MultiQueue *this)
{ {
assert(this); assert(this);
@@ -157,6 +169,12 @@ void multiqueue_replace_parent(MultiQueue *this, MultiQueue *new_parent)
this->parent = new_parent; this->parent = new_parent;
} }
/// Gets the count of all events currently in the queue.
size_t multiqueue_size(MultiQueue *this)
{
return this->size;
}
static Event multiqueue_remove(MultiQueue *this) static Event multiqueue_remove(MultiQueue *this)
{ {
assert(!multiqueue_empty(this)); assert(!multiqueue_empty(this));
@@ -178,12 +196,13 @@ static Event multiqueue_remove(MultiQueue *this)
} else { } else {
if (this->parent) { if (this->parent) {
// remove the corresponding link node in the parent queue // remove the corresponding link node in the parent queue
QUEUE_REMOVE(&item->data.item.parent->node); QUEUE_REMOVE(&item->data.item.parent_item->node);
xfree(item->data.item.parent); xfree(item->data.item.parent_item);
} }
rv = item->data.item.event; rv = item->data.item.event;
} }
this->size--;
xfree(item); xfree(item);
return rv; return rv;
} }
@@ -196,11 +215,13 @@ static void multiqueue_push(MultiQueue *this, Event event)
QUEUE_INSERT_TAIL(&this->headtail, &item->node); QUEUE_INSERT_TAIL(&this->headtail, &item->node);
if (this->parent) { if (this->parent) {
// push link node to the parent queue // push link node to the parent queue
item->data.item.parent = xmalloc(sizeof(MultiQueueItem)); item->data.item.parent_item = xmalloc(sizeof(MultiQueueItem));
item->data.item.parent->link = true; item->data.item.parent_item->link = true;
item->data.item.parent->data.queue = this; item->data.item.parent_item->data.queue = this;
QUEUE_INSERT_TAIL(&this->parent->headtail, &item->data.item.parent->node); QUEUE_INSERT_TAIL(&this->parent->headtail,
&item->data.item.parent_item->node);
} }
this->size++;
} }
static MultiQueueItem *multiqueue_node_data(QUEUE *q) static MultiQueueItem *multiqueue_node_data(QUEUE *q)

View File

@@ -1,3 +1,8 @@
// Queue implemented by circularly-linked list.
//
// Adapted from libuv. Simpler and more efficient than klist.h for implementing
// queues that support arbitrary insertion/removal.
//
// Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl> // Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
// //
// Permission to use, copy, modify, and/or distribute this software for any // Permission to use, copy, modify, and/or distribute this software for any
@@ -28,6 +33,8 @@ typedef struct _queue {
#define QUEUE_DATA(ptr, type, field) \ #define QUEUE_DATA(ptr, type, field) \
((type *)((char *)(ptr) - offsetof(type, field))) ((type *)((char *)(ptr) - offsetof(type, field)))
// Important note: mutating the list while QUEUE_FOREACH is
// iterating over its elements results in undefined behavior.
#define QUEUE_FOREACH(q, h) \ #define QUEUE_FOREACH(q, h) \
for ( /* NOLINT(readability/braces) */ \ for ( /* NOLINT(readability/braces) */ \
(q) = (h)->next; (q) != (h); (q) = (q)->next) (q) = (h)->next; (q) != (h); (q) = (q)->next)
@@ -56,17 +63,6 @@ static inline void QUEUE_ADD(QUEUE *const h, QUEUE *const n)
h->prev->next = h; h->prev->next = h;
} }
static inline void QUEUE_SPLIT(QUEUE *const h, QUEUE *const q, QUEUE *const n)
FUNC_ATTR_ALWAYS_INLINE
{
n->prev = h->prev;
n->prev->next = n;
n->next = q;
h->prev = q->prev;
h->prev->next = h;
q->prev = n;
}
static inline void QUEUE_INSERT_HEAD(QUEUE *const h, QUEUE *const q) static inline void QUEUE_INSERT_HEAD(QUEUE *const h, QUEUE *const q)
FUNC_ATTR_ALWAYS_INLINE FUNC_ATTR_ALWAYS_INLINE
{ {

View File

@@ -158,4 +158,7 @@
#define RGB(r, g, b) ((r << 16) | (g << 8) | b) #define RGB(r, g, b) ((r << 16) | (g << 8) | b)
#define STR_(x) #x
#define STR(x) STR_(x)
#endif // NVIM_MACROS_H #endif // NVIM_MACROS_H

View File

@@ -319,8 +319,6 @@ static bool handle_forced_escape(TermInput *input)
return false; return false;
} }
static void restart_reading(void **argv);
static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
bool eof) bool eof)
{ {

View File

@@ -11,6 +11,7 @@
#include "nvim/lib/kvec.h" #include "nvim/lib/kvec.h"
#include "nvim/vim.h" #include "nvim/vim.h"
#include "nvim/log.h"
#include "nvim/ui.h" #include "nvim/ui.h"
#include "nvim/map.h" #include "nvim/map.h"
#include "nvim/main.h" #include "nvim/main.h"
@@ -32,6 +33,8 @@
#define CNORM_COMMAND_MAX_SIZE 32 #define CNORM_COMMAND_MAX_SIZE 32
#define OUTBUF_SIZE 0xffff #define OUTBUF_SIZE 0xffff
#define TOO_MANY_EVENTS 1000000
typedef struct { typedef struct {
int top, bot, left, right; int top, bot, left, right;
} Rect; } Rect;
@@ -591,6 +594,18 @@ static void tui_flush(UI *ui)
TUIData *data = ui->data; TUIData *data = ui->data;
UGrid *grid = &data->grid; UGrid *grid = &data->grid;
size_t nrevents = loop_size(data->loop);
if (nrevents > TOO_MANY_EVENTS) {
ILOG("TUI event-queue flooded (thread_events=%zu); purging", nrevents);
// Back-pressure: UI events may accumulate much faster than the terminal
// device can serve them. Even if SIGINT/CTRL-C is received, user must still
// wait for the TUI event-queue to drain, and if there are ~millions of
// events in the queue, it could take hours. Clearing the queue allows the
// UI to recover. #1234 #5396
loop_purge(data->loop);
tui_busy_stop(ui); // avoid hidden cursor
}
while (kv_size(data->invalid_regions)) { while (kv_size(data->invalid_regions)) {
Rect r = kv_pop(data->invalid_regions); Rect r = kv_pop(data->invalid_regions);
int currow = -1; int currow = -1;

View File

@@ -88,18 +88,17 @@ void ui_builtin_start(void)
#ifdef FEAT_TUI #ifdef FEAT_TUI
tui_start(); tui_start();
#else #else
fprintf(stderr, "Neovim was built without a Terminal UI," \ fprintf(stderr, "Nvim headless-mode started.\n");
"press Ctrl+C to exit\n");
size_t len; size_t len;
char **addrs = server_address_list(&len); char **addrs = server_address_list(&len);
if (addrs != NULL) { if (addrs != NULL) {
fprintf(stderr, "currently listening on the following address(es)\n"); fprintf(stderr, "Listening on:\n");
for (size_t i = 0; i < len; i++) { for (size_t i = 0; i < len; i++) {
fprintf(stderr, "\t%s\n", addrs[i]); fprintf(stderr, "\t%s\n", addrs[i]);
} }
xfree(addrs); xfree(addrs);
} }
fprintf(stderr, "Press CTRL+C to exit.\n");
#endif #endif
} }

View File

@@ -1,11 +1,12 @@
// UI wrapper that sends UI requests to the UI thread. // UI wrapper that sends requests to the UI thread.
// Used by the built-in TUI and external libnvim-based UIs. // Used by the built-in TUI and libnvim-based UIs.
#include <assert.h> #include <assert.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <limits.h> #include <limits.h>
#include "nvim/log.h"
#include "nvim/main.h" #include "nvim/main.h"
#include "nvim/vim.h" #include "nvim/vim.h"
#include "nvim/ui.h" #include "nvim/ui.h"
@@ -19,10 +20,30 @@
#define UI(b) (((UIBridgeData *)b)->ui) #define UI(b) (((UIBridgeData *)b)->ui)
// Call a function in the UI thread #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
static size_t uilog_seen = 0;
static argv_callback uilog_event = NULL;
#define UI_CALL(ui, name, argc, ...) \
do { \
if (uilog_event == ui_bridge_##name##_event) { \
uilog_seen++; \
} else { \
if (uilog_seen > 0) { \
DLOG("UI bridge: ...%zu times", uilog_seen); \
} \
DLOG("UI bridge: " STR(name)); \
uilog_seen = 0; \
uilog_event = ui_bridge_##name##_event; \
} \
((UIBridgeData *)ui)->scheduler( \
event_create(1, ui_bridge_##name##_event, argc, __VA_ARGS__), UI(ui)); \
} while (0)
#else
// Schedule a function call on the UI bridge thread.
#define UI_CALL(ui, name, argc, ...) \ #define UI_CALL(ui, name, argc, ...) \
((UIBridgeData *)ui)->scheduler( \ ((UIBridgeData *)ui)->scheduler( \
event_create(1, ui_bridge_##name##_event, argc, __VA_ARGS__), UI(ui)) event_create(1, ui_bridge_##name##_event, argc, __VA_ARGS__), UI(ui))
#endif
#define INT2PTR(i) ((void *)(uintptr_t)i) #define INT2PTR(i) ((void *)(uintptr_t)i)
#define PTR2INT(p) ((int)(uintptr_t)p) #define PTR2INT(p) ((int)(uintptr_t)p)

View File

@@ -1,5 +1,5 @@
// Bridge for communication between a UI thread and nvim core. // Bridge for communication between a UI thread and nvim core.
// Used by the built-in TUI and external libnvim-based UIs. // Used by the built-in TUI and libnvim-based UIs.
#ifndef NVIM_UI_BRIDGE_H #ifndef NVIM_UI_BRIDGE_H
#define NVIM_UI_BRIDGE_H #define NVIM_UI_BRIDGE_H

View File

@@ -13,6 +13,7 @@
#include "nvim/iconv.h" #include "nvim/iconv.h"
#include "nvim/version.h" #include "nvim/version.h"
#include "nvim/charset.h" #include "nvim/charset.h"
#include "nvim/macros.h"
#include "nvim/memline.h" #include "nvim/memline.h"
#include "nvim/memory.h" #include "nvim/memory.h"
#include "nvim/message.h" #include "nvim/message.h"
@@ -22,9 +23,6 @@
// version info generated by the build system // version info generated by the build system
#include "auto/versiondef.h" #include "auto/versiondef.h"
#define STR_(x) #x
#define STR(x) STR_(x)
// for ":version", ":intro", and "nvim --version" // for ":version", ":intro", and "nvim --version"
#ifndef NVIM_VERSION_MEDIUM #ifndef NVIM_VERSION_MEDIUM
#define NVIM_VERSION_MEDIUM STR(NVIM_VERSION_MAJOR) "." STR(NVIM_VERSION_MINOR)\ #define NVIM_VERSION_MEDIUM STR(NVIM_VERSION_MAJOR) "." STR(NVIM_VERSION_MINOR)\

View File

@@ -36,6 +36,27 @@ describe("multiqueue (multi-level event-queue)", function()
put(child3, 'c3i2') put(child3, 'c3i2')
end) end)
it('keeps count of added events', function()
eq(3, multiqueue.multiqueue_size(child1))
eq(4, multiqueue.multiqueue_size(child2))
eq(2, multiqueue.multiqueue_size(child3))
end)
it('keeps count of removed events', function()
multiqueue.multiqueue_get(child1)
eq(2, multiqueue.multiqueue_size(child1))
multiqueue.multiqueue_get(child1)
eq(1, multiqueue.multiqueue_size(child1))
multiqueue.multiqueue_get(child1)
eq(0, multiqueue.multiqueue_size(child1))
put(child1, 'c2ixx')
eq(1, multiqueue.multiqueue_size(child1))
multiqueue.multiqueue_get(child1)
eq(0, multiqueue.multiqueue_size(child1))
multiqueue.multiqueue_get(child1)
eq(0, multiqueue.multiqueue_size(child1))
end)
it('removing from parent removes from child', function() it('removing from parent removes from child', function()
eq('c1i1', get(parent)) eq('c1i1', get(parent))
eq('c1i2', get(parent)) eq('c1i2', get(parent))