event: Refactor async event processing

- Improve the implementation of deferred/immediate events.
- Use the new queue module to change how/when events are queued/processed by
  giving a private queue to each emitter.
- Immediate events(which only exist to break uv_run recursion) are now
  represented in the `loop->fast_events` queue.
- Events pushed to child queues are propagated to the event loop main queue and
  processed as K_EVENT keys.
This commit is contained in:
Thiago de Arruda
2015-08-07 22:54:02 -03:00
parent a6e0d35d2d
commit 502aee690c
28 changed files with 216 additions and 211 deletions

View File

@@ -68,6 +68,7 @@ typedef struct {
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
Queue *events;
} Channel;
typedef struct {
@@ -224,7 +225,7 @@ Object channel_send_call(uint64_t id,
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
channel->pending_requests++;
LOOP_POLL_EVENTS_UNTIL(&loop, -1, frame.returned);
LOOP_PROCESS_EVENTS_UNTIL(&loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
@@ -459,22 +460,22 @@ static void handle_request(Channel *channel, msgpack_object *request)
handler.async = true;
}
bool async = kv_size(channel->call_stack) || handler.async;
RequestEvent *event_data = xmalloc(sizeof(RequestEvent));
event_data->channel = channel;
event_data->handler = handler;
event_data->args = args;
event_data->request_id = request_id;
incref(channel);
loop_push_event(&loop, (Event) {
.handler = on_request_event,
.data = event_data
}, !async);
if (handler.async) {
on_request_event((void **)&event_data);
} else {
queue_put(channel->events, on_request_event, 1, event_data);
}
}
static void on_request_event(Event event)
static void on_request_event(void **argv)
{
RequestEvent *e = event.data;
RequestEvent *e = argv[0];
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
@@ -649,9 +650,8 @@ static void close_channel(Channel *channel)
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL);
stream_close(&channel->data.std.out, NULL);
loop_push_event(&loop,
(Event) { .handler = on_stdio_close, .data = channel }, false);
break;
queue_put(loop.fast_events, exit_event, 1, channel);
return;
default:
abort();
}
@@ -659,9 +659,9 @@ static void close_channel(Channel *channel)
decref(channel);
}
static void on_stdio_close(Event e)
static void exit_event(void **argv)
{
decref(e.data);
decref(argv[0]);
if (!exiting) {
mch_exit(0);
@@ -683,6 +683,7 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
queue_free(channel->events);
xfree(channel);
}
@@ -694,6 +695,7 @@ static void close_cb(Stream *stream, void *data)
static Channel *register_channel(ChannelType type)
{
Channel *rv = xmalloc(sizeof(Channel));
rv->events = queue_new_child(loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;