events: Refactor how event deferral is handled

- Remove all *_set_defer methods and the 'defer' flag from rstream/jobs
- Added {signal,rstream,job}_event_source functions. Each return a pointer that
  represent the event source for the object in question(For signals, a static
  pointer is returned)
- Added a 'source' field to the Event struct, which is set to the appropriate
  value by the code that created the event.
- Added a 'sources' parameter to `event_poll`. It should point to a
  NULL-terminated array of event sources that will be used to decide which
  events should be processed immediately
- Added a 'source_override' parameter to `rstream_new`. This was required to use
  jobs as event sources of RStream instances(When "focusing" on a job, for
  example).
- Extracted `process_from` static function from `event_process`.
- Remove 'defer' parameter from `event_process`, which now operates only on
  deferred events.
- Refactor `channel_send_call` to use the new lock mechanism

What changed in a single sentence: Code that calls `event_poll` have to specify
which event sources should NOT be deferred. This change was necessary for a
number of reasons:

- To fix a bug where due to race conditions, a client request
  could end in the deferred queue in the middle of a `channel_send_call`
  invocation, resulting in a deadlock since the client process would never
  receive a response, and channel_send_call would never return because
  the client would still be waiting for the response.
- To handle "event locking" correctly in recursive `channel_send_call`
  invocations when the frames are waiting for responses from different
  clients. Not much of an issue now since there's only a python client, but
  could break things later.
- To simplify the process of implementing synchronous functions that depend on
  asynchronous events.
This commit is contained in:
Thiago de Arruda
2014-07-08 13:08:29 -03:00
parent cf30837951
commit 2e4ea29d2c
12 changed files with 152 additions and 86 deletions

View File

@@ -944,7 +944,7 @@ doESCkey:
break; break;
case K_EVENT: case K_EVENT:
event_process(true); event_process();
break; break;
case K_HOME: /* <Home> */ case K_HOME: /* <Home> */

View File

@@ -762,7 +762,7 @@ getcmdline (
*/ */
switch (c) { switch (c) {
case K_EVENT: case K_EVENT:
event_process(true); event_process();
// Force a redraw even though the command line didn't change // Force a redraw even though the command line didn't change
shell_resized(); shell_resized();
goto cmdline_not_changed; goto cmdline_not_changed;
@@ -1878,7 +1878,7 @@ redraw:
if (IS_SPECIAL(c1)) { if (IS_SPECIAL(c1)) {
// Process deferred events // Process deferred events
event_process(true); event_process();
// Ignore other special key codes // Ignore other special key codes
continue; continue;
} }

View File

@@ -2473,7 +2473,7 @@ inchar (
char_u dum[DUM_LEN + 1]; char_u dum[DUM_LEN + 1];
for (;; ) { for (;; ) {
event_process(true); event_process();
len = ui_inchar(dum, DUM_LEN, 0L, 0); len = ui_inchar(dum, DUM_LEN, 0L, 0);
if (len == 0 || (len == 1 && dum[0] == 3)) if (len == 0 || (len == 1 && dum[0] == 3))
break; break;

View File

@@ -2074,7 +2074,7 @@ static int do_more_prompt(int typed_char)
toscroll = 0; toscroll = 0;
switch (c) { switch (c) {
case K_EVENT: case K_EVENT:
event_process(true); event_process();
break; break;
case BS: /* scroll one line back */ case BS: /* scroll one line back */
case K_BS: case K_BS:
@@ -2734,8 +2734,11 @@ do_dialog (
retval = 0; retval = 0;
break; break;
default: /* Could be a hotkey? */ default: /* Could be a hotkey? */
if (c < 0) /* special keys are ignored here */ if (c < 0) { /* special keys are ignored here */
// drain event queue to prevent infinite loop
event_process();
continue; continue;
}
if (c == ':' && ex_cmd) { if (c == ':' && ex_cmd) {
retval = dfltbutton; retval = dfltbutton;
ins_char_typebuf(':'); ins_char_typebuf(':');

View File

@@ -7375,5 +7375,5 @@ static void nv_cursorhold(cmdarg_T *cap)
static void nv_event(cmdarg_T *cap) static void nv_event(cmdarg_T *cap)
{ {
event_process(true); event_process();
} }

View File

@@ -118,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream)
stream->data = NULL; stream->data = NULL;
channel->is_job = false; channel->is_job = false;
// read stream // read stream
channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL);
rstream_set_stream(channel->data.streams.read, stream); rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read); rstream_start(channel->data.streams.read);
// write stream // write stream
@@ -189,16 +189,10 @@ bool channel_send_call(uint64_t id,
// Send the msgpack-rpc request // Send the msgpack-rpc request
send_request(channel, request_id, name, arg); send_request(channel, request_id, name, arg);
if (!kv_size(channel->call_stack)) { EventSource channel_source = channel->is_job
// This is the first frame, we must disable event deferral for this ? job_event_source(channel->data.job)
// channel because we won't be returning until the client sends a : rstream_event_source(channel->data.streams.read);
// response EventSource sources[] = {channel_source, NULL};
if (channel->is_job) {
job_set_defer(channel->data.job, false);
} else {
rstream_set_defer(channel->data.streams.read, false);
}
}
// Push the frame // Push the frame
ChannelCallFrame frame = {request_id, false, NIL}; ChannelCallFrame frame = {request_id, false, NIL};
@@ -206,25 +200,19 @@ bool channel_send_call(uint64_t id,
size_t size = kv_size(channel->call_stack); size_t size = kv_size(channel->call_stack);
do { do {
event_poll(-1); event_poll(-1, sources);
} while ( } while (
// Continue running if ... // Continue running if ...
channel->enabled && // the channel is still enabled channel->enabled && // the channel is still enabled
kv_size(channel->call_stack) >= size); // the call didn't return kv_size(channel->call_stack) >= size); // the call didn't return
if (!kv_size(channel->call_stack)) { if (!(kv_size(channel->call_stack)
// Popped last frame, restore event deferral || channel->enabled
if (channel->is_job) { || channel->rpc_call_level)) {
job_set_defer(channel->data.job, true);
} else {
rstream_set_defer(channel->data.streams.read, true);
}
if (!channel->enabled && !channel->rpc_call_level) {
// Close the channel if it has been disabled and we have not been called // Close the channel if it has been disabled and we have not been called
// by `parse_msgpack`(It would be unsafe to close the channel otherwise) // by `parse_msgpack`(It would be unsafe to close the channel otherwise)
close_channel(channel); close_channel(channel);
} }
}
*errored = frame.errored; *errored = frame.errored;
*result = frame.result; *result = frame.result;

View File

@@ -33,6 +33,11 @@ typedef struct {
# include "os/event.c.generated.h" # include "os/event.c.generated.h"
#endif #endif
static klist_t(Event) *deferred_events, *immediate_events; static klist_t(Event) *deferred_events, *immediate_events;
// NULL-terminated array of event sources that we should process immediately.
//
// Events from sources that are not contained in this array are processed
// later when `event_process` is called
static EventSource *immediate_sources = NULL;
void event_init(void) void event_init(void)
{ {
@@ -63,7 +68,8 @@ void event_teardown(void)
} }
// Wait for some event // Wait for some event
bool event_poll(int32_t ms) bool event_poll(int32_t ms, EventSource sources[])
FUNC_ATTR_NONNULL_ARG(2)
{ {
uv_run_mode run_mode = UV_RUN_ONCE; uv_run_mode run_mode = UV_RUN_ONCE;
@@ -99,10 +105,7 @@ bool event_poll(int32_t ms)
do { do {
// Run one event loop iteration, blocking for events if run_mode is // Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE // UV_RUN_ONCE
DLOG("Entering event loop"); processed_events = loop(run_mode, sources);
uv_run(uv_default_loop(), run_mode);
processed_events = event_process(false);
DLOG("Exited event loop, processed %u events", processed_events);
} while ( } while (
// Continue running if ... // Continue running if ...
!processed_events && // we didn't process any immediate events !processed_events && // we didn't process any immediate events
@@ -120,8 +123,7 @@ bool event_poll(int32_t ms)
// once more to let libuv perform it's cleanup // once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL);
uv_run(uv_default_loop(), UV_RUN_NOWAIT); processed_events += loop(UV_RUN_NOWAIT, sources);
event_process(false);
} }
return !timer_data.timed_out && (processed_events || event_has_deferred()); return !timer_data.timed_out && (processed_events || event_has_deferred());
@@ -129,22 +131,41 @@ bool event_poll(int32_t ms)
bool event_has_deferred(void) bool event_has_deferred(void)
{ {
return !kl_empty(get_queue(true)); return !kl_empty(deferred_events);
} }
// Push an event to the queue // Queue an event
void event_push(Event event, bool deferred) void event_push(Event event)
{ {
*kl_pushp(Event, get_queue(deferred)) = event; bool defer = true;
if (immediate_sources) {
size_t i;
EventSource src;
for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
if (src == event.source) {
defer = false;
break;
}
}
}
*kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
}
void event_process(void)
{
process_from(deferred_events);
} }
// Runs the appropriate action for each queued event // Runs the appropriate action for each queued event
size_t event_process(bool deferred) static size_t process_from(klist_t(Event) *queue)
{ {
size_t count = 0; size_t count = 0;
Event event; Event event;
while (kl_shift(Event, get_queue(deferred), &event) == 0) { while (kl_shift(Event, queue, &event) == 0) {
switch (event.type) { switch (event.type) {
case kEventSignal: case kEventSignal:
signal_handle(event); signal_handle(event);
@@ -161,6 +182,8 @@ size_t event_process(bool deferred)
count++; count++;
} }
DLOG("Processed %u events", count);
return count; return count;
} }
@@ -179,7 +202,42 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle); uv_prepare_stop(handle);
} }
static klist_t(Event) *get_queue(bool deferred) static void requeue_deferred_events(void)
{ {
return deferred ? deferred_events : immediate_events; size_t remaining = deferred_events->size;
DLOG("Number of deferred events: %u", remaining);
while (remaining--) {
// Re-push each deferred event to ensure it will be in the right queue
Event event;
kl_shift(Event, deferred_events, &event);
event_push(event);
DLOG("Re-queueing event");
}
DLOG("Number of deferred events: %u", deferred_events->size);
}
static size_t loop(uv_run_mode run_mode, EventSource *sources)
{
size_t count;
immediate_sources = sources;
// It's possible that some events from the immediate sources are waiting
// in the deferred queue. If so, move them to the immediate queue so they
// will be processed in order of arrival by the next `process_from` call.
requeue_deferred_events();
count = process_from(immediate_events);
if (count) {
// No need to enter libuv, events were already processed
return count;
}
DLOG("Enter event loop");
uv_run(uv_default_loop(), run_mode);
DLOG("Exit event loop");
immediate_sources = NULL;
count = process_from(immediate_events);
return count;
} }

View File

@@ -6,6 +6,8 @@
#include "nvim/os/job_defs.h" #include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h" #include "nvim/os/rstream_defs.h"
typedef void * EventSource;
typedef enum { typedef enum {
kEventSignal, kEventSignal,
kEventRStreamData, kEventRStreamData,
@@ -13,6 +15,7 @@ typedef enum {
} EventType; } EventType;
typedef struct { typedef struct {
EventSource source;
EventType type; EventType type;
union { union {
int signum; int signum;

View File

@@ -34,7 +34,7 @@ static bool eof = false, started_reading = false;
void input_init(void) void input_init(void)
{ {
read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false); read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL);
rstream_set_file(read_stream, read_cmd_fd); rstream_set_file(read_stream, read_cmd_fd);
} }
@@ -129,7 +129,12 @@ bool os_isatty(int fd)
static bool input_poll(int32_t ms) static bool input_poll(int32_t ms)
{ {
return input_ready() || event_poll(ms) || input_ready(); EventSource input_sources[] = {
rstream_event_source(read_stream),
NULL
};
return input_ready() || event_poll(ms, input_sources) || input_ready();
} }
// This is a replacement for the old `WaitForChar` function in os_unix.c // This is a replacement for the old `WaitForChar` function in os_unix.c

View File

@@ -214,8 +214,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem); job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams // Start the readable streams
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out); rstream_start(job->out);
@@ -269,18 +269,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer); return wstream_write(job->in, buffer);
} }
/// Sets the `defer` flag for a Job instance
///
/// @param rstream The Job id
/// @param defer The new value for the flag
void job_set_defer(Job *job, bool defer)
{
job->defer = defer;
rstream_set_defer(job->out, defer);
rstream_set_defer(job->err, defer);
}
/// Runs the read callback associated with the job exit event /// Runs the read callback associated with the job exit event
/// ///
/// @param event Object containing data necessary to invoke the callback /// @param event Object containing data necessary to invoke the callback
@@ -307,6 +295,11 @@ void *job_data(Job *job)
return job->data; return job->data;
} }
EventSource job_event_source(Job *job)
{
return job;
}
static void job_exit_callback(Job *job) static void job_exit_callback(Job *job)
{ {
// Free the slot now, 'exit_cb' may want to start another job to replace // Free the slot now, 'exit_cb' may want to start another job to replace
@@ -391,10 +384,12 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
static void emit_exit_event(Job *job) static void emit_exit_event(Job *job)
{ {
Event event; Event event = {
event.type = kEventJobExit; .source = job_event_source(job),
event.data.job = job; .type = kEventJobExit,
event_push(event, true); .data.job = job
};
event_push(event);
} }
static void close_cb(uv_handle_t *handle) static void close_cb(uv_handle_t *handle)

View File

@@ -26,7 +26,8 @@ struct rstream {
uv_file fd; uv_file fd;
rstream_cb cb; rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos; size_t buffer_size, rpos, wpos, fpos;
bool reading, free_handle, defer; bool reading, free_handle;
EventSource source_override;
}; };
#ifdef INCLUDE_GENERATED_DECLARATIONS #ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -40,25 +41,25 @@ struct rstream {
/// for reading with `rstream_read` /// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer. /// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance /// @param data Some state to associate with the `RStream` instance
/// @param defer Flag that specifies if callback invocation should be deferred /// @param source_override Replacement for the default source used in events
/// to vim main loop(as a KE_EVENT special key) /// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance /// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb, RStream * rstream_new(rstream_cb cb,
size_t buffer_size, size_t buffer_size,
void *data, void *data,
bool defer) EventSource source_override)
{ {
RStream *rv = xmalloc(sizeof(RStream)); RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size); rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size; rv->buffer_size = buffer_size;
rv->data = data; rv->data = data;
rv->defer = defer;
rv->cb = cb; rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0; rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL; rv->stream = NULL;
rv->fread_idle = NULL; rv->fread_idle = NULL;
rv->free_handle = false; rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE; rv->file_type = UV_UNKNOWN_HANDLE;
rv->source_override = source_override ? source_override : rv;
return rv; return rv;
} }
@@ -213,15 +214,6 @@ size_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos; return rstream->wpos - rstream->rpos;
} }
/// Sets the `defer` flag for a a RStream instance
///
/// @param rstream The RStream instance
/// @param defer The new value for the flag
void rstream_set_defer(RStream *rstream, bool defer)
{
rstream->defer = defer;
}
/// Runs the read callback associated with the rstream /// Runs the read callback associated with the rstream
/// ///
/// @param event Object containing data necessary to invoke the callback /// @param event Object containing data necessary to invoke the callback
@@ -232,6 +224,11 @@ void rstream_read_event(Event event)
rstream->cb(rstream, rstream->data, event.data.rstream.eof); rstream->cb(rstream, rstream->data, event.data.rstream.eof);
} }
EventSource rstream_event_source(RStream *rstream)
{
return rstream->source_override;
}
// Callbacks used by libuv // Callbacks used by libuv
// Called by libuv to allocate memory for reading. // Called by libuv to allocate memory for reading.
@@ -260,7 +257,9 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) { if (cnt <= 0) {
if (cnt != UV_ENOBUFS) { if (cnt != UV_ENOBUFS) {
DLOG("Closing RStream(%p)", rstream); DLOG("Closing RStream(address: %p, source: %p)",
rstream,
rstream_event_source(rstream));
// Read error or EOF, either way stop the stream and invoke the callback // Read error or EOF, either way stop the stream and invoke the callback
// with eof == true // with eof == true
uv_read_stop(stream); uv_read_stop(stream);
@@ -275,12 +274,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect // Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer. // the space actually used in the buffer.
rstream->wpos += nread; rstream->wpos += nread;
DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rstream); DLOG("Received %u bytes from RStream(address: %p, source: %p)",
(size_t)cnt,
rstream,
rstream_event_source(rstream));
if (rstream->wpos == rstream->buffer_size) { if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now // The last read filled the buffer, stop reading for now
rstream_stop(rstream); rstream_stop(rstream);
DLOG("Buffer for RStream(%p) is full, stopping it", rstream); DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
rstream,
rstream_event_source(rstream));
} }
rstream->reading = false; rstream->reading = false;
@@ -345,9 +349,13 @@ static void close_cb(uv_handle_t *handle)
static void emit_read_event(RStream *rstream, bool eof) static void emit_read_event(RStream *rstream, bool eof)
{ {
Event event; Event event = {
event.type = kEventRStreamData; .source = rstream_event_source(rstream),
event.data.rstream.ptr = rstream; .type = kEventRStreamData,
event.data.rstream.eof = eof; .data.rstream = {
event_push(event, rstream->defer); .ptr = rstream,
.eof = eof
}
};
event_push(event);
} }

View File

@@ -103,6 +103,11 @@ void signal_handle(Event event)
} }
} }
EventSource signal_event_source(void)
{
return &sint;
}
static char * signal_name(int signum) static char * signal_name(int signum)
{ {
switch (signum) { switch (signum) {
@@ -155,10 +160,11 @@ static void signal_cb(uv_signal_t *handle, int signum)
} }
Event event = { Event event = {
.source = signal_event_source(),
.type = kEventSignal, .type = kEventSignal,
.data = { .data = {
.signum = signum .signum = signum
} }
}; };
event_push(event, true); event_push(event);
} }