mirror of
https://github.com/neovim/neovim.git
synced 2025-10-15 14:26:07 +00:00
wstream: Pass WBuffer refcount as a constructor parameter
This is required to handle broadcasting when the first write fails.
Ref: 11916b6b59 (commitcomment-6792287)
This commit is contained in:
@@ -10560,6 +10560,7 @@ static void f_job_write(typval_T *argvars, typval_T *rettv)
|
|||||||
|
|
||||||
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
|
WBuffer *buf = wstream_new_buffer(xstrdup((char *)argvars[1].vval.v_string),
|
||||||
strlen((char *)argvars[1].vval.v_string),
|
strlen((char *)argvars[1].vval.v_string),
|
||||||
|
1,
|
||||||
free);
|
free);
|
||||||
rettv->vval.v_number = job_write(job, buf);
|
rettv->vval.v_number = job_write(job, buf);
|
||||||
}
|
}
|
||||||
|
@@ -383,7 +383,7 @@ static void send_request(Channel *channel,
|
|||||||
Object arg)
|
Object arg)
|
||||||
{
|
{
|
||||||
String method = {.size = strlen(name), .data = name};
|
String method = {.size = strlen(name), .data = name};
|
||||||
channel_write(channel, serialize_request(id, method, arg, &out_buffer));
|
channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void send_event(Channel *channel,
|
static void send_event(Channel *channel,
|
||||||
@@ -391,7 +391,7 @@ static void send_event(Channel *channel,
|
|||||||
Object arg)
|
Object arg)
|
||||||
{
|
{
|
||||||
String method = {.size = strlen(name), .data = name};
|
String method = {.size = strlen(name), .data = name};
|
||||||
channel_write(channel, serialize_request(0, method, arg, &out_buffer));
|
channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void broadcast_event(char *name, Object arg)
|
static void broadcast_event(char *name, Object arg)
|
||||||
@@ -412,7 +412,11 @@ static void broadcast_event(char *name, Object arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
String method = {.size = strlen(name), .data = name};
|
String method = {.size = strlen(name), .data = name};
|
||||||
WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
|
WBuffer *buffer = serialize_request(0,
|
||||||
|
method,
|
||||||
|
arg,
|
||||||
|
&out_buffer,
|
||||||
|
kv_size(subscribed));
|
||||||
|
|
||||||
for (size_t i = 0; i < kv_size(subscribed); i++) {
|
for (size_t i = 0; i < kv_size(subscribed); i++) {
|
||||||
channel_write(kv_A(subscribed, i), buffer);
|
channel_write(kv_A(subscribed, i), buffer);
|
||||||
|
@@ -113,7 +113,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
|
|||||||
WBuffer *serialize_request(uint64_t request_id,
|
WBuffer *serialize_request(uint64_t request_id,
|
||||||
String method,
|
String method,
|
||||||
Object arg,
|
Object arg,
|
||||||
msgpack_sbuffer *sbuffer)
|
msgpack_sbuffer *sbuffer,
|
||||||
|
size_t refcount)
|
||||||
FUNC_ATTR_NONNULL_ARG(4)
|
FUNC_ATTR_NONNULL_ARG(4)
|
||||||
{
|
{
|
||||||
msgpack_packer pac;
|
msgpack_packer pac;
|
||||||
@@ -130,6 +131,7 @@ WBuffer *serialize_request(uint64_t request_id,
|
|||||||
msgpack_rpc_from_object(arg, &pac);
|
msgpack_rpc_from_object(arg, &pac);
|
||||||
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
||||||
sbuffer->size,
|
sbuffer->size,
|
||||||
|
refcount,
|
||||||
free);
|
free);
|
||||||
msgpack_rpc_free_object(arg);
|
msgpack_rpc_free_object(arg);
|
||||||
msgpack_sbuffer_clear(sbuffer);
|
msgpack_sbuffer_clear(sbuffer);
|
||||||
@@ -165,6 +167,7 @@ WBuffer *serialize_response(uint64_t response_id,
|
|||||||
|
|
||||||
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
||||||
sbuffer->size,
|
sbuffer->size,
|
||||||
|
1, // responses only go though 1 channel
|
||||||
free);
|
free);
|
||||||
msgpack_rpc_free_object(arg);
|
msgpack_rpc_free_object(arg);
|
||||||
msgpack_sbuffer_clear(sbuffer);
|
msgpack_sbuffer_clear(sbuffer);
|
||||||
@@ -190,6 +193,7 @@ WBuffer *serialize_metadata(uint64_t id,
|
|||||||
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
|
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
|
||||||
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
|
||||||
sbuffer->size,
|
sbuffer->size,
|
||||||
|
1,
|
||||||
free);
|
free);
|
||||||
msgpack_sbuffer_clear(sbuffer);
|
msgpack_sbuffer_clear(sbuffer);
|
||||||
return rv;
|
return rv;
|
||||||
|
@@ -92,33 +92,33 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream)
|
|||||||
/// @return false if the write failed
|
/// @return false if the write failed
|
||||||
bool wstream_write(WStream *wstream, WBuffer *buffer)
|
bool wstream_write(WStream *wstream, WBuffer *buffer)
|
||||||
{
|
{
|
||||||
WriteData *data;
|
|
||||||
uv_buf_t uvbuf;
|
|
||||||
uv_write_t *req;
|
|
||||||
|
|
||||||
// This should not be called after a wstream was freed
|
// This should not be called after a wstream was freed
|
||||||
assert(!wstream->freed);
|
assert(!wstream->freed);
|
||||||
|
|
||||||
buffer->refcount++;
|
|
||||||
|
|
||||||
if (wstream->curmem > wstream->maxmem) {
|
if (wstream->curmem > wstream->maxmem) {
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
wstream->curmem += buffer->size;
|
wstream->curmem += buffer->size;
|
||||||
data = xmalloc(sizeof(WriteData));
|
|
||||||
|
WriteData *data = xmalloc(sizeof(WriteData));
|
||||||
data->wstream = wstream;
|
data->wstream = wstream;
|
||||||
data->buffer = buffer;
|
data->buffer = buffer;
|
||||||
req = xmalloc(sizeof(uv_write_t));
|
|
||||||
|
uv_write_t *req = xmalloc(sizeof(uv_write_t));
|
||||||
req->data = data;
|
req->data = data;
|
||||||
|
|
||||||
|
uv_buf_t uvbuf;
|
||||||
uvbuf.base = buffer->data;
|
uvbuf.base = buffer->data;
|
||||||
uvbuf.len = buffer->size;
|
uvbuf.len = buffer->size;
|
||||||
wstream->pending_reqs++;
|
|
||||||
|
|
||||||
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
|
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
|
||||||
|
free(data);
|
||||||
|
free(req);
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wstream->pending_reqs++;
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
err:
|
err:
|
||||||
@@ -133,14 +133,19 @@ err:
|
|||||||
///
|
///
|
||||||
/// @param data Data stored by the WBuffer
|
/// @param data Data stored by the WBuffer
|
||||||
/// @param size The size of the data array
|
/// @param size The size of the data array
|
||||||
|
/// @param refcount The number of references for the WBuffer. This will be used
|
||||||
|
/// by WStream instances to decide when a WBuffer should be freed.
|
||||||
/// @param cb Pointer to function that will be responsible for freeing
|
/// @param cb Pointer to function that will be responsible for freeing
|
||||||
/// the buffer data(passing 'free' will work as expected).
|
/// the buffer data(passing 'free' will work as expected).
|
||||||
/// @return The allocated WBuffer instance
|
/// @return The allocated WBuffer instance
|
||||||
WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb)
|
WBuffer *wstream_new_buffer(char *data,
|
||||||
|
size_t size,
|
||||||
|
size_t refcount,
|
||||||
|
wbuffer_data_finalizer cb)
|
||||||
{
|
{
|
||||||
WBuffer *rv = xmalloc(sizeof(WBuffer));
|
WBuffer *rv = xmalloc(sizeof(WBuffer));
|
||||||
rv->size = size;
|
rv->size = size;
|
||||||
rv->refcount = 0;
|
rv->refcount = refcount;
|
||||||
rv->cb = cb;
|
rv->cb = cb;
|
||||||
rv->data = data;
|
rv->data = data;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user