stream: set data together with callback

This commit is contained in:
Björn Linse
2016-05-12 13:18:04 +02:00
parent 1b825a9ada
commit 215922120c
11 changed files with 54 additions and 59 deletions

View File

@@ -22178,11 +22178,11 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
wstream_init(proc->in, 0); wstream_init(proc->in, 0);
if (proc->out) { if (proc->out) {
rstream_init(proc->out, 0); rstream_init(proc->out, 0);
rstream_start(proc->out, on_job_stdout); rstream_start(proc->out, on_job_stdout, data);
} }
if (proc->err) { if (proc->err) {
rstream_init(proc->err, 0); rstream_init(proc->err, 0);
rstream_start(proc->err, on_job_stderr); rstream_start(proc->err, on_job_stderr, data);
} }
pmap_put(uint64_t)(jobs, data->id, data); pmap_put(uint64_t)(jobs, data->id, data);
rettv->vval.v_number = data->id; rettv->vval.v_number = data->id;

View File

@@ -25,7 +25,7 @@
#define CLOSE_PROC_STREAM(proc, stream) \ #define CLOSE_PROC_STREAM(proc, stream) \
do { \ do { \
if (proc->stream && !proc->stream->closed) { \ if (proc->stream && !proc->stream->closed) { \
stream_close(proc->stream, NULL); \ stream_close(proc->stream, NULL, NULL); \
} \ } \
} while (0) } while (0)
@@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return false; return false;
} }
void *data = proc->data;
if (proc->in) { if (proc->in) {
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data); stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
proc->in->events = proc->events; proc->in->events = proc->events;
proc->in->internal_data = proc; proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close; proc->in->internal_close_cb = on_process_stream_close;
@@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
} }
if (proc->out) { if (proc->out) {
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data); stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
proc->out->events = proc->events; proc->out->events = proc->events;
proc->out->internal_data = proc; proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close; proc->out->internal_close_cb = on_process_stream_close;
@@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
} }
if (proc->err) { if (proc->err) {
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data); stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
proc->err->events = proc->events; proc->err->events = proc->events;
proc->err->internal_data = proc; proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close; proc->err->internal_close_cb = on_process_stream_close;
@@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream)
if (stream->read_cb) { if (stream->read_cb) {
// Stream callback could miss EOF handling if a child keeps the stream // Stream callback could miss EOF handling if a child keeps the stream
// open. // open.
stream->read_cb(stream, stream->buffer, 0, stream->data, true); stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
} }
break; break;
} }

View File

@@ -17,21 +17,19 @@
# include "event/rstream.c.generated.h" # include "event/rstream.c.generated.h"
#endif #endif
void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize, void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(2)
{ {
stream_init(loop, stream, fd, NULL, data); stream_init(loop, stream, fd, NULL);
rstream_init(stream, bufsize); rstream_init(stream, bufsize);
} }
void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize, void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(2)
{ {
stream_init(NULL, stream, -1, uvstream, data); stream_init(NULL, stream, -1, uvstream);
rstream_init(stream, bufsize); rstream_init(stream, bufsize);
} }
@@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize)
/// Starts watching for events from a `Stream` instance. /// Starts watching for events from a `Stream` instance.
/// ///
/// @param stream The `Stream` instance /// @param stream The `Stream` instance
void rstream_start(Stream *stream, stream_read_cb cb) void rstream_start(Stream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
{ {
stream->read_cb = cb; stream->read_cb = cb;
stream->cb_data = data;
if (stream->uvstream) { if (stream->uvstream) {
uv_read_start(stream->uvstream, alloc_cb, read_cb); uv_read_start(stream->uvstream, alloc_cb, read_cb);
} else { } else {
@@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{ {
Stream *stream = data; Stream *stream = data;
assert(stream->read_cb); assert(stream->read_cb);
rstream_start(stream, stream->read_cb); rstream_start(stream, stream->read_cb, stream->cb_data);
} }
// Callbacks used by libuv // Callbacks used by libuv
@@ -179,7 +178,7 @@ static void read_event(void **argv)
if (stream->read_cb) { if (stream->read_cb) {
size_t count = (uintptr_t)argv[1]; size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2]; bool eof = (uintptr_t)argv[2];
stream->read_cb(stream, stream->buffer, count, stream->data, eof); stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
} }
stream->pending_reqs--; stream->pending_reqs--;
if (stream->closed && !stream->pending_reqs) { if (stream->closed && !stream->pending_reqs) {

View File

@@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0; return 0;
} }
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data) int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{ {
uv_stream_t *client; uv_stream_t *client;
@@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
return result; return result;
} }
stream_init(NULL, stream, -1, client, data); stream_init(NULL, stream, -1, client);
return 0; return 0;
} }

View File

@@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking)
return retval; return retval;
} }
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream, void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
void *data)
FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(2)
{ {
stream->uvstream = uvstream; stream->uvstream = uvstream;
@@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->uvstream->data = stream; stream->uvstream->data = stream;
} }
stream->data = data;
stream->internal_data = NULL; stream->internal_data = NULL;
stream->fpos = 0; stream->fpos = 0;
stream->curmem = 0; stream->curmem = 0;
@@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->num_bytes = 0; stream->num_bytes = 0;
} }
void stream_close(Stream *stream, stream_close_cb on_stream_close) void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(1)
{ {
assert(!stream->closed); assert(!stream->closed);
stream->closed = true; stream->closed = true;
stream->close_cb = on_stream_close; stream->close_cb = on_stream_close;
stream->close_cb_data = data;
if (!stream->pending_reqs) { if (!stream->pending_reqs) {
stream_close_handle(stream); stream_close_handle(stream);
@@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle)
rbuffer_free(stream->buffer); rbuffer_free(stream->buffer);
} }
if (stream->close_cb) { if (stream->close_cb) {
stream->close_cb(stream, stream->data); stream->close_cb(stream, stream->close_cb_data);
} }
if (stream->internal_close_cb) { if (stream->internal_close_cb) {
stream->internal_close_cb(stream, stream->internal_data); stream->internal_close_cb(stream, stream->internal_data);

View File

@@ -44,13 +44,14 @@ struct stream {
uv_file fd; uv_file fd;
stream_read_cb read_cb; stream_read_cb read_cb;
stream_write_cb write_cb; stream_write_cb write_cb;
void *cb_data;
stream_close_cb close_cb, internal_close_cb; stream_close_cb close_cb, internal_close_cb;
void *close_cb_data, *internal_data;
size_t fpos; size_t fpos;
size_t curmem; size_t curmem;
size_t maxmem; size_t maxmem;
size_t pending_reqs; size_t pending_reqs;
size_t num_bytes; size_t num_bytes;
void *data, *internal_data;
bool closed; bool closed;
Queue *events; Queue *events;
}; };

View File

@@ -22,19 +22,17 @@ typedef struct {
# include "event/wstream.c.generated.h" # include "event/wstream.c.generated.h"
#endif #endif
void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem, void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{ {
stream_init(loop, stream, fd, NULL, data); stream_init(loop, stream, fd, NULL);
wstream_init(stream, maxmem); wstream_init(stream, maxmem);
} }
void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem, void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
void *data)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{ {
stream_init(NULL, stream, -1, uvstream, data); stream_init(NULL, stream, -1, uvstream);
wstream_init(stream, maxmem); wstream_init(stream, maxmem);
} }
@@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem)
/// ///
/// @param stream The `Stream` instance /// @param stream The `Stream` instance
/// @param cb The callback /// @param cb The callback
void wstream_set_write_cb(Stream *stream, stream_write_cb cb) void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_NONNULL_ARG(1, 2)
{ {
stream->write_cb = cb; stream->write_cb = cb;
stream->cb_data = data;
} }
/// Queues data for writing to the backing file descriptor of a `Stream` /// Queues data for writing to the backing file descriptor of a `Stream`
@@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status)
wstream_release_wbuffer(data->buffer); wstream_release_wbuffer(data->buffer);
if (data->stream->write_cb) { if (data->stream->write_cb) {
data->stream->write_cb(data->stream, data->stream->data, status); data->stream->write_cb(data->stream, data->stream->cb_data, status);
} }
data->stream->pending_reqs--; data->stream->pending_reqs--;

View File

@@ -136,9 +136,9 @@ uint64_t channel_from_process(char **argv)
incref(channel); // process channels are only closed by the exit_cb incref(channel); // process channels are only closed by the exit_cb
wstream_init(proc->in, 0); wstream_init(proc->in, 0);
rstream_init(proc->out, 0); rstream_init(proc->out, 0);
rstream_start(proc->out, parse_msgpack); rstream_start(proc->out, parse_msgpack, channel);
rstream_init(proc->err, 0); rstream_init(proc->err, 0);
rstream_start(proc->err, forward_stderr); rstream_start(proc->err, forward_stderr, channel);
return channel->id; return channel->id;
} }
@@ -149,13 +149,13 @@ uint64_t channel_from_process(char **argv)
void channel_from_connection(SocketWatcher *watcher) void channel_from_connection(SocketWatcher *watcher)
{ {
Channel *channel = register_channel(kChannelTypeSocket); Channel *channel = register_channel(kChannelTypeSocket);
socket_watcher_accept(watcher, &channel->data.stream, channel); socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb; channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel; channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0); wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, parse_msgpack); rstream_start(&channel->data.stream, parse_msgpack, channel);
} }
/// Sends event/arguments to channel /// Sends event/arguments to channel
@@ -317,11 +317,10 @@ void channel_from_stdio(void)
Channel *channel = register_channel(kChannelTypeStdio); Channel *channel = register_channel(kChannelTypeStdio);
incref(channel); // stdio channels are only closed on exit incref(channel); // stdio channels are only closed on exit
// read stream // read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE, rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
channel); rstream_start(&channel->data.std.in, parse_msgpack, channel);
rstream_start(&channel->data.std.in, parse_msgpack);
// write stream // write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL); wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
} }
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count, static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
@@ -637,7 +636,7 @@ static void close_channel(Channel *channel)
switch (channel->type) { switch (channel->type) {
case kChannelTypeSocket: case kChannelTypeSocket:
stream_close(&channel->data.stream, NULL); stream_close(&channel->data.stream, NULL, NULL);
break; break;
case kChannelTypeProc: case kChannelTypeProc:
if (!channel->data.process.uvproc.process.closed) { if (!channel->data.process.uvproc.process.closed) {
@@ -645,8 +644,8 @@ static void close_channel(Channel *channel)
} }
break; break;
case kChannelTypeStdio: case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL); stream_close(&channel->data.std.in, NULL, NULL);
stream_close(&channel->data.std.out, NULL); stream_close(&channel->data.std.out, NULL, NULL);
queue_put(main_loop.fast_events, exit_event, 1, channel); queue_put(main_loop.fast_events, exit_event, 1, channel);
return; return;
default: default:

View File

@@ -60,8 +60,8 @@ void input_start(int fd)
} }
global_fd = fd; global_fd = fd;
rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL); rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE);
rstream_start(&read_stream, read_cb); rstream_start(&read_stream, read_cb, NULL);
} }
void input_stop(void) void input_stop(void)
@@ -71,7 +71,7 @@ void input_stop(void)
} }
rstream_stop(&read_stream); rstream_stop(&read_stream);
stream_close(&read_stream, NULL); stream_close(&read_stream, NULL, NULL);
} }
static void cursorhold_event(void **argv) static void cursorhold_event(void **argv)

View File

@@ -236,10 +236,10 @@ static int do_os_system(char **argv,
} }
proc->out->events = NULL; proc->out->events = NULL;
rstream_init(proc->out, 0); rstream_init(proc->out, 0);
rstream_start(proc->out, data_cb); rstream_start(proc->out, data_cb, &buf);
proc->err->events = NULL; proc->err->events = NULL;
rstream_init(proc->err, 0); rstream_init(proc->err, 0);
rstream_start(proc->err, data_cb); rstream_start(proc->err, data_cb, &buf);
// write the input, if any // write the input, if any
if (input) { if (input) {
@@ -251,7 +251,7 @@ static int do_os_system(char **argv,
return -1; return -1;
} }
// close the input stream after everything is written // close the input stream after everything is written
wstream_set_write_cb(&in, shell_write_cb); wstream_set_write_cb(&in, shell_write_cb, NULL);
} }
// invoke busy_start here so event_poll_until wont change the busy state for // invoke busy_start here so event_poll_until wont change the busy state for
@@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
static void shell_write_cb(Stream *stream, void *data, int status) static void shell_write_cb(Stream *stream, void *data, int status)
{ {
stream_close(stream, NULL); stream_close(stream, NULL, NULL);
} }

View File

@@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop)
int curflags = termkey_get_canonflags(input->tk); int curflags = termkey_get_canonflags(input->tk);
termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS); termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS);
// setup input handle // setup input handle
rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input); rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff);
// initialize a timer handle for handling ESC with libtermkey // initialize a timer handle for handling ESC with libtermkey
time_watcher_init(loop, &input->timer_handle, input); time_watcher_init(loop, &input->timer_handle, input);
} }
@@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input)
uv_mutex_destroy(&input->key_buffer_mutex); uv_mutex_destroy(&input->key_buffer_mutex);
uv_cond_destroy(&input->key_buffer_cond); uv_cond_destroy(&input->key_buffer_cond);
time_watcher_close(&input->timer_handle, NULL); time_watcher_close(&input->timer_handle, NULL);
stream_close(&input->read_stream, NULL); stream_close(&input->read_stream, NULL, NULL);
termkey_destroy(input->tk); termkey_destroy(input->tk);
} }
void term_input_start(TermInput *input) void term_input_start(TermInput *input)
{ {
rstream_start(&input->read_stream, read_cb); rstream_start(&input->read_stream, read_cb, input);
} }
void term_input_stop(TermInput *input) void term_input_stop(TermInput *input)
@@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
// //
// ls *.md | xargs nvim // ls *.md | xargs nvim
input->in_fd = 2; input->in_fd = 2;
stream_close(&input->read_stream, NULL); stream_close(&input->read_stream, NULL, NULL);
queue_put(input->loop->fast_events, restart_reading, 1, input); queue_put(input->loop->fast_events, restart_reading, 1, input);
} else { } else {
loop_schedule(&main_loop, event_create(1, input_done_event, 0)); loop_schedule(&main_loop, event_create(1, input_done_event, 0));
@@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
static void restart_reading(void **argv) static void restart_reading(void **argv)
{ {
TermInput *input = argv[0]; TermInput *input = argv[0];
rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input); rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff);
rstream_start(&input->read_stream, read_cb); rstream_start(&input->read_stream, read_cb, input);
} }