Files
neovim/src/mpack/rpc.c
2021-09-21 09:02:43 -07:00

335 lines
8.8 KiB
C

// This is an open source non-commercial project. Dear PVS-Studio, please check
// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
#include <string.h>
#include "rpc.h"
enum {
MPACK_RPC_RECEIVE_ARRAY = 1,
MPACK_RPC_RECEIVE_TYPE,
MPACK_RPC_RECEIVE_ID
};
static mpack_rpc_header_t mpack_rpc_request_hdr(void);
static mpack_rpc_header_t mpack_rpc_reply_hdr(void);
static mpack_rpc_header_t mpack_rpc_notify_hdr(void);
static int mpack_rpc_put(mpack_rpc_session_t *s, mpack_rpc_message_t m);
static int mpack_rpc_pop(mpack_rpc_session_t *s, mpack_rpc_message_t *m);
static void mpack_rpc_reset_hdr(mpack_rpc_header_t *hdr);
MPACK_API void mpack_rpc_session_init(mpack_rpc_session_t *session,
mpack_uint32_t capacity)
{
session->capacity = capacity ? capacity : MPACK_RPC_MAX_REQUESTS;
session->request_id = 0;
mpack_tokbuf_init(&session->reader);
mpack_tokbuf_init(&session->writer);
mpack_rpc_reset_hdr(&session->receive);
mpack_rpc_reset_hdr(&session->send);
memset(session->slots, 0,
sizeof(struct mpack_rpc_slot_s) * session->capacity);
}
MPACK_API int mpack_rpc_receive_tok(mpack_rpc_session_t *session,
mpack_token_t tok, mpack_rpc_message_t *msg)
{
int type;
if (session->receive.index == 0) {
if (tok.type != MPACK_TOKEN_ARRAY)
/* not an array */
return MPACK_RPC_EARRAY;
if (tok.length < 3 || tok.length > 4)
/* invalid array length */
return MPACK_RPC_EARRAYL;
session->receive.toks[0] = tok;
session->receive.index++;
return MPACK_EOF; /* get the type */
}
if (session->receive.index == 1) {
if (tok.type != MPACK_TOKEN_UINT || tok.length > 1 || tok.data.value.lo > 2)
/* invalid type */
return MPACK_RPC_ETYPE;
if (tok.data.value.lo < 2 && session->receive.toks[0].length != 4)
/* request or response with array length != 4 */
return MPACK_RPC_EARRAYL;
if (tok.data.value.lo == 2 && session->receive.toks[0].length != 3)
/* notification with array length != 3 */
return MPACK_RPC_EARRAYL;
session->receive.toks[1] = tok;
session->receive.index++;
if (tok.data.value.lo < 2) return MPACK_EOF;
type = MPACK_RPC_NOTIFICATION;
goto end;
}
assert(session->receive.index == 2);
if (tok.type != MPACK_TOKEN_UINT || tok.length > 4)
/* invalid request/response id */
return MPACK_RPC_EMSGID;
msg->id = tok.data.value.lo;
msg->data.p = NULL;
type = (int)session->receive.toks[1].data.value.lo + MPACK_RPC_REQUEST;
if (type == MPACK_RPC_RESPONSE && !mpack_rpc_pop(session, msg))
/* response with invalid id */
return MPACK_RPC_ERESPID;
end:
mpack_rpc_reset_hdr(&session->receive);
return type;
}
MPACK_API int mpack_rpc_request_tok(mpack_rpc_session_t *session,
mpack_token_t *tok, mpack_data_t data)
{
if (session->send.index == 0) {
int status;
mpack_rpc_message_t msg;
do {
msg.id = session->request_id;
msg.data = data;
session->send = mpack_rpc_request_hdr();
session->send.toks[2].type = MPACK_TOKEN_UINT;
session->send.toks[2].data.value.lo = msg.id;
session->send.toks[2].data.value.hi = 0;
*tok = session->send.toks[0];
status = mpack_rpc_put(session, msg);
if (status == -1) return MPACK_NOMEM;
session->request_id = (session->request_id + 1) % 0xffffffff;
} while (!status);
session->send.index++;
return MPACK_EOF;
}
if (session->send.index == 1) {
*tok = session->send.toks[1];
session->send.index++;
return MPACK_EOF;
}
assert(session->send.index == 2);
*tok = session->send.toks[2];
mpack_rpc_reset_hdr(&session->send);
return MPACK_OK;
}
MPACK_API int mpack_rpc_reply_tok(mpack_rpc_session_t *session,
mpack_token_t *tok, mpack_uint32_t id)
{
if (session->send.index == 0) {
session->send = mpack_rpc_reply_hdr();
session->send.toks[2].type = MPACK_TOKEN_UINT;
session->send.toks[2].data.value.lo = id;
session->send.toks[2].data.value.hi = 0;
*tok = session->send.toks[0];
session->send.index++;
return MPACK_EOF;
}
if (session->send.index == 1) {
*tok = session->send.toks[1];
session->send.index++;
return MPACK_EOF;
}
assert(session->send.index == 2);
*tok = session->send.toks[2];
mpack_rpc_reset_hdr(&session->send);
return MPACK_OK;
}
MPACK_API int mpack_rpc_notify_tok(mpack_rpc_session_t *session,
mpack_token_t *tok)
{
if (session->send.index == 0) {
session->send = mpack_rpc_notify_hdr();
*tok = session->send.toks[0];
session->send.index++;
return MPACK_EOF;
}
assert(session->send.index == 1);
*tok = session->send.toks[1];
mpack_rpc_reset_hdr(&session->send);
return MPACK_OK;
}
MPACK_API int mpack_rpc_receive(mpack_rpc_session_t *session, const char **buf,
size_t *buflen, mpack_rpc_message_t *msg)
{
int status;
do {
mpack_token_t tok;
status = mpack_read(&session->reader, buf, buflen, &tok);
if (status) break;
status = mpack_rpc_receive_tok(session, tok, msg);
if (status >= MPACK_RPC_REQUEST) break;
} while (*buflen);
return status;
}
MPACK_API int mpack_rpc_request(mpack_rpc_session_t *session, char **buf,
size_t *buflen, mpack_data_t data)
{
int status = MPACK_EOF;
while (status && *buflen) {
int write_status;
mpack_token_t tok;
if (!session->writer.plen) {
status = mpack_rpc_request_tok(session, &tok, data);
}
if (status == MPACK_NOMEM) break;
write_status = mpack_write(&session->writer, buf, buflen, &tok);
status = write_status ? write_status : status;
}
return status;
}
MPACK_API int mpack_rpc_reply(mpack_rpc_session_t *session, char **buf,
size_t *buflen, mpack_uint32_t id)
{
int status = MPACK_EOF;
while (status && *buflen) {
int write_status;
mpack_token_t tok;
if (!session->writer.plen) {
status = mpack_rpc_reply_tok(session, &tok, id);
}
write_status = mpack_write(&session->writer, buf, buflen, &tok);
status = write_status ? write_status : status;
}
return status;
}
MPACK_API int mpack_rpc_notify(mpack_rpc_session_t *session, char **buf,
size_t *buflen)
{
int status = MPACK_EOF;
while (status && *buflen) {
int write_status;
mpack_token_t tok;
if (!session->writer.plen) {
status = mpack_rpc_notify_tok(session, &tok);
}
write_status = mpack_write(&session->writer, buf, buflen, &tok);
status = write_status ? write_status : status;
}
return status;
}
MPACK_API void mpack_rpc_session_copy(mpack_rpc_session_t *dst,
mpack_rpc_session_t *src)
{
mpack_uint32_t i;
mpack_uint32_t dst_capacity = dst->capacity;
assert(src->capacity <= dst_capacity);
/* copy all fields except slots */
memcpy(dst, src, sizeof(mpack_rpc_one_session_t) -
sizeof(struct mpack_rpc_slot_s));
/* reset capacity */
dst->capacity = dst_capacity;
/* reinsert requests */
memset(dst->slots, 0, sizeof(struct mpack_rpc_slot_s) * dst->capacity);
for (i = 0; i < src->capacity; i++) {
if (src->slots[i].used) mpack_rpc_put(dst, src->slots[i].msg);
}
}
static mpack_rpc_header_t mpack_rpc_request_hdr(void)
{
mpack_rpc_header_t hdr;
hdr.index = 0;
hdr.toks[0].type = MPACK_TOKEN_ARRAY;
hdr.toks[0].length = 4;
hdr.toks[1].type = MPACK_TOKEN_UINT;
hdr.toks[1].data.value.lo = 0;
hdr.toks[1].data.value.hi = 0;
return hdr;
}
static mpack_rpc_header_t mpack_rpc_reply_hdr(void)
{
mpack_rpc_header_t hdr = mpack_rpc_request_hdr();
hdr.toks[1].data.value.lo = 1;
hdr.toks[1].data.value.hi = 0;
return hdr;
}
static mpack_rpc_header_t mpack_rpc_notify_hdr(void)
{
mpack_rpc_header_t hdr = mpack_rpc_request_hdr();
hdr.toks[0].length = 3;
hdr.toks[1].data.value.lo = 2;
hdr.toks[1].data.value.hi = 0;
return hdr;
}
static int mpack_rpc_put(mpack_rpc_session_t *session, mpack_rpc_message_t msg)
{
struct mpack_rpc_slot_s *slot = NULL;
mpack_uint32_t i;
mpack_uint32_t hash = msg.id % session->capacity;
for (i = 0; i < session->capacity; i++) {
if (!session->slots[hash].used || session->slots[hash].msg.id == msg.id) {
slot = session->slots + hash;
break;
}
hash = hash > 0 ? hash - 1 : session->capacity - 1;
}
if (!slot) return -1; /* no space */
if (slot->msg.id == msg.id && slot->used) return 0; /* duplicate key */
slot->msg = msg;
slot->used = 1;
return 1;
}
static int mpack_rpc_pop(mpack_rpc_session_t *session, mpack_rpc_message_t *msg)
{
struct mpack_rpc_slot_s *slot = NULL;
mpack_uint32_t i;
mpack_uint32_t hash = msg->id % session->capacity;
for (i = 0; i < session->capacity; i++) {
if (session->slots[hash].used && session->slots[hash].msg.id == msg->id) {
slot = session->slots + hash;
break;
}
hash = hash > 0 ? hash - 1 : session->capacity - 1;
}
if (!slot) return 0;
*msg = slot->msg;
slot->used = 0;
return 1;
}
static void mpack_rpc_reset_hdr(mpack_rpc_header_t *hdr)
{
hdr->index = 0;
}