fix(rpc): include write failure reason in log (#37959)

Also don't use LOGLVL_ERR on UV_EPIPE.
This commit is contained in:
zeertzjq
2026-02-19 10:34:22 +08:00
committed by GitHub
parent c3589753a0
commit e142e01d57
4 changed files with 24 additions and 20 deletions

View File

@@ -617,7 +617,7 @@ size_t channel_send(uint64_t id, char *data, size_t len, bool data_owned, const
// write can be delayed indefinitely, so always use an allocated buffer
WBuffer *buf = wstream_new_buffer(data_owned ? data : xmemdup(data, len),
len, 1, xfree);
return wstream_write(in, buf) ? len : 0;
return wstream_write(in, buf) == 0 ? len : 0;
retfree:
if (data_owned) {

View File

@@ -63,8 +63,8 @@ void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
///
/// @param stream The `Stream` instance
/// @param buffer The buffer which contains data to be written
/// @return false if the write failed
bool wstream_write(Stream *stream, WBuffer *buffer)
/// @return 0 on success, or libuv error code on failure
int wstream_write(Stream *stream, WBuffer *buffer)
FUNC_ATTR_NONNULL_ALL
{
assert(stream->maxmem);
@@ -72,6 +72,7 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
// This should not be called after a stream was freed
assert(!stream->closed);
int err = 0;
uv_buf_t uvbuf;
uvbuf.base = buffer->data;
uvbuf.len = UV_BUF_LEN(buffer->size);
@@ -80,7 +81,7 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
uv_fs_t req;
// Synchronous write
uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL);
err = uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL);
uv_fs_req_cleanup(&req);
@@ -89,11 +90,12 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
assert(stream->write_cb == NULL);
stream->fpos += MAX(req.result, 0);
return req.result > 0;
return req.result > 0 ? 0 : err != 0 ? err : UV_UNKNOWN;
}
if (stream->curmem > stream->maxmem) {
goto err;
err = UV_ENOMEM;
goto fail;
}
stream->curmem += buffer->size;
@@ -103,17 +105,19 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
data->buffer = buffer;
data->uv_req.data = data;
if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) {
if ((err = uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) != 0) {
xfree(data);
goto err;
goto fail;
}
stream->pending_reqs++;
return true;
assert(err == 0);
return 0;
err:
fail:
wstream_release_wbuffer(buffer);
return false;
assert(err != 0);
return err;
}
/// Creates a WBuffer object for holding output data. Instances of this

View File

@@ -393,7 +393,7 @@ bool rpc_write_raw(uint64_t id, WBuffer *buffer)
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success;
int err = 0;
if (channel->rpc.closed) {
wstream_release_wbuffer(buffer);
@@ -403,24 +403,24 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
if (channel->streamtype == kChannelStreamInternal) {
channel_incref(channel);
CREATE_EVENT(channel->events, internal_read_event, channel, buffer);
success = true;
} else {
Stream *in = channel_instream(channel);
success = wstream_write(in, buffer);
err = wstream_write(in, buffer);
}
if (!success) {
if (err != 0) {
// If the write failed for any reason, close the channel
char buf[256];
snprintf(buf,
sizeof(buf),
"ch %" PRIu64 ": stream write failed. "
"ch %" PRIu64 ": stream write failed: %s. "
"RPC canceled; closing channel",
channel->id);
chan_close_on_err(channel, buf, LOGLVL_ERR);
channel->id, os_strerror(err));
// UV_EPIPE can happen if pipe is closed by peer and shouldn't be an error.
chan_close_on_err(channel, buf, err == UV_EPIPE ? LOGLVL_INF : LOGLVL_ERR);
}
return success;
return err == 0;
}
static void internal_read_event(void **argv)

View File

@@ -923,7 +923,7 @@ static int do_os_system(char **argv, const char *input, size_t len, char **outpu
if (has_input) {
WBuffer *input_buffer = wstream_new_buffer((char *)input, len, 1, NULL);
if (!wstream_write(&proc->in, input_buffer)) {
if (wstream_write(&proc->in, input_buffer) != 0) {
// couldn't write, stop the process and tell the user about it
proc_stop(proc);
goto end;