Merge pull request #29106 from bfredl/rwstream

refactor(io): separate types for read and write streams
This commit is contained in:
bfredl
2024-06-01 09:53:45 +02:00
committed by GitHub
18 changed files with 158 additions and 132 deletions

View File

@@ -126,19 +126,19 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
*error = e_invstream; *error = e_invstream;
return false; return false;
} }
stream_may_close(&chan->stream.socket); rstream_may_close(&chan->stream.socket);
break; break;
case kChannelStreamProc: case kChannelStreamProc:
proc = &chan->stream.proc; proc = &chan->stream.proc;
if (part == kChannelPartStdin || close_main) { if (part == kChannelPartStdin || close_main) {
stream_may_close(&proc->in); wstream_may_close(&proc->in);
} }
if (part == kChannelPartStdout || close_main) { if (part == kChannelPartStdout || close_main) {
stream_may_close(&proc->out); rstream_may_close(&proc->out);
} }
if (part == kChannelPartStderr || part == kChannelPartAll) { if (part == kChannelPartStderr || part == kChannelPartAll) {
stream_may_close(&proc->err); rstream_may_close(&proc->err);
} }
if (proc->type == kProcessTypePty && part == kChannelPartAll) { if (proc->type == kProcessTypePty && part == kChannelPartAll) {
pty_process_close_master(&chan->stream.pty); pty_process_close_master(&chan->stream.pty);
@@ -148,10 +148,10 @@ bool channel_close(uint64_t id, ChannelPart part, const char **error)
case kChannelStreamStdio: case kChannelStreamStdio:
if (part == kChannelPartStdin || close_main) { if (part == kChannelPartStdin || close_main) {
stream_may_close(&chan->stream.stdio.in); rstream_may_close(&chan->stream.stdio.in);
} }
if (part == kChannelPartStdout || close_main) { if (part == kChannelPartStdout || close_main) {
stream_may_close(&chan->stream.stdio.out); wstream_may_close(&chan->stream.stdio.out);
} }
if (part == kChannelPartStderr) { if (part == kChannelPartStderr) {
*error = e_invstream; *error = e_invstream;
@@ -480,9 +480,9 @@ uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader
return 0; return 0;
} }
channel->stream.socket.internal_close_cb = close_cb; channel->stream.socket.s.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel; channel->stream.socket.s.internal_data = channel;
wstream_init(&channel->stream.socket, 0); wstream_init(&channel->stream.socket.s, 0);
rstream_init(&channel->stream.socket, 0); rstream_init(&channel->stream.socket, 0);
if (rpc) { if (rpc) {
@@ -505,9 +505,9 @@ void channel_from_connection(SocketWatcher *watcher)
{ {
Channel *channel = channel_alloc(kChannelStreamSocket); Channel *channel = channel_alloc(kChannelStreamSocket);
socket_watcher_accept(watcher, &channel->stream.socket); socket_watcher_accept(watcher, &channel->stream.socket);
channel->stream.socket.internal_close_cb = close_cb; channel->stream.socket.s.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel; channel->stream.socket.s.internal_data = channel;
wstream_init(&channel->stream.socket, 0); wstream_init(&channel->stream.socket.s, 0);
rstream_init(&channel->stream.socket, 0); rstream_init(&channel->stream.socket, 0);
rpc_start(channel); rpc_start(channel);
channel_create_event(channel, watcher->addr); channel_create_event(channel, watcher->addr);
@@ -647,19 +647,19 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
return l; return l;
} }
void on_channel_data(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) void on_channel_data(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
{ {
Channel *chan = data; Channel *chan = data;
on_channel_output(stream, chan, buf, eof, &chan->on_data); on_channel_output(stream, chan, buf, eof, &chan->on_data);
} }
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) void on_job_stderr(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
{ {
Channel *chan = data; Channel *chan = data;
on_channel_output(stream, chan, buf, eof, &chan->on_stderr); on_channel_output(stream, chan, buf, eof, &chan->on_stderr);
} }
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, bool eof, static void on_channel_output(RStream *stream, Channel *chan, RBuffer *buf, bool eof,
CallbackReader *reader) CallbackReader *reader)
{ {
size_t count; size_t count;
@@ -864,7 +864,7 @@ static void term_resize(uint16_t width, uint16_t height, void *data)
static inline void term_delayed_free(void **argv) static inline void term_delayed_free(void **argv)
{ {
Channel *chan = argv[0]; Channel *chan = argv[0];
if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) { if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) {
multiqueue_put(chan->events, term_delayed_free, chan); multiqueue_put(chan->events, term_delayed_free, chan);
return; return;
} }

View File

@@ -30,7 +30,7 @@ struct Channel {
Process proc; Process proc;
LibuvProcess uv; LibuvProcess uv;
PtyProcess pty; PtyProcess pty;
Stream socket; RStream socket;
StdioPair stdio; StdioPair stdio;
StderrState err; StderrState err;
InternalState internal; InternalState internal;
@@ -73,7 +73,7 @@ static inline Stream *channel_instream(Channel *chan)
return &chan->stream.proc.in; return &chan->stream.proc.in;
case kChannelStreamSocket: case kChannelStreamSocket:
return &chan->stream.socket; return &chan->stream.socket.s;
case kChannelStreamStdio: case kChannelStreamStdio:
return &chan->stream.stdio.out; return &chan->stream.stdio.out;
@@ -85,10 +85,10 @@ static inline Stream *channel_instream(Channel *chan)
abort(); abort();
} }
static inline Stream *channel_outstream(Channel *chan) static inline RStream *channel_outstream(Channel *chan)
REAL_FATTR_NONNULL_ALL; REAL_FATTR_NONNULL_ALL;
static inline Stream *channel_outstream(Channel *chan) static inline RStream *channel_outstream(Channel *chan)
{ {
switch (chan->streamtype) { switch (chan->streamtype) {
case kChannelStreamProc: case kChannelStreamProc:

View File

@@ -30,7 +30,7 @@ typedef enum {
} ChannelStdinMode; } ChannelStdinMode;
typedef struct { typedef struct {
Stream in; RStream in;
Stream out; Stream out;
} StdioPair; } StdioPair;

View File

@@ -55,14 +55,15 @@ struct wbuffer {
}; };
typedef struct stream Stream; typedef struct stream Stream;
/// Type of function called when the Stream buffer is filled with data typedef struct rstream RStream;
/// Type of function called when the RStream buffer is filled with data
/// ///
/// @param stream The Stream instance /// @param stream The Stream instance
/// @param buf The associated RBuffer instance /// @param buf The associated RBuffer instance
/// @param count Number of bytes that was read. /// @param count Number of bytes that was read.
/// @param data User-defined data /// @param data User-defined data
/// @param eof If the stream reached EOF. /// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof); typedef void (*stream_read_cb)(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof);
/// Type of function called when the Stream has information about a write /// Type of function called when the Stream has information about a write
/// request. /// request.
@@ -71,11 +72,11 @@ typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count, void
/// @param data User-defined data /// @param data User-defined data
/// @param status 0 on success, anything else indicates failure /// @param status 0 on success, anything else indicates failure
typedef void (*stream_write_cb)(Stream *stream, void *data, int status); typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
typedef void (*stream_close_cb)(Stream *stream, void *data); typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream { struct stream {
bool closed; bool closed;
bool did_eof;
union { union {
uv_pipe_t pipe; uv_pipe_t pipe;
uv_tcp_t tcp; uv_tcp_t tcp;
@@ -85,20 +86,27 @@ struct stream {
#endif #endif
} uv; } uv;
uv_stream_t *uvstream; uv_stream_t *uvstream;
uv_buf_t uvbuf;
RBuffer *buffer;
uv_file fd; uv_file fd;
stream_read_cb read_cb;
stream_write_cb write_cb;
void *cb_data; void *cb_data;
stream_close_cb close_cb, internal_close_cb; stream_close_cb close_cb, internal_close_cb;
void *close_cb_data, *internal_data; void *close_cb_data, *internal_data;
size_t fpos; size_t pending_reqs;
MultiQueue *events;
// only used for writing:
stream_write_cb write_cb;
size_t curmem; size_t curmem;
size_t maxmem; size_t maxmem;
size_t pending_reqs; };
struct rstream {
Stream s;
bool did_eof;
RBuffer *buffer;
uv_buf_t uvbuf;
stream_read_cb read_cb;
size_t num_bytes; size_t num_bytes;
MultiQueue *events; size_t fpos;
}; };
#define ADDRESS_MAX_SIZE 256 #define ADDRESS_MAX_SIZE 256
@@ -147,7 +155,8 @@ struct process {
char **argv; char **argv;
const char *exepath; const char *exepath;
dict_T *env; dict_T *env;
Stream in, out, err; Stream in;
RStream out, err;
/// Exit handler. If set, user must call process_free(). /// Exit handler. If set, user must call process_free().
process_exit_cb cb; process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb; internal_process_cb internal_exit_cb, internal_close_cb;

View File

@@ -70,19 +70,19 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe); uvproc->uvstdio[0].data.stream = (uv_stream_t *)(&proc->in.uv.pipe);
} }
if (!proc->out.closed) { if (!proc->out.s.closed) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
#ifdef MSWIN #ifdef MSWIN
// pipe must be readable for IOCP to work on Windows. // pipe must be readable for IOCP to work on Windows.
uvproc->uvstdio[1].flags |= proc->overlapped uvproc->uvstdio[1].flags |= proc->overlapped
? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0; ? (UV_READABLE_PIPE | UV_OVERLAPPED_PIPE) : 0;
#endif #endif
uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.uv.pipe); uvproc->uvstdio[1].data.stream = (uv_stream_t *)(&proc->out.s.uv.pipe);
} }
if (!proc->err.closed) { if (!proc->err.s.closed) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.uv.pipe); uvproc->uvstdio[2].data.stream = (uv_stream_t *)(&proc->err.s.uv.pipe);
} else if (proc->fwd_err) { } else if (proc->fwd_err) {
uvproc->uvstdio[2].flags = UV_INHERIT_FD; uvproc->uvstdio[2].flags = UV_INHERIT_FD;
uvproc->uvstdio[2].data.fd = STDERR_FILENO; uvproc->uvstdio[2].data.fd = STDERR_FILENO;

View File

@@ -8,7 +8,9 @@
#include "nvim/event/loop.h" #include "nvim/event/loop.h"
#include "nvim/event/multiqueue.h" #include "nvim/event/multiqueue.h"
#include "nvim/event/process.h" #include "nvim/event/process.h"
#include "nvim/event/rstream.h"
#include "nvim/event/stream.h" #include "nvim/event/stream.h"
#include "nvim/event/wstream.h"
#include "nvim/globals.h" #include "nvim/globals.h"
#include "nvim/log.h" #include "nvim/log.h"
#include "nvim/main.h" #include "nvim/main.h"
@@ -51,15 +53,15 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
} }
if (out) { if (out) {
uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0);
} else { } else {
proc->out.closed = true; proc->out.s.closed = true;
} }
if (err) { if (err) {
uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); uv_pipe_init(&proc->loop->uv, &proc->err.s.uv.pipe, 0);
} else { } else {
proc->err.closed = true; proc->err.s.closed = true;
} }
#ifdef USE_GCOV #ifdef USE_GCOV
@@ -82,10 +84,10 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
} }
if (out) { if (out) {
uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL);
} }
if (err) { if (err) {
uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); uv_close((uv_handle_t *)&proc->err.s.uv.pipe, NULL);
} }
if (proc->type == kProcessTypeUv) { if (proc->type == kProcessTypeUv) {
@@ -106,16 +108,16 @@ int process_spawn(Process *proc, bool in, bool out, bool err)
} }
if (out) { if (out) {
stream_init(NULL, &proc->out, -1, (uv_stream_t *)&proc->out.uv.pipe); stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe);
proc->out.internal_data = proc; proc->out.s.internal_data = proc;
proc->out.internal_close_cb = on_process_stream_close; proc->out.s.internal_close_cb = on_process_stream_close;
proc->refcount++; proc->refcount++;
} }
if (err) { if (err) {
stream_init(NULL, &proc->err, -1, (uv_stream_t *)&proc->err.uv.pipe); stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe);
proc->err.internal_data = proc; proc->err.s.internal_data = proc;
proc->err.internal_close_cb = on_process_stream_close; proc->err.s.internal_close_cb = on_process_stream_close;
proc->refcount++; proc->refcount++;
} }
@@ -148,9 +150,9 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{ {
stream_may_close(&proc->in); wstream_may_close(&proc->in);
stream_may_close(&proc->out); rstream_may_close(&proc->out);
stream_may_close(&proc->err); rstream_may_close(&proc->err);
} }
/// Synchronously wait for a process to finish /// Synchronously wait for a process to finish
@@ -337,10 +339,10 @@ static void process_close(Process *proc)
/// ///
/// @param proc Process, for which an output stream should be flushed. /// @param proc Process, for which an output stream should be flushed.
/// @param stream Stream to flush. /// @param stream Stream to flush.
static void flush_stream(Process *proc, Stream *stream) static void flush_stream(Process *proc, RStream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
{ {
if (!stream || stream->closed) { if (!stream || stream->s.closed) {
return; return;
} }
@@ -350,7 +352,7 @@ static void flush_stream(Process *proc, Stream *stream)
// keeps sending data, we only accept as much data as the system buffer size. // keeps sending data, we only accept as much data as the system buffer size.
// Otherwise this would block cleanup/teardown. // Otherwise this would block cleanup/teardown.
int system_buffer_size = 0; int system_buffer_size = 0;
int err = uv_recv_buffer_size((uv_handle_t *)&stream->uv.pipe, int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe,
&system_buffer_size); &system_buffer_size);
if (err) { if (err) {
system_buffer_size = (int)rbuffer_capacity(stream->buffer); system_buffer_size = (int)rbuffer_capacity(stream->buffer);
@@ -359,14 +361,14 @@ static void flush_stream(Process *proc, Stream *stream)
size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size; size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size;
// Read remaining data. // Read remaining data.
while (!stream->closed && stream->num_bytes < max_bytes) { while (!stream->s.closed && stream->num_bytes < max_bytes) {
// Remember number of bytes before polling // Remember number of bytes before polling
size_t num_bytes = stream->num_bytes; size_t num_bytes = stream->num_bytes;
// Poll for data and process the generated events. // Poll for data and process the generated events.
loop_poll_events(proc->loop, 0); loop_poll_events(proc->loop, 0);
if (stream->events) { if (stream->s.events) {
multiqueue_process_events(stream->events); multiqueue_process_events(stream->s.events);
} }
// Stream can be closed if it is empty. // Stream can be closed if it is empty.
@@ -374,7 +376,7 @@ static void flush_stream(Process *proc, Stream *stream)
if (stream->read_cb && !stream->did_eof) { if (stream->read_cb && !stream->did_eof) {
// Stream callback could miss EOF handling if a child keeps the stream // Stream callback could miss EOF handling if a child keeps the stream
// open. But only send EOF if we haven't already. // open. But only send EOF if we haven't already.
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true); stream->read_cb(stream, stream->buffer, 0, stream->s.cb_data, true);
} }
break; break;
} }

View File

@@ -21,8 +21,8 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.argv = NULL, .argv = NULL,
.exepath = NULL, .exepath = NULL,
.in = { .closed = false }, .in = { .closed = false },
.out = { .closed = false }, .out = { .s.closed = false },
.err = { .closed = false }, .err = { .s.closed = false },
.cb = NULL, .cb = NULL,
.closed = false, .closed = false,
.internal_close_cb = NULL, .internal_close_cb = NULL,

View File

@@ -19,23 +19,26 @@
# include "event/rstream.c.generated.h" # include "event/rstream.c.generated.h"
#endif #endif
void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize) void rstream_init_fd(Loop *loop, RStream *stream, int fd, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1, 2) FUNC_ATTR_NONNULL_ARG(1, 2)
{ {
stream_init(loop, stream, fd, NULL); stream_init(loop, &stream->s, fd, NULL);
rstream_init(stream, bufsize); rstream_init(stream, bufsize);
} }
void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize) void rstream_init_stream(RStream *stream, uv_stream_t *uvstream, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1, 2) FUNC_ATTR_NONNULL_ARG(1, 2)
{ {
stream_init(NULL, stream, -1, uvstream); stream_init(NULL, &stream->s, -1, uvstream);
rstream_init(stream, bufsize); rstream_init(stream, bufsize);
} }
void rstream_init(Stream *stream, size_t bufsize) void rstream_init(RStream *stream, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
{ {
stream->fpos = 0;
stream->read_cb = NULL;
stream->num_bytes = 0;
stream->buffer = rbuffer_new(bufsize); stream->buffer = rbuffer_new(bufsize);
stream->buffer->data = stream; stream->buffer->data = stream;
stream->buffer->full_cb = on_rbuffer_full; stream->buffer->full_cb = on_rbuffer_full;
@@ -45,28 +48,28 @@ void rstream_init(Stream *stream, size_t bufsize)
/// Starts watching for events from a `Stream` instance. /// Starts watching for events from a `Stream` instance.
/// ///
/// @param stream The `Stream` instance /// @param stream The `Stream` instance
void rstream_start(Stream *stream, stream_read_cb cb, void *data) void rstream_start(RStream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
{ {
stream->read_cb = cb; stream->read_cb = cb;
stream->cb_data = data; stream->s.cb_data = data;
if (stream->uvstream) { if (stream->s.uvstream) {
uv_read_start(stream->uvstream, alloc_cb, read_cb); uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
} else { } else {
uv_idle_start(&stream->uv.idle, fread_idle_cb); uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
} }
} }
/// Stops watching for events from a `Stream` instance. /// Stops watching for events from a `Stream` instance.
/// ///
/// @param stream The `Stream` instance /// @param stream The `Stream` instance
void rstream_stop(Stream *stream) void rstream_stop(RStream *stream)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_NONNULL_ALL
{ {
if (stream->uvstream) { if (stream->s.uvstream) {
uv_read_stop(stream->uvstream); uv_read_stop(stream->s.uvstream);
} else { } else {
uv_idle_stop(&stream->uv.idle); uv_idle_stop(&stream->s.uv.idle);
} }
} }
@@ -77,9 +80,9 @@ static void on_rbuffer_full(RBuffer *buf, void *data)
static void on_rbuffer_nonfull(RBuffer *buf, void *data) static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{ {
Stream *stream = data; RStream *stream = data;
assert(stream->read_cb); assert(stream->read_cb);
rstream_start(stream, stream->read_cb, stream->cb_data); rstream_start(stream, stream->read_cb, stream->s.cb_data);
} }
// Callbacks used by libuv // Callbacks used by libuv
@@ -87,7 +90,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
/// Called by libuv to allocate memory for reading. /// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{ {
Stream *stream = handle->data; RStream *stream = handle->data;
// `uv_buf_t.len` happens to have different size on Windows. // `uv_buf_t.len` happens to have different size on Windows.
size_t write_count; size_t write_count;
buf->base = rbuffer_write_ptr(stream->buffer, &write_count); buf->base = rbuffer_write_ptr(stream->buffer, &write_count);
@@ -99,7 +102,7 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
/// 0-length buffer. /// 0-length buffer.
static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{ {
Stream *stream = uvstream->data; RStream *stream = uvstream->data;
if (cnt <= 0) { if (cnt <= 0) {
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed: // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
@@ -141,7 +144,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
static void fread_idle_cb(uv_idle_t *handle) static void fread_idle_cb(uv_idle_t *handle)
{ {
uv_fs_t req; uv_fs_t req;
Stream *stream = handle->data; RStream *stream = handle->data;
// `uv_buf_t.len` happens to have different size on Windows. // `uv_buf_t.len` happens to have different size on Windows.
size_t write_count; size_t write_count;
@@ -160,7 +163,7 @@ static void fread_idle_cb(uv_idle_t *handle)
// Synchronous read // Synchronous read
uv_fs_read(handle->loop, uv_fs_read(handle->loop,
&req, &req,
stream->fd, stream->s.fd,
&stream->uvbuf, &stream->uvbuf,
1, 1,
(int64_t)stream->fpos, (int64_t)stream->fpos,
@@ -169,7 +172,7 @@ static void fread_idle_cb(uv_idle_t *handle)
uv_fs_req_cleanup(&req); uv_fs_req_cleanup(&req);
if (req.result <= 0) { if (req.result <= 0) {
uv_idle_stop(&stream->uv.idle); uv_idle_stop(&stream->s.uv.idle);
invoke_read_cb(stream, 0, true); invoke_read_cb(stream, 0, true);
return; return;
} }
@@ -183,24 +186,29 @@ static void fread_idle_cb(uv_idle_t *handle)
static void read_event(void **argv) static void read_event(void **argv)
{ {
Stream *stream = argv[0]; RStream *stream = argv[0];
if (stream->read_cb) { if (stream->read_cb) {
size_t count = (uintptr_t)argv[1]; size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2]; bool eof = (uintptr_t)argv[2];
stream->did_eof = eof; stream->did_eof = eof;
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof); stream->read_cb(stream, stream->buffer, count, stream->s.cb_data, eof);
} }
stream->pending_reqs--; stream->s.pending_reqs--;
if (stream->closed && !stream->pending_reqs) { if (stream->s.closed && !stream->s.pending_reqs) {
stream_close_handle(stream); stream_close_handle(&stream->s, true);
} }
} }
static void invoke_read_cb(Stream *stream, size_t count, bool eof) static void invoke_read_cb(RStream *stream, size_t count, bool eof)
{ {
// Don't let the stream be closed before the event is processed. // Don't let the stream be closed before the event is processed.
stream->pending_reqs++; stream->s.pending_reqs++;
CREATE_EVENT(stream->events, read_event, CREATE_EVENT(stream->s.events, read_event,
stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof); stream, (void *)(uintptr_t *)count, (void *)(uintptr_t)eof);
} }
void rstream_may_close(RStream *stream)
{
stream_may_close(&stream->s, true);
}

View File

@@ -135,17 +135,17 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0; return 0;
} }
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream) int socket_watcher_accept(SocketWatcher *watcher, RStream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{ {
uv_stream_t *client; uv_stream_t *client;
if (watcher->stream->type == UV_TCP) { if (watcher->stream->type == UV_TCP) {
client = (uv_stream_t *)(&stream->uv.tcp); client = (uv_stream_t *)(&stream->s.uv.tcp);
uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client); uv_tcp_init(watcher->uv.tcp.handle.loop, (uv_tcp_t *)client);
uv_tcp_nodelay((uv_tcp_t *)client, true); uv_tcp_nodelay((uv_tcp_t *)client, true);
} else { } else {
client = (uv_stream_t *)&stream->uv.pipe; client = (uv_stream_t *)&stream->s.uv.pipe;
uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0); uv_pipe_init(watcher->uv.pipe.handle.loop, (uv_pipe_t *)client, 0);
} }
@@ -156,7 +156,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
return result; return result;
} }
stream_init(NULL, stream, -1, client); stream_init(NULL, &stream->s, -1, client);
return 0; return 0;
} }
@@ -197,7 +197,7 @@ static void connect_cb(uv_connect_t *req, int status)
} }
} }
bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address, int timeout, bool socket_connect(Loop *loop, RStream *stream, bool is_tcp, const char *address, int timeout,
const char **error) const char **error)
{ {
bool success = false; bool success = false;
@@ -206,7 +206,7 @@ bool socket_connect(Loop *loop, Stream *stream, bool is_tcp, const char *address
req.data = &status; req.data = &status;
uv_stream_t *uv_stream; uv_stream_t *uv_stream;
uv_tcp_t *tcp = &stream->uv.tcp; uv_tcp_t *tcp = &stream->s.uv.tcp;
uv_getaddrinfo_t addr_req; uv_getaddrinfo_t addr_req;
addr_req.addrinfo = NULL; addr_req.addrinfo = NULL;
const struct addrinfo *addrinfo = NULL; const struct addrinfo *addrinfo = NULL;
@@ -237,7 +237,7 @@ tcp_retry:
uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb); uv_tcp_connect(&req, tcp, addrinfo->ai_addr, connect_cb);
uv_stream = (uv_stream_t *)tcp; uv_stream = (uv_stream_t *)tcp;
} else { } else {
uv_pipe_t *pipe = &stream->uv.pipe; uv_pipe_t *pipe = &stream->s.uv.pipe;
uv_pipe_init(&loop->uv, pipe, 0); uv_pipe_init(&loop->uv, pipe, 0);
uv_pipe_connect(&req, pipe, address, connect_cb); uv_pipe_connect(&req, pipe, address, connect_cb);
uv_stream = (uv_stream_t *)pipe; uv_stream = (uv_stream_t *)pipe;
@@ -245,7 +245,7 @@ tcp_retry:
status = 1; status = 1;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1); LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1);
if (status == 0) { if (status == 0) {
stream_init(NULL, stream, -1, uv_stream); stream_init(NULL, &stream->s, -1, uv_stream);
success = true; success = true;
} else if (is_tcp && addrinfo->ai_next) { } else if (is_tcp && addrinfo->ai_next) {
addrinfo = addrinfo->ai_next; addrinfo = addrinfo->ai_next;

View File

@@ -85,21 +85,17 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
} }
stream->internal_data = NULL; stream->internal_data = NULL;
stream->fpos = 0;
stream->curmem = 0; stream->curmem = 0;
stream->maxmem = 0; stream->maxmem = 0;
stream->pending_reqs = 0; stream->pending_reqs = 0;
stream->read_cb = NULL;
stream->write_cb = NULL; stream->write_cb = NULL;
stream->close_cb = NULL; stream->close_cb = NULL;
stream->internal_close_cb = NULL; stream->internal_close_cb = NULL;
stream->closed = false; stream->closed = false;
stream->buffer = NULL;
stream->events = NULL; stream->events = NULL;
stream->num_bytes = 0;
} }
void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data) void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data, bool rstream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
{ {
assert(!stream->closed); assert(!stream->closed);
@@ -116,18 +112,18 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
#endif #endif
if (!stream->pending_reqs) { if (!stream->pending_reqs) {
stream_close_handle(stream); stream_close_handle(stream, rstream);
} }
} }
void stream_may_close(Stream *stream) void stream_may_close(Stream *stream, bool rstream)
{ {
if (!stream->closed) { if (!stream->closed) {
stream_close(stream, NULL, NULL); stream_close(stream, NULL, NULL, rstream);
} }
} }
void stream_close_handle(Stream *stream) void stream_close_handle(Stream *stream, bool rstream)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_NONNULL_ALL
{ {
uv_handle_t *handle = NULL; uv_handle_t *handle = NULL;
@@ -145,16 +141,22 @@ void stream_close_handle(Stream *stream)
assert(handle != NULL); assert(handle != NULL);
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
uv_close(handle, close_cb); uv_close(handle, rstream ? rstream_close_cb : close_cb);
} }
} }
static void rstream_close_cb(uv_handle_t *handle)
{
RStream *stream = handle->data;
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
close_cb(handle);
}
static void close_cb(uv_handle_t *handle) static void close_cb(uv_handle_t *handle)
{ {
Stream *stream = handle->data; Stream *stream = handle->data;
if (stream->buffer) {
rbuffer_free(stream->buffer);
}
if (stream->close_cb) { if (stream->close_cb) {
stream->close_cb(stream, stream->close_cb_data); stream->close_cb(stream, stream->close_cb_data);
} }

View File

@@ -141,7 +141,7 @@ static void write_cb(uv_write_t *req, int status)
if (data->stream->closed && data->stream->pending_reqs == 0) { if (data->stream->closed && data->stream->pending_reqs == 0) {
// Last pending write, free the stream; // Last pending write, free the stream;
stream_close_handle(data->stream); stream_close_handle(data->stream, false);
} }
xfree(data); xfree(data);
@@ -158,3 +158,8 @@ void wstream_release_wbuffer(WBuffer *buffer)
xfree(buffer); xfree(buffer);
} }
} }
void wstream_may_close(Stream *stream)
{
stream_may_close(stream, false);
}

View File

@@ -89,7 +89,7 @@ void rpc_start(Channel *channel)
kv_init(rpc->call_stack); kv_init(rpc->call_stack);
if (channel->streamtype != kChannelStreamInternal) { if (channel->streamtype != kChannelStreamInternal) {
Stream *out = channel_outstream(channel); RStream *out = channel_outstream(channel);
#ifdef NVIM_LOG_DEBUG #ifdef NVIM_LOG_DEBUG
Stream *in = channel_instream(channel); Stream *in = channel_instream(channel);
DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
@@ -202,7 +202,7 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
return frame.errored ? NIL : frame.result; return frame.errored ? NIL : frame.result;
} }
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof) static void receive_msgpack(RStream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
{ {
Channel *channel = data; Channel *channel = data;
channel_incref(channel); channel_incref(channel);

View File

@@ -41,7 +41,7 @@ typedef enum {
kInputEof, kInputEof,
} InbufPollResult; } InbufPollResult;
static Stream read_stream = { .closed = true }; // Input before UI starts. static RStream read_stream = { .s.closed = true }; // Input before UI starts.
static RBuffer *input_buffer = NULL; static RBuffer *input_buffer = NULL;
static bool input_eof = false; static bool input_eof = false;
static bool blocking = false; static bool blocking = false;
@@ -59,7 +59,7 @@ void input_init(void)
void input_start(void) void input_start(void)
{ {
if (!read_stream.closed) { if (!read_stream.s.closed) {
return; return;
} }
@@ -70,12 +70,12 @@ void input_start(void)
void input_stop(void) void input_stop(void)
{ {
if (read_stream.closed) { if (read_stream.s.closed) {
return; return;
} }
rstream_stop(&read_stream); rstream_stop(&read_stream);
stream_close(&read_stream, NULL, NULL); rstream_may_close(&read_stream);
} }
#ifdef EXITFREE #ifdef EXITFREE
@@ -138,7 +138,7 @@ int os_inchar(uint8_t *buf, int maxlen, int ms, int tb_change_cnt, MultiQueue *e
uint64_t wait_start = os_hrtime(); uint64_t wait_start = os_hrtime();
cursorhold_time = MIN(cursorhold_time, (int)p_ut); cursorhold_time = MIN(cursorhold_time, (int)p_ut);
if ((result = inbuf_poll((int)p_ut - cursorhold_time, events)) == kInputNone) { if ((result = inbuf_poll((int)p_ut - cursorhold_time, events)) == kInputNone) {
if (read_stream.closed && silent_mode) { if (read_stream.s.closed && silent_mode) {
// Drained eventloop & initial input; exit silent/batch-mode (-es/-Es). // Drained eventloop & initial input; exit silent/batch-mode (-es/-Es).
read_error_exit(); read_error_exit();
} }
@@ -489,7 +489,7 @@ bool input_available(void)
return rbuffer_size(input_buffer) != 0; return rbuffer_size(input_buffer) != 0;
} }
static void input_read_cb(Stream *stream, RBuffer *buf, size_t c, void *data, bool at_eof) static void input_read_cb(RStream *stream, RBuffer *buf, size_t c, void *data, bool at_eof)
{ {
if (at_eof) { if (at_eof) {
input_eof = true; input_eof = true;

View File

@@ -169,7 +169,7 @@ int pty_process_spawn(PtyProcess *ptyproc)
int status = 0; // zero or negative error code (libuv convention) int status = 0; // zero or negative error code (libuv convention)
Process *proc = (Process *)ptyproc; Process *proc = (Process *)ptyproc;
assert(proc->err.closed); assert(proc->err.s.closed);
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 }; ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 };
uv_disable_stdio_inheritance(); uv_disable_stdio_inheritance();
@@ -208,8 +208,8 @@ int pty_process_spawn(PtyProcess *ptyproc)
&& (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) { && (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) {
goto error; goto error;
} }
if (!proc->out.closed if (!proc->out.s.closed
&& (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) { && (status = set_duplicating_descriptor(master, &proc->out.s.uv.pipe))) {
goto error; goto error;
} }

View File

@@ -55,7 +55,7 @@ int pty_process_spawn(PtyProcess *ptyproc)
wchar_t *env = NULL; wchar_t *env = NULL;
const char *emsg = NULL; const char *emsg = NULL;
assert(proc->err.closed); assert(proc->err.s.closed);
if (!os_has_conpty_working() || (conpty_object = os_conpty_init(&in_name, if (!os_has_conpty_working() || (conpty_object = os_conpty_init(&in_name,
&out_name, ptyproc->width, &out_name, ptyproc->width,
@@ -72,10 +72,10 @@ int pty_process_spawn(PtyProcess *ptyproc)
pty_process_connect_cb); pty_process_connect_cb);
} }
if (!proc->out.closed) { if (!proc->out.s.closed) {
out_req = xmalloc(sizeof(uv_connect_t)); out_req = xmalloc(sizeof(uv_connect_t));
uv_pipe_connect(out_req, uv_pipe_connect(out_req,
&proc->out.uv.pipe, &proc->out.s.uv.pipe,
out_name, out_name,
pty_process_connect_cb); pty_process_connect_cb);
} }
@@ -216,7 +216,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer)
Process *proc = (Process *)ptyproc; Process *proc = (Process *)ptyproc;
assert(ptyproc->finish_wait != NULL); assert(ptyproc->finish_wait != NULL);
if (proc->out.closed || proc->out.did_eof || !uv_is_readable(proc->out.uvstream)) { if (proc->out.s.closed || proc->out.did_eof || !uv_is_readable(proc->out.s.uvstream)) {
uv_timer_stop(&ptyproc->wait_eof_timer); uv_timer_stop(&ptyproc->wait_eof_timer);
pty_process_finish2(ptyproc); pty_process_finish2(ptyproc);
} }

View File

@@ -987,7 +987,7 @@ static void dynamic_buffer_ensure(DynamicBuffer *buf, size_t desired)
buf->data = xrealloc(buf->data, buf->cap); buf->data = xrealloc(buf->data, buf->cap);
} }
static void system_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) static void system_data_cb(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
{ {
DynamicBuffer *dbuf = data; DynamicBuffer *dbuf = data;
@@ -1151,7 +1151,7 @@ end:
ui_flush(); ui_flush();
} }
static void out_data_cb(Stream *stream, RBuffer *buf, size_t count, void *data, bool eof) static void out_data_cb(RStream *stream, RBuffer *buf, size_t count, void *data, bool eof)
{ {
size_t cnt; size_t cnt;
char *ptr = rbuffer_read_ptr(buf, &cnt); char *ptr = rbuffer_read_ptr(buf, &cnt);
@@ -1331,7 +1331,7 @@ static void shell_write_cb(Stream *stream, void *data, int status)
msg_schedule_semsg(_("E5677: Error writing input to shell-command: %s"), msg_schedule_semsg(_("E5677: Error writing input to shell-command: %s"),
uv_err_name(status)); uv_err_name(status));
} }
stream_close(stream, NULL, NULL); stream_close(stream, NULL, NULL, false);
} }
/// Applies 'shellxescape' (p_sxe) and 'shellxquote' (p_sxq) to a command. /// Applies 'shellxescape' (p_sxe) and 'shellxquote' (p_sxq) to a command.

View File

@@ -167,7 +167,7 @@ void tinput_destroy(TermInput *input)
map_destroy(int, &kitty_key_map); map_destroy(int, &kitty_key_map);
rbuffer_free(input->key_buffer); rbuffer_free(input->key_buffer);
uv_close((uv_handle_t *)&input->timer_handle, NULL); uv_close((uv_handle_t *)&input->timer_handle, NULL);
stream_close(&input->read_stream, NULL, NULL); rstream_may_close(&input->read_stream);
termkey_destroy(input->tk); termkey_destroy(input->tk);
} }
@@ -737,7 +737,7 @@ static void handle_raw_buffer(TermInput *input, bool force)
} }
} }
static void tinput_read_cb(Stream *stream, RBuffer *buf, size_t count_, void *data, bool eof) static void tinput_read_cb(RStream *stream, RBuffer *buf, size_t count_, void *data, bool eof)
{ {
TermInput *input = data; TermInput *input = data;

View File

@@ -33,7 +33,7 @@ typedef struct {
TermKey_Terminfo_Getstr_Hook *tk_ti_hook_fn; ///< libtermkey terminfo hook TermKey_Terminfo_Getstr_Hook *tk_ti_hook_fn; ///< libtermkey terminfo hook
uv_timer_t timer_handle; uv_timer_t timer_handle;
Loop *loop; Loop *loop;
Stream read_stream; RStream read_stream;
RBuffer *key_buffer; RBuffer *key_buffer;
TUIData *tui_data; TUIData *tui_data;
} TermInput; } TermInput;