queue: Implement a more flexible event queue

This commit is contained in:
Thiago de Arruda
2015-07-24 12:24:25 -03:00
parent 1a7a020b68
commit a6e0d35d2d
8 changed files with 406 additions and 1 deletions

View File

@@ -217,7 +217,7 @@ install_helper(TARGETS nvim)
if(CLANG_ASAN_UBSAN)
message(STATUS "Enabling Clang address sanitizer and undefined behavior sanitizer for nvim.")
set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-DEXITFREE ")
set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined ")
set_property(TARGET nvim APPEND_STRING PROPERTY COMPILE_FLAGS "-fno-sanitize-recover -fno-omit-frame-pointer -fno-optimize-sibling-calls -fsanitize=address -fsanitize=undefined -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/.asan-blacklist")
set_property(TARGET nvim APPEND_STRING PROPERTY LINK_FLAGS "-fsanitize=address -fsanitize=undefined ")
elseif(CLANG_MSAN)
message(STATUS "Enabling Clang memory sanitizer for nvim.")

39
src/nvim/event/defs.h Normal file
View File

@@ -0,0 +1,39 @@
#ifndef NVIM_EVENT_DEFS_H
#define NVIM_EVENT_DEFS_H
#include <assert.h>
#include <stdarg.h>
#define EVENT_HANDLER_MAX_ARGC 4
typedef void (*argv_callback)(void **argv);
typedef struct message {
int priority;
argv_callback handler;
void *argv[EVENT_HANDLER_MAX_ARGC];
} Event;
#define VA_EVENT_INIT(event, p, h, a) \
do { \
assert(a <= EVENT_HANDLER_MAX_ARGC); \
(event)->priority = p; \
(event)->handler = h; \
if (a) { \
va_list args; \
va_start(args, a); \
for (int i = 0; i < a; i++) { \
(event)->argv[i] = va_arg(args, void *); \
} \
va_end(args); \
} \
} while (0)
static inline Event event_create(int priority, argv_callback cb, int argc, ...)
{
assert(argc <= EVENT_HANDLER_MAX_ARGC);
Event event;
VA_EVENT_INIT(&event, priority, cb, argc);
return event;
}
#endif // NVIM_EVENT_DEFS_H

202
src/nvim/event/queue.c Normal file
View File

@@ -0,0 +1,202 @@
// Queue for selective async event processing. Instances of this queue support a
// parent/child relationship with the following properties:
//
// - pushing a node to a child queue will push a corresponding link node to the
// parent queue
// - removing a link node from a parent queue will remove the next node
// in the linked child queue
// - removing a node from a child queue will remove the corresponding link node
// in the parent queue
//
// These properties allow neovim to organize and process events from different
// sources with a certain degree of control. Here's how the queue is used:
//
// +----------------+
// | Main loop |
// +----------------+
// ^
// |
// +----------------+
// +-------------->| Event loop |<------------+
// | +--+-------------+ |
// | ^ ^ |
// | | | |
// +-----------+ +-----------+ +---------+ +---------+
// | Channel 1 | | Channel 2 | | Job 1 | | Job 2 |
// +-----------+ +-----------+ +---------+ +---------+
//
//
// In the above diagram, the lower boxes represents event emitters, each with
// it's own private queue that have the event loop queue as the parent.
//
// When idle, the main loop spins the event loop which queues events from many
// sources(channels, jobs, user...). Each event emitter pushes events to its own
// private queue which is propagated to the event loop queue. When the main loop
// consumes an event, the corresponding event is removed from the emitter's
// queue.
//
// The main reason for this queue hierarchy is to allow focusing on a single
// event emitter while blocking the main loop. For example, if the `jobwait`
// vimscript function is called on job1, the main loop will temporarily stop
// polling the event loop queue and poll job1 queue instead. Same with channels,
// when calling `rpcrequest`, we want to temporarily stop processing events from
// other sources and focus on a specific channel.
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <uv.h>
#include "nvim/event/queue.h"
#include "nvim/memory.h"
#include "nvim/os/time.h"
typedef struct queue_item QueueItem;
struct queue_item {
union {
Queue *queue;
struct {
Event event;
QueueItem *parent;
} item;
} data;
bool link; // this is just a link to a node in a child queue
QUEUE node;
};
struct queue {
Queue *parent;
QUEUE headtail;
put_callback put_cb;
void *data;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/queue.c.generated.h"
#endif
static Event NILEVENT = {.handler = NULL, .argv = {NULL}};
Queue *queue_new_parent(put_callback put_cb, void *data)
{
return queue_new(NULL, put_cb, data);
}
Queue *queue_new_child(Queue *parent)
FUNC_ATTR_NONNULL_ALL
{
assert(!parent->parent);
return queue_new(parent, NULL, NULL);
}
static Queue *queue_new(Queue *parent, put_callback put_cb, void *data)
{
Queue *rv = xmalloc(sizeof(Queue));
QUEUE_INIT(&rv->headtail);
rv->parent = parent;
rv->put_cb = put_cb;
rv->data = data;
return rv;
}
void queue_free(Queue *queue)
{
assert(queue);
if (queue->parent) {
while (!QUEUE_EMPTY(&queue->headtail)) {
QUEUE *q = QUEUE_HEAD(&queue->headtail);
QueueItem *item = queue_node_data(q);
assert(!item->link);
QUEUE_REMOVE(&item->data.item.parent->node);
xfree(item->data.item.parent);
QUEUE_REMOVE(q);
xfree(item);
}
}
xfree(queue);
}
Event queue_get(Queue *queue)
{
return queue_empty(queue) ? NILEVENT : queue_remove(queue);
}
void queue_put_event(Queue *queue, Event event)
{
assert(queue);
assert(queue->parent); // don't push directly to the parent queue
queue_push(queue, event);
if (queue->parent->put_cb) {
queue->parent->put_cb(queue->parent, queue->parent->data);
}
}
void queue_process_events(Queue *queue)
{
assert(queue);
while (!queue_empty(queue)) {
Event event = queue_get(queue);
if (event.handler) {
event.handler(event.argv);
}
}
}
bool queue_empty(Queue *queue)
{
assert(queue);
return QUEUE_EMPTY(&queue->headtail);
}
static Event queue_remove(Queue *queue)
{
assert(!queue_empty(queue));
QUEUE *h = QUEUE_HEAD(&queue->headtail);
QUEUE_REMOVE(h);
QueueItem *item = queue_node_data(h);
Event rv;
if (item->link) {
assert(!queue->parent);
// remove the next node in the linked queue
Queue *linked = item->data.queue;
assert(!queue_empty(linked));
QueueItem *child =
queue_node_data(QUEUE_HEAD(&linked->headtail));
QUEUE_REMOVE(&child->node);
rv = child->data.item.event;
xfree(child);
} else {
assert(queue->parent);
assert(!queue_empty(queue->parent));
// remove the corresponding link node in the parent queue
QUEUE_REMOVE(&item->data.item.parent->node);
xfree(item->data.item.parent);
rv = item->data.item.event;
}
xfree(item);
return rv;
}
static void queue_push(Queue *queue, Event event)
{
QueueItem *item = xmalloc(sizeof(QueueItem));
item->link = false;
item->data.item.event = event;
QUEUE_INSERT_TAIL(&queue->headtail, &item->node);
// push link node to the parent queue
item->data.item.parent = xmalloc(sizeof(QueueItem));
item->data.item.parent->link = true;
item->data.item.parent->data.queue = queue;
QUEUE_INSERT_TAIL(&queue->parent->headtail, &item->data.item.parent->node);
}
static QueueItem *queue_node_data(QUEUE *q)
{
return QUEUE_DATA(q, QueueItem, node);
}

19
src/nvim/event/queue.h Normal file
View File

@@ -0,0 +1,19 @@
#ifndef NVIM_EVENT_QUEUE_H
#define NVIM_EVENT_QUEUE_H
#include <uv.h>
#include "nvim/event/defs.h"
#include "nvim/lib/queue.h"
typedef struct queue Queue;
typedef void (*put_callback)(Queue *queue, void *data);
#define queue_put(q, h, ...) \
queue_put_event(q, event_create(1, h, __VA_ARGS__));
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/queue.h.generated.h"
#endif
#endif // NVIM_EVENT_QUEUE_H