mirror of
https://github.com/neovim/neovim.git
synced 2025-09-15 07:48:18 +00:00
rstream: Extract some RStream functionality to RBuffer
RBuffer instances represent the internal buffer used by RStreams. This changes RStream constructor to receive RBuffer pointers and adds a set of RBuffer methods that expose the lower level buffer manipulation to consumers of the RStream API.
This commit is contained in:
@@ -19531,7 +19531,7 @@ static void on_job_exit(Job *job, void *data)
|
||||
static void on_job_data(RStream *rstream, void *data, bool eof, char *type)
|
||||
{
|
||||
Job *job = data;
|
||||
uint32_t read_count = rstream_available(rstream);
|
||||
uint32_t read_count = rstream_pending(rstream);
|
||||
char *str = xmalloc(read_count + 1);
|
||||
|
||||
rstream_read(rstream, str, read_count);
|
||||
|
@@ -127,7 +127,7 @@ void channel_from_stream(uv_stream_t *stream)
|
||||
channel->is_job = false;
|
||||
// read stream
|
||||
channel->data.streams.read = rstream_new(parse_msgpack,
|
||||
CHANNEL_BUFFER_SIZE,
|
||||
rbuffer_new(CHANNEL_BUFFER_SIZE),
|
||||
channel,
|
||||
NULL);
|
||||
rstream_set_stream(channel->data.streams.read, stream);
|
||||
@@ -290,7 +290,7 @@ static void channel_from_stdio(void)
|
||||
channel->is_job = false;
|
||||
// read stream
|
||||
channel->data.streams.read = rstream_new(parse_msgpack,
|
||||
CHANNEL_BUFFER_SIZE,
|
||||
rbuffer_new(CHANNEL_BUFFER_SIZE),
|
||||
channel,
|
||||
NULL);
|
||||
rstream_set_file(channel->data.streams.read, 0);
|
||||
@@ -313,7 +313,7 @@ static void job_err(RStream *rstream, void *data, bool eof)
|
||||
char buf[256];
|
||||
Channel *channel = job_data(data);
|
||||
|
||||
while ((count = rstream_available(rstream))) {
|
||||
while ((count = rstream_pending(rstream))) {
|
||||
size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
|
||||
buf[read] = NUL;
|
||||
ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
|
||||
@@ -336,7 +336,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
|
||||
goto end;
|
||||
}
|
||||
|
||||
uint32_t count = rstream_available(rstream);
|
||||
uint32_t count = rstream_pending(rstream);
|
||||
DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
|
||||
count,
|
||||
rstream);
|
||||
|
@@ -39,7 +39,10 @@ void input_init(void)
|
||||
return;
|
||||
}
|
||||
|
||||
read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL);
|
||||
read_stream = rstream_new(read_cb,
|
||||
rbuffer_new(READ_BUFFER_SIZE),
|
||||
NULL,
|
||||
NULL);
|
||||
rstream_set_file(read_stream, read_cmd_fd);
|
||||
}
|
||||
|
||||
@@ -167,7 +170,7 @@ static InbufPollResult inbuf_poll(int32_t ms)
|
||||
}
|
||||
|
||||
if (input_poll(ms)) {
|
||||
return eof && rstream_available(read_stream) == 0 ?
|
||||
return eof && rstream_pending(read_stream) == 0 ?
|
||||
kInputEof :
|
||||
kInputAvail;
|
||||
}
|
||||
@@ -227,6 +230,6 @@ static int push_event_key(uint8_t *buf, int maxlen)
|
||||
// Check if there's pending input
|
||||
bool input_ready(void)
|
||||
{
|
||||
return rstream_available(read_stream) > 0 || eof;
|
||||
return rstream_pending(read_stream) > 0 || eof;
|
||||
}
|
||||
|
||||
|
@@ -213,8 +213,14 @@ Job *job_start(char **argv,
|
||||
job->in = wstream_new(maxmem);
|
||||
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
|
||||
// Start the readable streams
|
||||
job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
|
||||
job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
|
||||
job->out = rstream_new(read_cb,
|
||||
rbuffer_new(JOB_BUFFER_SIZE),
|
||||
job,
|
||||
job_event_source(job));
|
||||
job->err = rstream_new(read_cb,
|
||||
rbuffer_new(JOB_BUFFER_SIZE),
|
||||
job,
|
||||
job_event_source(job));
|
||||
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
|
||||
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
|
||||
rstream_start(job->out);
|
||||
|
@@ -16,16 +16,22 @@
|
||||
#include "nvim/log.h"
|
||||
#include "nvim/misc1.h"
|
||||
|
||||
struct rbuffer {
|
||||
char *data;
|
||||
size_t capacity, rpos, wpos;
|
||||
RStream *rstream;
|
||||
};
|
||||
|
||||
struct rstream {
|
||||
uv_buf_t uvbuf;
|
||||
void *data;
|
||||
char *buffer;
|
||||
uv_buf_t uvbuf;
|
||||
size_t fpos;
|
||||
RBuffer *buffer;
|
||||
uv_stream_t *stream;
|
||||
uv_idle_t *fread_idle;
|
||||
uv_handle_type file_type;
|
||||
uv_file fd;
|
||||
rstream_cb cb;
|
||||
size_t buffer_size, rpos, wpos, fpos;
|
||||
bool free_handle;
|
||||
EventSource source_override;
|
||||
};
|
||||
@@ -34,27 +40,151 @@ struct rstream {
|
||||
# include "os/rstream.c.generated.h"
|
||||
#endif
|
||||
|
||||
/// Creates a new `RBuffer` instance.
|
||||
RBuffer *rbuffer_new(size_t capacity)
|
||||
{
|
||||
RBuffer *rv = xmalloc(sizeof(RBuffer));
|
||||
rv->data = xmalloc(capacity);
|
||||
rv->capacity = capacity;
|
||||
rv->rpos = rv->wpos = 0;
|
||||
return rv;
|
||||
}
|
||||
|
||||
/// Advances `rbuffer` read pointers to consume data. If the associated
|
||||
/// RStream had stopped because the buffer was full, this will restart it.
|
||||
///
|
||||
/// This is called automatically by rbuffer_read, but when using `rbuffer_data`
|
||||
/// directly, this needs to called after the data was consumed.
|
||||
void rbuffer_consumed(RBuffer *rbuffer, size_t count)
|
||||
{
|
||||
rbuffer->rpos += count;
|
||||
if (count && rbuffer->wpos == rbuffer->capacity) {
|
||||
// `wpos` is at the end of the buffer, so free some space by moving unread
|
||||
// data...
|
||||
rbuffer_relocate(rbuffer);
|
||||
if (rbuffer->rstream) {
|
||||
// restart the associated RStream
|
||||
rstream_start(rbuffer->rstream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Advances `rbuffer` write pointers. If the internal buffer becomes full,
|
||||
/// this will stop the associated RStream instance.
|
||||
void rbuffer_produced(RBuffer *rbuffer, size_t count)
|
||||
{
|
||||
rbuffer->wpos += count;
|
||||
DLOG("Received %u bytes from RStream(address: %p, source: %p)",
|
||||
(size_t)cnt,
|
||||
rbuffer->rstream,
|
||||
rstream_event_source(rbuffer->rstream));
|
||||
|
||||
rbuffer_relocate(rbuffer);
|
||||
if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
|
||||
// The last read filled the buffer, stop reading for now
|
||||
rstream_stop(rbuffer->rstream);
|
||||
DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
|
||||
rstream,
|
||||
rstream_event_source(rstream));
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads data from a `RBuffer` instance into a raw buffer.
|
||||
///
|
||||
/// @param rbuffer The `RBuffer` instance
|
||||
/// @param buffer The buffer which will receive the data
|
||||
/// @param count Number of bytes that `buffer` can accept
|
||||
/// @return The number of bytes copied into `buffer`
|
||||
size_t rbuffer_read(RBuffer *rbuffer, char *buffer, size_t count)
|
||||
{
|
||||
size_t read_count = rbuffer_pending(rbuffer);
|
||||
|
||||
if (count < read_count) {
|
||||
read_count = count;
|
||||
}
|
||||
|
||||
if (read_count > 0) {
|
||||
memcpy(buffer, rbuffer_data(rbuffer), read_count);
|
||||
rbuffer_consumed(rbuffer, read_count);
|
||||
}
|
||||
|
||||
return read_count;
|
||||
}
|
||||
|
||||
/// Copies data to `rbuffer` read queue.
|
||||
///
|
||||
/// @param rbuffer the `RBuffer` instance
|
||||
/// @param buffer The buffer containing data to be copied
|
||||
/// @param count Number of bytes that should be copied
|
||||
/// @return The number of bytes actually copied
|
||||
size_t rbuffer_write(RBuffer *rbuffer, char *buffer, size_t count)
|
||||
{
|
||||
size_t write_count = rbuffer_available(rbuffer);
|
||||
|
||||
if (count < write_count) {
|
||||
write_count = count;
|
||||
}
|
||||
|
||||
if (write_count > 0) {
|
||||
memcpy(rbuffer_data(rbuffer), buffer, write_count);
|
||||
rbuffer_produced(rbuffer, write_count);
|
||||
}
|
||||
|
||||
return write_count;
|
||||
}
|
||||
|
||||
/// Returns a pointer to a raw buffer containing the first byte available for
|
||||
/// reading.
|
||||
char *rbuffer_data(RBuffer *rbuffer)
|
||||
{
|
||||
return rbuffer->data + rbuffer->rpos;
|
||||
}
|
||||
|
||||
/// Returns the number of bytes ready for consumption in `rbuffer`
|
||||
///
|
||||
/// @param rbuffer The `RBuffer` instance
|
||||
/// @return The number of bytes ready for consumption
|
||||
size_t rbuffer_pending(RBuffer *rbuffer)
|
||||
{
|
||||
return rbuffer->wpos - rbuffer->rpos;
|
||||
}
|
||||
|
||||
/// Returns available space in `rbuffer`
|
||||
///
|
||||
/// @param rbuffer The `RBuffer` instance
|
||||
/// @return The space available in number of bytes
|
||||
size_t rbuffer_available(RBuffer *rbuffer)
|
||||
{
|
||||
return rbuffer->capacity - rbuffer->wpos;
|
||||
}
|
||||
|
||||
void rbuffer_free(RBuffer *rbuffer)
|
||||
{
|
||||
free(rbuffer->data);
|
||||
free(rbuffer);
|
||||
}
|
||||
|
||||
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
|
||||
/// necessary for reading from a libuv stream.
|
||||
///
|
||||
/// @param cb A function that will be called whenever some data is available
|
||||
/// for reading with `rstream_read`
|
||||
/// @param buffer_size Size in bytes of the internal buffer.
|
||||
/// @param buffer RBuffer instance to associate with the RStream
|
||||
/// @param data Some state to associate with the `RStream` instance
|
||||
/// @param source_override Replacement for the default source used in events
|
||||
/// emitted by this RStream. If NULL, the default is used.
|
||||
/// @return The newly-allocated `RStream` instance
|
||||
RStream * rstream_new(rstream_cb cb,
|
||||
size_t buffer_size,
|
||||
RBuffer *buffer,
|
||||
void *data,
|
||||
EventSource source_override)
|
||||
{
|
||||
RStream *rv = xmalloc(sizeof(RStream));
|
||||
rv->buffer = xmalloc(buffer_size);
|
||||
rv->buffer_size = buffer_size;
|
||||
rv->buffer = buffer;
|
||||
rv->buffer->rstream = rv;
|
||||
rv->fpos = 0;
|
||||
rv->data = data;
|
||||
rv->cb = cb;
|
||||
rv->rpos = rv->wpos = rv->fpos = 0;
|
||||
rv->stream = NULL;
|
||||
rv->fread_idle = NULL;
|
||||
rv->free_handle = false;
|
||||
@@ -77,7 +207,7 @@ void rstream_free(RStream *rstream)
|
||||
}
|
||||
}
|
||||
|
||||
free(rstream->buffer);
|
||||
rbuffer_free(rstream->buffer);
|
||||
free(rstream);
|
||||
}
|
||||
|
||||
@@ -166,51 +296,21 @@ void rstream_stop(RStream *rstream)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of bytes ready for consumption in `rstream`
|
||||
size_t rstream_pending(RStream *rstream)
|
||||
{
|
||||
return rbuffer_pending(rstream->buffer);
|
||||
}
|
||||
|
||||
/// Reads data from a `RStream` instance into a buffer.
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @param buffer The buffer which will receive the data
|
||||
/// @param count Number of bytes that `buffer` can accept
|
||||
/// @return The number of bytes copied into `buffer`
|
||||
size_t rstream_read(RStream *rstream, char *buf, size_t count)
|
||||
size_t rstream_read(RStream *rstream, char *buffer, size_t count)
|
||||
{
|
||||
size_t read_count = rstream->wpos - rstream->rpos;
|
||||
|
||||
if (count < read_count) {
|
||||
read_count = count;
|
||||
}
|
||||
|
||||
if (read_count > 0) {
|
||||
memcpy(buf, rstream->buffer + rstream->rpos, read_count);
|
||||
rstream->rpos += read_count;
|
||||
}
|
||||
|
||||
if (rstream->wpos == rstream->buffer_size) {
|
||||
// `wpos` is at the end of the buffer, so free some space by moving unread
|
||||
// data...
|
||||
memmove(
|
||||
rstream->buffer, // ...To the beginning of the buffer(rpos 0)
|
||||
rstream->buffer + rstream->rpos, // ...From the first unread position
|
||||
rstream->wpos - rstream->rpos); // ...By the number of unread bytes
|
||||
rstream->wpos -= rstream->rpos;
|
||||
rstream->rpos = 0;
|
||||
|
||||
if (rstream->wpos < rstream->buffer_size) {
|
||||
// Restart reading since we have freed some space
|
||||
rstream_start(rstream);
|
||||
}
|
||||
}
|
||||
|
||||
return read_count;
|
||||
}
|
||||
|
||||
/// Returns the number of bytes available for reading from `rstream`
|
||||
///
|
||||
/// @param rstream The `RStream` instance
|
||||
/// @return The number of bytes available
|
||||
size_t rstream_available(RStream *rstream)
|
||||
{
|
||||
return rstream->wpos - rstream->rpos;
|
||||
return rbuffer_read(rstream->buffer, buffer, count);
|
||||
}
|
||||
|
||||
/// Runs the read callback associated with the rstream
|
||||
@@ -235,8 +335,8 @@ static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
|
||||
{
|
||||
RStream *rstream = handle_get_rstream(handle);
|
||||
|
||||
buf->len = rstream->buffer_size - rstream->wpos;
|
||||
buf->base = rstream->buffer + rstream->wpos;
|
||||
buf->len = rbuffer_available(rstream->buffer);
|
||||
buf->base = rbuffer_data(rstream->buffer);
|
||||
}
|
||||
|
||||
// Callback invoked by libuv after it copies the data into the buffer provided
|
||||
@@ -264,20 +364,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
|
||||
|
||||
// Data was already written, so all we need is to update 'wpos' to reflect
|
||||
// the space actually used in the buffer.
|
||||
rstream->wpos += nread;
|
||||
DLOG("Received %u bytes from RStream(address: %p, source: %p)",
|
||||
(size_t)cnt,
|
||||
rstream,
|
||||
rstream_event_source(rstream));
|
||||
|
||||
if (rstream->wpos == rstream->buffer_size) {
|
||||
// The last read filled the buffer, stop reading for now
|
||||
rstream_stop(rstream);
|
||||
DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
|
||||
rstream,
|
||||
rstream_event_source(rstream));
|
||||
}
|
||||
|
||||
rbuffer_produced(rstream->buffer, nread);
|
||||
emit_read_event(rstream, false);
|
||||
}
|
||||
|
||||
@@ -287,8 +374,8 @@ static void fread_idle_cb(uv_idle_t *handle)
|
||||
uv_fs_t req;
|
||||
RStream *rstream = handle_get_rstream((uv_handle_t *)handle);
|
||||
|
||||
rstream->uvbuf.base = rstream->buffer + rstream->wpos;
|
||||
rstream->uvbuf.len = rstream->buffer_size - rstream->wpos;
|
||||
rstream->uvbuf.len = rstream->buffer->capacity - rstream->buffer->wpos;
|
||||
rstream->uvbuf.base = rstream->buffer->data + rstream->buffer->wpos;
|
||||
|
||||
// the offset argument to uv_fs_read is int64_t, could someone really try
|
||||
// to read more than 9 quintillion (9e18) bytes?
|
||||
@@ -319,15 +406,8 @@ static void fread_idle_cb(uv_idle_t *handle)
|
||||
|
||||
// no errors (req.result (ssize_t) is positive), it's safe to cast.
|
||||
size_t nread = (size_t) req.result;
|
||||
|
||||
rstream->wpos += nread;
|
||||
rbuffer_produced(rstream->buffer, nread);
|
||||
rstream->fpos += nread;
|
||||
|
||||
if (rstream->wpos == rstream->buffer_size) {
|
||||
// The last read filled the buffer, stop reading for now
|
||||
rstream_stop(rstream);
|
||||
}
|
||||
|
||||
emit_read_event(rstream, false);
|
||||
}
|
||||
|
||||
@@ -349,3 +429,14 @@ static void emit_read_event(RStream *rstream, bool eof)
|
||||
};
|
||||
event_push(event);
|
||||
}
|
||||
|
||||
static void rbuffer_relocate(RBuffer *rbuffer)
|
||||
{
|
||||
// Move data ...
|
||||
memmove(
|
||||
rbuffer->data, // ...to the beginning of the buffer(rpos 0)
|
||||
rbuffer->data + rbuffer->rpos, // ...From the first unread position
|
||||
rbuffer->wpos - rbuffer->rpos); // ...By the number of unread bytes
|
||||
rbuffer->wpos -= rbuffer->rpos;
|
||||
rbuffer->rpos = 0;
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@
|
||||
|
||||
#include <stdbool.h>
|
||||
|
||||
typedef struct rbuffer RBuffer;
|
||||
typedef struct rstream RStream;
|
||||
|
||||
/// Type of function called when the RStream receives data
|
||||
|
@@ -341,7 +341,7 @@ static void system_data_cb(RStream *rstream, void *data, bool eof)
|
||||
Job *job = data;
|
||||
dyn_buffer_t *buf = job_data(job);
|
||||
|
||||
size_t nread = rstream_available(rstream);
|
||||
size_t nread = rstream_pending(rstream);
|
||||
|
||||
dyn_buf_ensure(buf, buf->len + nread + 1);
|
||||
rstream_read(rstream, buf->data + buf->len, nread);
|
||||
|
Reference in New Issue
Block a user