Files
neovim/src/nvim/os/rstream.c
Thiago de Arruda 1843f0c2c6 rstream: Rename RStream async flag to defer
The name `async` was not appropriate to describe the behavior enabled by the
flag.
2014-06-18 11:36:07 -03:00

347 lines
9.5 KiB
C

#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <uv.h>
#include "nvim/os/uv_helpers.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/os/event_defs.h"
#include "nvim/os/event.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
struct rstream {
uv_buf_t uvbuf;
void *data;
char *buffer;
uv_stream_t *stream;
uv_idle_t *fread_idle;
uv_handle_type file_type;
uv_file fd;
rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos;
bool reading, free_handle, defer;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/rstream.c.generated.h"
#endif
/// Creates a new RStream instance. A RStream encapsulates all the boilerplate
/// necessary for reading from a libuv stream.
///
/// @param cb A function that will be called whenever some data is available
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
/// @param defer Flag that specifies if callback invocation should be deferred
/// to vim main loop(as a KE_EVENT special key)
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
size_t buffer_size,
void *data,
bool defer)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
rv->defer = defer;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
return rv;
}
/// Frees all memory allocated for a RStream instance
///
/// @param rstream The `RStream` instance
void rstream_free(RStream *rstream)
{
if (rstream->free_handle) {
if (rstream->fread_idle != NULL) {
uv_close((uv_handle_t *)rstream->fread_idle, close_cb);
} else {
uv_close((uv_handle_t *)rstream->stream, close_cb);
}
}
free(rstream->buffer);
free(rstream);
}
/// Sets the underlying `uv_stream_t` instance
///
/// @param rstream The `RStream` instance
/// @param stream The new `uv_stream_t` instance
void rstream_set_stream(RStream *rstream, uv_stream_t *stream)
{
handle_set_rstream((uv_handle_t *)stream, rstream);
rstream->stream = stream;
}
/// Sets the underlying file descriptor that will be read from. Only pipes
/// and regular files are supported for now.
///
/// @param rstream The `RStream` instance
/// @param file The file descriptor
void rstream_set_file(RStream *rstream, uv_file file)
{
rstream->file_type = uv_guess_handle(file);
if (rstream->free_handle) {
// If this is the second time we're calling this function, free the
// previously allocated memory
if (rstream->fread_idle != NULL) {
uv_close((uv_handle_t *)rstream->fread_idle, close_cb);
} else {
uv_close((uv_handle_t *)rstream->stream, close_cb);
}
}
if (rstream->file_type == UV_FILE) {
// Non-blocking file reads are simulated with an idle handle that reads
// in chunks of rstream->buffer_size, giving time for other events to
// be processed between reads.
rstream->fread_idle = xmalloc(sizeof(uv_idle_t));
uv_idle_init(uv_default_loop(), rstream->fread_idle);
rstream->fread_idle->data = NULL;
handle_set_rstream((uv_handle_t *)rstream->fread_idle, rstream);
} else {
// Only pipes are supported for now
assert(rstream->file_type == UV_NAMED_PIPE
|| rstream->file_type == UV_TTY);
rstream->stream = xmalloc(sizeof(uv_pipe_t));
uv_pipe_init(uv_default_loop(), (uv_pipe_t *)rstream->stream, 0);
uv_pipe_open((uv_pipe_t *)rstream->stream, file);
rstream->stream->data = NULL;
handle_set_rstream((uv_handle_t *)rstream->stream, rstream);
}
rstream->fd = file;
rstream->free_handle = true;
}
/// Tests if the stream is backed by a regular file
///
/// @param rstream The `RStream` instance
/// @return True if the underlying file descriptor represents a regular file
bool rstream_is_regular_file(RStream *rstream)
{
return rstream->file_type == UV_FILE;
}
/// Starts watching for events from a `RStream` instance.
///
/// @param rstream The `RStream` instance
void rstream_start(RStream *rstream)
{
if (rstream->file_type == UV_FILE) {
uv_idle_start(rstream->fread_idle, fread_idle_cb);
} else {
rstream->reading = false;
uv_read_start(rstream->stream, alloc_cb, read_cb);
}
}
/// Stops watching for events from a `RStream` instance.
///
/// @param rstream The `RStream` instance
void rstream_stop(RStream *rstream)
{
if (rstream->file_type == UV_FILE) {
uv_idle_stop(rstream->fread_idle);
} else {
uv_read_stop(rstream->stream);
}
}
/// Reads data from a `RStream` instance into a buffer.
///
/// @param rstream The `RStream` instance
/// @param buffer The buffer which will receive the data
/// @param count Number of bytes that `buffer` can accept
/// @return The number of bytes copied into `buffer`
size_t rstream_read(RStream *rstream, char *buf, size_t count)
{
size_t read_count = rstream->wpos - rstream->rpos;
if (count < read_count) {
read_count = count;
}
if (read_count > 0) {
memcpy(buf, rstream->buffer + rstream->rpos, read_count);
rstream->rpos += read_count;
}
if (rstream->wpos == rstream->buffer_size) {
// `wpos` is at the end of the buffer, so free some space by moving unread
// data...
memmove(
rstream->buffer, // ...To the beginning of the buffer(rpos 0)
rstream->buffer + rstream->rpos, // ...From the first unread position
rstream->wpos - rstream->rpos); // ...By the number of unread bytes
rstream->wpos -= rstream->rpos;
rstream->rpos = 0;
if (rstream->wpos < rstream->buffer_size) {
// Restart reading since we have freed some space
rstream_start(rstream);
}
}
return read_count;
}
/// Returns the number of bytes available for reading from `rstream`
///
/// @param rstream The `RStream` instance
/// @return The number of bytes available
size_t rstream_available(RStream *rstream)
{
return rstream->wpos - rstream->rpos;
}
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
void rstream_read_event(Event event)
{
RStream *rstream = event.data.rstream.ptr;
rstream->cb(rstream, rstream->data, event.data.rstream.eof);
}
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
{
RStream *rstream = handle_get_rstream(handle);
if (rstream->reading) {
buf->len = 0;
return;
}
buf->len = rstream->buffer_size - rstream->wpos;
buf->base = rstream->buffer + rstream->wpos;
// Avoid `alloc_cb`, `alloc_cb` sequences on windows
rstream->reading = true;
}
// Callback invoked by libuv after it copies the data into the buffer provided
// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
// 0-length buffer.
static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
{
RStream *rstream = handle_get_rstream((uv_handle_t *)stream);
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
emit_read_event(rstream, true);
}
return;
}
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t) cnt;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rstream->wpos += nread;
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
}
rstream->reading = false;
emit_read_event(rstream, false);
}
// Called by the by the 'idle' handle to emulate a reading event
static void fread_idle_cb(uv_idle_t *handle)
{
uv_fs_t req;
RStream *rstream = handle_get_rstream((uv_handle_t *)handle);
rstream->uvbuf.base = rstream->buffer + rstream->wpos;
rstream->uvbuf.len = rstream->buffer_size - rstream->wpos;
// the offset argument to uv_fs_read is int64_t, could someone really try
// to read more than 9 quintillion (9e18) bytes?
// upcast is meant to avoid tautological condition warning on 32 bits
uintmax_t fpos_intmax = rstream->fpos;
if (fpos_intmax > INT64_MAX) {
ELOG("stream offset overflow");
preserve_exit();
}
// Synchronous read
uv_fs_read(
uv_default_loop(),
&req,
rstream->fd,
&rstream->uvbuf,
1,
(int64_t) rstream->fpos,
NULL);
uv_fs_req_cleanup(&req);
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
emit_read_event(rstream, true);
return;
}
// no errors (req.result (ssize_t) is positive), it's safe to cast.
size_t nread = (size_t) req.result;
rstream->wpos += nread;
rstream->fpos += nread;
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
}
emit_read_event(rstream, false);
}
static void close_cb(uv_handle_t *handle)
{
free(handle->data);
free(handle);
}
static void emit_read_event(RStream *rstream, bool eof)
{
if (rstream->defer) {
Event event;
event.type = kEventRStreamData;
event.data.rstream.ptr = rstream;
event.data.rstream.eof = eof;
event_push(event);
} else {
// Invoke the callback passing in the number of bytes available and data
// associated with the stream
rstream->cb(rstream, rstream->data, eof);
}
}