channel: Simplify resource management

- Remove unused rpc_call_level field
- Add `returned` field to ChannelCallFrame. This is set when the call returns
  and is the only condition checked by `channel_send_call`.
- Add job_exit callback for properly closing channels created from job(the
  job_exit callback is only called after all read callbacks, so it's the only
  safe place to free the channel).
This commit is contained in:
Thiago de Arruda
2014-10-20 07:46:19 -03:00
parent b280308ac6
commit cf6f60ce4d

View File

@@ -31,14 +31,14 @@
typedef struct { typedef struct {
uint64_t request_id; uint64_t request_id;
bool errored; bool returned, errored;
Object result; Object result;
} ChannelCallFrame; } ChannelCallFrame;
typedef struct { typedef struct {
uint64_t id; uint64_t id;
PMap(cstr_t) *subscribed_events; PMap(cstr_t) *subscribed_events;
bool is_job, enabled; bool is_job, closed;
msgpack_unpacker *unpacker; msgpack_unpacker *unpacker;
union { union {
Job *job; Job *job;
@@ -50,7 +50,6 @@ typedef struct {
} data; } data;
uint64_t next_request_id; uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack; kvec_t(ChannelCallFrame *) call_stack;
size_t rpc_call_level;
} Channel; } Channel;
static uint64_t next_id = 1; static uint64_t next_id = 1;
@@ -103,12 +102,12 @@ uint64_t channel_from_job(char **argv)
channel, channel,
job_out, job_out,
job_err, job_err,
NULL, job_exit,
0, 0,
&status); &status);
if (status <= 0) { if (status <= 0) {
close_channel(channel); free_channel(channel);
return 0; return 0;
} }
@@ -141,7 +140,7 @@ bool channel_exists(uint64_t id)
{ {
Channel *channel; Channel *channel;
return (channel = pmap_get(uint64_t)(channels, id)) != NULL return (channel = pmap_get(uint64_t)(channels, id)) != NULL
&& channel->enabled; && !channel->closed;
} }
/// Sends event/arguments to channel /// Sends event/arguments to channel
@@ -156,7 +155,7 @@ bool channel_send_event(uint64_t id, char *name, Array args)
Channel *channel = NULL; Channel *channel = NULL;
if (id > 0) { if (id > 0) {
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
api_free_array(args); api_free_array(args);
return false; return false;
} }
@@ -182,7 +181,7 @@ Object channel_send_call(uint64_t id,
{ {
Channel *channel = NULL; Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id); api_set_error(err, Exception, _("Invalid channel \"%" PRIu64 "\""), id);
api_free_array(args); api_free_array(args);
return NIL; return NIL;
@@ -208,16 +207,14 @@ Object channel_send_call(uint64_t id,
EventSource sources[] = {channel_source, NULL}; EventSource sources[] = {channel_source, NULL};
// Push the frame // Push the frame
ChannelCallFrame frame = {request_id, false, NIL}; ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame); kv_push(ChannelCallFrame *, channel->call_stack, &frame);
size_t size = kv_size(channel->call_stack);
do { do {
event_poll(-1, sources); event_poll(-1, sources);
} while ( } while (!frame.returned);
// Continue running if ...
channel->enabled && // the channel is still enabled (void)kv_pop(channel->call_stack);
kv_size(channel->call_stack) >= size); // the call didn't return
if (frame.errored) { if (frame.errored) {
api_set_error(err, Exception, "%s", frame.result.data.string.data); api_set_error(err, Exception, "%s", frame.result.data.string.data);
@@ -235,7 +232,7 @@ void channel_subscribe(uint64_t id, char *event)
{ {
Channel *channel; Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
abort(); abort();
} }
@@ -257,7 +254,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{ {
Channel *channel; Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
abort(); abort();
} }
@@ -272,12 +269,11 @@ bool channel_close(uint64_t id)
{ {
Channel *channel; Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
return false; return false;
} }
channel_kill(channel); close_channel(channel);
channel->enabled = false;
return true; return true;
} }
@@ -319,19 +315,16 @@ static void job_err(RStream *rstream, void *data, bool eof)
} }
} }
static void job_exit(Job *job, void *data)
{
free_channel((Channel *)data);
}
static void parse_msgpack(RStream *rstream, void *data, bool eof) static void parse_msgpack(RStream *rstream, void *data, bool eof)
{ {
Channel *channel = data; Channel *channel = data;
channel->rpc_call_level++;
if (eof) { if (eof) {
char buf[256];
snprintf(buf,
sizeof(buf),
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
call_set_error(channel, buf);
goto end; goto end;
} }
@@ -354,7 +347,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
MSGPACK_UNPACK_SUCCESS) { MSGPACK_UNPACK_SUCCESS) {
if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
if (is_valid_rpc_response(&unpacked.data, channel)) { if (is_valid_rpc_response(&unpacked.data, channel)) {
call_stack_pop(&unpacked.data, channel); complete_call(&unpacked.data, channel);
} else { } else {
char buf[256]; char buf[256];
snprintf(buf, snprintf(buf,
@@ -397,10 +390,11 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
} }
end: end:
channel->rpc_call_level--; if (eof && !channel->is_job && !kv_size(channel->call_stack)) {
if (!channel->enabled && !kv_size(channel->call_stack)) { // The free_channel call is deferred for jobs because it's possible that
// Now it's safe to destroy the channel // job_stderr will called after this. For non-job channels, this is the
close_channel(channel); // last callback so it must be freed now.
free_channel(channel);
} }
} }
@@ -500,26 +494,11 @@ static void unsubscribe(Channel *channel, char *event)
free(event_string); free(event_string);
} }
/// Close the channel streams/job. The channel resources will be freed by
/// free_channel later.
static void close_channel(Channel *channel) static void close_channel(Channel *channel)
{ {
pmap_del(uint64_t)(channels, channel->id); channel->closed = true;
msgpack_unpacker_free(channel->unpacker);
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
channel_kill(channel);
free(channel);
}
static void channel_kill(Channel *channel)
{
if (channel->is_job) { if (channel->is_job) {
if (channel->data.job) { if (channel->data.job) {
job_stop(channel->data.job); job_stop(channel->data.job);
@@ -536,6 +515,22 @@ static void channel_kill(Channel *channel)
} }
} }
static void free_channel(Channel *channel)
{
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
free(channel);
}
static void close_cb(uv_handle_t *handle) static void close_cb(uv_handle_t *handle)
{ {
free(handle->data); free(handle->data);
@@ -545,8 +540,7 @@ static void close_cb(uv_handle_t *handle)
static Channel *register_channel(void) static Channel *register_channel(void)
{ {
Channel *rv = xmalloc(sizeof(Channel)); Channel *rv = xmalloc(sizeof(Channel));
rv->enabled = true; rv->closed = false;
rv->rpc_call_level = 0;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = next_id++; rv->id = next_id++;
rv->subscribed_events = pmap_new(cstr_t)(); rv->subscribed_events = pmap_new(cstr_t)();
@@ -573,9 +567,11 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
kv_size(channel->call_stack) - 1)->request_id; kv_size(channel->call_stack) - 1)->request_id;
} }
static void call_stack_pop(msgpack_object *obj, Channel *channel) static void complete_call(msgpack_object *obj, Channel *channel)
{ {
ChannelCallFrame *frame = kv_pop(channel->call_stack); ChannelCallFrame *frame = kv_A(channel->call_stack,
kv_size(channel->call_stack) - 1);
frame->returned = true;
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
if (frame->errored) { if (frame->errored) {
@@ -588,10 +584,11 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
static void call_set_error(Channel *channel, char *msg) static void call_set_error(Channel *channel, char *msg)
{ {
for (size_t i = 0; i < kv_size(channel->call_stack); i++) { for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_pop(channel->call_stack); ChannelCallFrame *frame = kv_A(channel->call_stack, i);
frame->returned = true;
frame->errored = true; frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg)); frame->result = STRING_OBJ(cstr_to_string(msg));
} }
channel->enabled = false; close_channel(channel);
} }