mirror of
https://github.com/neovim/neovim.git
synced 2025-09-28 22:18:33 +00:00
feat(api): broadcast events to ALL channels #28487
Problem: `vim.rpcnotify(0)` and `rpcnotify(0)` are documented as follows: If {channel} is 0, the event is broadcast to all channels. But that's not actually true. Channels must call `nvim_subscribe` to receive "broadcast" events, so it's actually "multicast". - Assuming there is a use-case for "broadcast", the current model adds an extra step for broadcasting: all channels need to "subscribe". - The presence of `nvim_subscribe` is a source of confusion for users, because its name implies something more generally useful than what it does. Presumably the use-case of `nvim_subscribe` is to avoid "noise" on RPC channels not expected a broadcast notification, and potentially an error if the channel client reports an unknown event. Solution: - Deprecate `nvim_subscribe`/`nvim_unsubscribe`. - If applications want to multicast, they can keep their own multicast list. Or they can use `nvim_list_chans()` and `nvim_get_chan_info()` to enumerate and filter the clients they want to target. - Always send "broadcast" events to ALL channels. Don't require channels to "subscribe" to receive broadcasts. This matches the documented behavior of `rpcnotify()`.
This commit is contained in:
@@ -786,3 +786,23 @@ theend:
|
||||
api_clear_error(&nested_error);
|
||||
return rv;
|
||||
}
|
||||
|
||||
/// @deprecated
|
||||
///
|
||||
/// @param channel_id Channel id (passed automatically by the dispatcher)
|
||||
/// @param event Event type string
|
||||
void nvim_subscribe(uint64_t channel_id, String event)
|
||||
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
|
||||
{
|
||||
// Does nothing. `rpcnotify(0,…)` broadcasts to all channels, there are no "subscriptions".
|
||||
}
|
||||
|
||||
/// @deprecated
|
||||
///
|
||||
/// @param channel_id Channel id (passed automatically by the dispatcher)
|
||||
/// @param event Event type string
|
||||
void nvim_unsubscribe(uint64_t channel_id, String event)
|
||||
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
|
||||
{
|
||||
// Does nothing. `rpcnotify(0,…)` broadcasts to all channels, there are no "subscriptions".
|
||||
}
|
||||
|
@@ -1334,36 +1334,6 @@ void nvim_put(ArrayOf(String) lines, String type, Boolean after, Boolean follow,
|
||||
});
|
||||
}
|
||||
|
||||
/// Subscribes to event broadcasts.
|
||||
///
|
||||
/// @param channel_id Channel id (passed automatically by the dispatcher)
|
||||
/// @param event Event type string
|
||||
void nvim_subscribe(uint64_t channel_id, String event)
|
||||
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
|
||||
{
|
||||
size_t length = (event.size < METHOD_MAXLEN ? event.size : METHOD_MAXLEN);
|
||||
char e[METHOD_MAXLEN + 1];
|
||||
memcpy(e, event.data, length);
|
||||
e[length] = NUL;
|
||||
rpc_subscribe(channel_id, e);
|
||||
}
|
||||
|
||||
/// Unsubscribes to event broadcasts.
|
||||
///
|
||||
/// @param channel_id Channel id (passed automatically by the dispatcher)
|
||||
/// @param event Event type string
|
||||
void nvim_unsubscribe(uint64_t channel_id, String event)
|
||||
FUNC_API_SINCE(1) FUNC_API_REMOTE_ONLY
|
||||
{
|
||||
size_t length = (event.size < METHOD_MAXLEN
|
||||
? event.size
|
||||
: METHOD_MAXLEN);
|
||||
char e[METHOD_MAXLEN + 1];
|
||||
memcpy(e, event.data, length);
|
||||
e[length] = NUL;
|
||||
rpc_unsubscribe(channel_id, e);
|
||||
}
|
||||
|
||||
/// Returns the 24-bit RGB value of a |nvim_get_color_map()| color name or
|
||||
/// "#rrggbb" hexadecimal string.
|
||||
///
|
||||
|
@@ -67,8 +67,6 @@ static void log_notify(char *dir, uint64_t channel_id, const char *name)
|
||||
# define log_notify(...)
|
||||
#endif
|
||||
|
||||
static Set(cstr_t) event_strings = SET_INIT;
|
||||
|
||||
#ifdef INCLUDE_GENERATED_DECLARATIONS
|
||||
# include "msgpack_rpc/channel.c.generated.h"
|
||||
#endif
|
||||
@@ -111,9 +109,9 @@ static Channel *find_rpc_channel(uint64_t id)
|
||||
return chan;
|
||||
}
|
||||
|
||||
/// Publishes an event to a channel.
|
||||
/// Publishes an event to a channel (emits a notification to method `name`).
|
||||
///
|
||||
/// @param id Channel id. 0 means "broadcast to all subscribed channels"
|
||||
/// @param id Channel id, or 0 to broadcast to all RPC channels.
|
||||
/// @param name Event name (application-defined)
|
||||
/// @param args Array of event arguments
|
||||
/// @return True if the event was sent successfully, false otherwise.
|
||||
@@ -204,41 +202,6 @@ Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem
|
||||
return frame.errored ? NIL : frame.result;
|
||||
}
|
||||
|
||||
/// Subscribes to event broadcasts
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void rpc_subscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
abort();
|
||||
}
|
||||
|
||||
const char **key_alloc = NULL;
|
||||
if (set_put_ref(cstr_t, &event_strings, event, &key_alloc)) {
|
||||
*key_alloc = xstrdup(event);
|
||||
}
|
||||
|
||||
set_put(cstr_t, channel->rpc.subscribed_events, *key_alloc);
|
||||
}
|
||||
|
||||
/// Unsubscribes to event broadcasts
|
||||
///
|
||||
/// @param id The channel id
|
||||
/// @param event The event type string
|
||||
void rpc_unsubscribe(uint64_t id, char *event)
|
||||
{
|
||||
Channel *channel;
|
||||
|
||||
if (!(channel = find_rpc_channel(id))) {
|
||||
abort();
|
||||
}
|
||||
|
||||
unsubscribe(channel, event);
|
||||
}
|
||||
|
||||
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data, bool eof)
|
||||
{
|
||||
Channel *channel = data;
|
||||
@@ -494,34 +457,24 @@ static void send_error(Channel *chan, MsgpackRpcRequestHandler handler, MessageT
|
||||
api_clear_error(&e);
|
||||
}
|
||||
|
||||
/// Broadcasts a notification to all RPC channels.
|
||||
static void broadcast_event(const char *name, Array args)
|
||||
{
|
||||
kvec_withinit_t(Channel *, 4) subscribed = KV_INITIAL_VALUE;
|
||||
kvi_init(subscribed);
|
||||
kvec_withinit_t(Channel *, 4) chans = KV_INITIAL_VALUE;
|
||||
kvi_init(chans);
|
||||
Channel *channel;
|
||||
|
||||
map_foreach_value(&channels, channel, {
|
||||
if (channel->is_rpc
|
||||
&& set_has(cstr_t, channel->rpc.subscribed_events, name)) {
|
||||
kv_push(subscribed, channel);
|
||||
if (channel->is_rpc) {
|
||||
kv_push(chans, channel);
|
||||
}
|
||||
});
|
||||
|
||||
if (kv_size(subscribed)) {
|
||||
serialize_request(subscribed.items, kv_size(subscribed), 0, name, args);
|
||||
if (kv_size(chans)) {
|
||||
serialize_request(chans.items, kv_size(chans), 0, name, args);
|
||||
}
|
||||
|
||||
kvi_destroy(subscribed);
|
||||
}
|
||||
|
||||
static void unsubscribe(Channel *channel, char *event)
|
||||
{
|
||||
if (!set_has(cstr_t, &event_strings, event)) {
|
||||
WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
|
||||
channel->id, event);
|
||||
return;
|
||||
}
|
||||
set_del(cstr_t, channel->rpc.subscribed_events, event);
|
||||
kvi_destroy(chans);
|
||||
}
|
||||
|
||||
/// Mark rpc state as closed, and release its reference to the channel.
|
||||
@@ -551,7 +504,6 @@ void rpc_free(Channel *channel)
|
||||
unpacker_teardown(channel->rpc.unpacker);
|
||||
xfree(channel->rpc.unpacker);
|
||||
|
||||
set_destroy(cstr_t, channel->rpc.subscribed_events);
|
||||
kv_destroy(channel->rpc.call_stack);
|
||||
api_free_dictionary(channel->rpc.info);
|
||||
}
|
||||
@@ -719,12 +671,6 @@ const char *get_client_info(Channel *chan, const char *key)
|
||||
#ifdef EXITFREE
|
||||
void rpc_free_all_mem(void)
|
||||
{
|
||||
cstr_t key;
|
||||
set_foreach(&event_strings, key, {
|
||||
xfree((void *)key);
|
||||
});
|
||||
set_destroy(cstr_t, &event_strings);
|
||||
|
||||
multiqueue_free(ch_before_blocking_events);
|
||||
}
|
||||
#endif
|
||||
|
@@ -37,7 +37,6 @@ typedef struct {
|
||||
} RequestEvent;
|
||||
|
||||
typedef struct {
|
||||
Set(cstr_t) subscribed_events[1];
|
||||
bool closed;
|
||||
Unpacker *unpacker;
|
||||
uint32_t next_request_id;
|
||||
|
Reference in New Issue
Block a user