RPC: eliminate NO_RESPONSE

Using a sentinel value in the response-id is ambiguous because the
msgpack-rpc spec allows all values (including zero/max). And clients
control the id, so we can't be sure they won't use the sentinel value.

Instead of a sentinel value, check the message type explicitly.

ref #8850
This commit is contained in:
Justin M. Keyes
2019-04-12 02:06:49 +02:00
parent b4ca56d96d
commit fd00806f01
4 changed files with 52 additions and 44 deletions

View File

@@ -281,23 +281,26 @@ static void parse_msgpack(Channel *channel)
// A not so uncommon cause for this might be deserializing objects with
// a high nesting level: msgpack will break when its internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
send_error(channel, 0, "Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
send_error(channel, kMessageTypeRequest, 0,
"Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
}
}
/// Handles requests and notifications received on the channel.
static void handle_request(Channel *channel, msgpack_object *request)
FUNC_ATTR_NONNULL_ALL
{
uint64_t request_id;
Error error = ERROR_INIT;
msgpack_rpc_validate(&request_id, request, &error);
MessageType type = msgpack_rpc_validate(&request_id, request, &error);
if (ERROR_SET(&error)) {
// Validation failed, send response with error
if (channel_write(channel,
serialize_response(channel->id,
type,
request_id,
&error,
NIL,
@@ -311,6 +314,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
api_clear_error(&error);
return;
}
assert(type == kMessageTypeRequest || type == kMessageTypeNotification);
MsgpackRpcRequestHandler handler;
msgpack_object *method = msgpack_rpc_method(request);
@@ -326,13 +330,14 @@ static void handle_request(Channel *channel, msgpack_object *request)
}
if (ERROR_SET(&error)) {
send_error(channel, request_id, error.msg);
send_error(channel, type, request_id, error.msg);
api_clear_error(&error);
api_free_array(args);
return;
}
RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
evdata->type = type;
evdata->channel = channel;
evdata->handler = handler;
evdata->args = args;
@@ -343,39 +348,41 @@ static void handle_request(Channel *channel, msgpack_object *request)
if (is_get_mode && !input_blocking()) {
// Defer the event to a special queue used by os/input.c. #6247
multiqueue_put(ch_before_blocking_events, on_request_event, 1, evdata);
multiqueue_put(ch_before_blocking_events, response_event, 1, evdata);
} else {
// Invoke immediately.
on_request_event((void **)&evdata);
response_event((void **)&evdata);
}
} else {
multiqueue_put(channel->events, on_request_event, 1, evdata);
multiqueue_put(channel->events, response_event, 1, evdata);
DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr);
}
}
static void on_request_event(void **argv)
/// Responds to a message, depending on the type:
/// - Request: writes the response.
/// - Notification: does nothing.
static void response_event(void **argv)
{
RequestEvent *e = argv[0];
Channel *channel = e->channel;
MsgpackRpcRequestHandler handler = e->handler;
Array args = e->args;
uint64_t request_id = e->request_id;
Error error = ERROR_INIT;
Object result = handler.fn(channel->id, args, &error);
if (request_id != NO_RESPONSE) {
// send the response
Object result = handler.fn(channel->id, e->args, &error);
if (e->type == kMessageTypeRequest) {
// Send the response.
msgpack_packer response;
msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
channel_write(channel, serialize_response(channel->id,
request_id,
e->type,
e->request_id,
&error,
result,
&out_buffer));
} else {
api_free_object(result);
}
api_free_array(args);
api_free_array(e->args);
channel_decref(channel);
xfree(e);
api_clear_error(&error);
@@ -430,15 +437,16 @@ static void internal_read_event(void **argv)
wstream_release_wbuffer(buffer);
}
static void send_error(Channel *channel, uint64_t id, char *err)
static void send_error(Channel *chan, MessageType type, uint64_t id, char *err)
{
Error e = ERROR_INIT;
api_set_error(&e, kErrorTypeException, "%s", err);
channel_write(channel, serialize_response(channel->id,
id,
&e,
NIL,
&out_buffer));
channel_write(chan, serialize_response(chan->id,
type,
id,
&e,
NIL,
&out_buffer));
api_clear_error(&e);
}
@@ -634,6 +642,7 @@ static WBuffer *serialize_request(uint64_t channel_id,
}
static WBuffer *serialize_response(uint64_t channel_id,
MessageType type,
uint64_t response_id,
Error *err,
Object arg,
@@ -641,7 +650,7 @@ static WBuffer *serialize_response(uint64_t channel_id,
{
msgpack_packer pac;
msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
if (ERROR_SET(err) && response_id == NO_RESPONSE) {
if (ERROR_SET(err) && type == kMessageTypeNotification) {
Array args = ARRAY_DICT_INIT;
ADD(args, INTEGER_OBJ(err->type));
ADD(args, STRING_OBJ(cstr_to_string(err->msg)));