mirror of
				https://github.com/neovim/neovim.git
				synced 2025-10-26 12:27:24 +00:00 
			
		
		
		
	Merge pull request #10021 from bfredl/chanevent
channel: refactor events, prevent recursive invocation of callbacks
This commit is contained in:
		| @@ -22,19 +22,10 @@ PMap(uint64_t) *channels = NULL; | ||||
| /// 2 is reserved for stderr channel | ||||
| static uint64_t next_chan_id = CHAN_STDERR+1; | ||||
|  | ||||
|  | ||||
| typedef struct { | ||||
|   Channel *chan; | ||||
|   Callback *callback; | ||||
|   const char *type; | ||||
|   // if reader is set, status is ignored. | ||||
|   CallbackReader *reader; | ||||
|   int status; | ||||
| } ChannelEvent; | ||||
|  | ||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||
| # include "channel.c.generated.h" | ||||
| #endif | ||||
|  | ||||
| /// Teardown the module | ||||
| void channel_teardown(void) | ||||
| { | ||||
| @@ -179,6 +170,7 @@ static Channel *channel_alloc(ChannelStreamType type) | ||||
|   } | ||||
|   chan->events = multiqueue_new_child(main_loop.events); | ||||
|   chan->refcount = 1; | ||||
|   chan->exit_status = -1; | ||||
|   chan->streamtype = type; | ||||
|   pmap_put(uint64_t)(channels, chan->id, chan); | ||||
|   return chan; | ||||
| @@ -234,9 +226,10 @@ void callback_reader_free(CallbackReader *reader) | ||||
|   ga_clear(&reader->buffer); | ||||
| } | ||||
|  | ||||
| void callback_reader_start(CallbackReader *reader) | ||||
| void callback_reader_start(CallbackReader *reader, const char *type) | ||||
| { | ||||
|   ga_init(&reader->buffer, sizeof(char *), 32); | ||||
|   reader->type = type; | ||||
| } | ||||
|  | ||||
| static void free_channel_event(void **argv) | ||||
| @@ -246,7 +239,7 @@ static void free_channel_event(void **argv) | ||||
|     rpc_free(chan); | ||||
|   } | ||||
|  | ||||
|   callback_reader_free(&chan->on_stdout); | ||||
|   callback_reader_free(&chan->on_data); | ||||
|   callback_reader_free(&chan->on_stderr); | ||||
|   callback_free(&chan->on_exit); | ||||
|  | ||||
| @@ -286,7 +279,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout, | ||||
|   assert(cwd == NULL || os_isdir_executable(cwd)); | ||||
|  | ||||
|   Channel *chan = channel_alloc(kChannelStreamProc); | ||||
|   chan->on_stdout = on_stdout; | ||||
|   chan->on_data = on_stdout; | ||||
|   chan->on_stderr = on_stderr; | ||||
|   chan->on_exit = on_exit; | ||||
|  | ||||
| @@ -326,7 +319,7 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout, | ||||
|     has_out = true; | ||||
|     has_err = false; | ||||
|   } else { | ||||
|     has_out = rpc || callback_reader_set(chan->on_stdout); | ||||
|     has_out = rpc || callback_reader_set(chan->on_data); | ||||
|     has_err = callback_reader_set(chan->on_stderr); | ||||
|   } | ||||
|   int status = process_spawn(proc, true, has_out, has_err); | ||||
| @@ -352,13 +345,13 @@ Channel *channel_job_start(char **argv, CallbackReader on_stdout, | ||||
|     rpc_start(chan); | ||||
|   } else { | ||||
|     if (has_out) { | ||||
|       callback_reader_start(&chan->on_stdout); | ||||
|       rstream_start(&proc->out, on_job_stdout, chan); | ||||
|       callback_reader_start(&chan->on_data, "stdout"); | ||||
|       rstream_start(&proc->out, on_channel_data, chan); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   if (has_err) { | ||||
|     callback_reader_start(&chan->on_stderr); | ||||
|     callback_reader_start(&chan->on_stderr, "stderr"); | ||||
|     rstream_init(&proc->err, 0); | ||||
|     rstream_start(&proc->err, on_job_stderr, chan); | ||||
|   } | ||||
| @@ -402,9 +395,9 @@ uint64_t channel_connect(bool tcp, const char *address, | ||||
|   if (rpc) { | ||||
|     rpc_start(channel); | ||||
|   } else { | ||||
|     channel->on_stdout = on_output; | ||||
|     callback_reader_start(&channel->on_stdout); | ||||
|     rstream_start(&channel->stream.socket, on_socket_output, channel); | ||||
|     channel->on_data = on_output; | ||||
|     callback_reader_start(&channel->on_data, "data"); | ||||
|     rstream_start(&channel->stream.socket, on_channel_data, channel); | ||||
|   } | ||||
|  | ||||
| end: | ||||
| @@ -452,9 +445,9 @@ uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, | ||||
|   if (rpc) { | ||||
|     rpc_start(channel); | ||||
|   } else { | ||||
|     channel->on_stdout = on_output; | ||||
|     callback_reader_start(&channel->on_stdout); | ||||
|     rstream_start(&channel->stream.stdio.in, on_stdio_input, channel); | ||||
|     channel->on_data = on_output; | ||||
|     callback_reader_start(&channel->on_data, "stdin"); | ||||
|     rstream_start(&channel->stream.stdio.in, on_channel_data, channel); | ||||
|   } | ||||
|  | ||||
|   return channel->id; | ||||
| @@ -519,55 +512,22 @@ static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) | ||||
|   return l; | ||||
| } | ||||
|  | ||||
| // vimscript job callbacks must be executed on Nvim main loop | ||||
| static inline void process_channel_event(Channel *chan, Callback *callback, | ||||
|                                          const char *type, | ||||
|                                          CallbackReader *reader, int status) | ||||
| { | ||||
|   assert(callback); | ||||
|   ChannelEvent *event_data = xmalloc(sizeof(*event_data)); | ||||
|   event_data->reader = reader; | ||||
|   event_data->status = status; | ||||
|   channel_incref(chan);  // Hold on ref to callback | ||||
|   event_data->chan = chan; | ||||
|   event_data->callback = callback; | ||||
|   event_data->type = type; | ||||
|  | ||||
|   multiqueue_put(chan->events, on_channel_event, 1, event_data); | ||||
| } | ||||
|  | ||||
| void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, | ||||
|                    void *data, bool eof) | ||||
| void on_channel_data(Stream *stream, RBuffer *buf, size_t count, | ||||
|                      void *data, bool eof) | ||||
| { | ||||
|   Channel *chan = data; | ||||
|   on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout"); | ||||
|   on_channel_output(stream, chan, buf, count, eof, &chan->on_data); | ||||
| } | ||||
|  | ||||
| void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, | ||||
|                    void *data, bool eof) | ||||
| { | ||||
|   Channel *chan = data; | ||||
|   on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr"); | ||||
|   on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr); | ||||
| } | ||||
|  | ||||
| static void on_socket_output(Stream *stream, RBuffer *buf, size_t count, | ||||
|                              void *data, bool eof) | ||||
| { | ||||
|   Channel *chan = data; | ||||
|   on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data"); | ||||
| } | ||||
|  | ||||
| static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count, | ||||
|                            void *data, bool eof) | ||||
| { | ||||
|   Channel *chan = data; | ||||
|   on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin"); | ||||
| } | ||||
|  | ||||
| /// @param type must have static lifetime | ||||
| static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, | ||||
|                               size_t count, bool eof, CallbackReader *reader, | ||||
|                               const char *type) | ||||
|                               size_t count, bool eof, CallbackReader *reader) | ||||
| { | ||||
|   // stub variable, to keep reading consistent with the order of events, only | ||||
|   // consider the count parameter. | ||||
| @@ -575,57 +535,93 @@ static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf, | ||||
|   char *ptr = rbuffer_read_ptr(buf, &r); | ||||
|  | ||||
|   if (eof) { | ||||
|     if (reader->buffered) { | ||||
|       if (reader->cb.type != kCallbackNone) { | ||||
|         process_channel_event(chan, &reader->cb, type, reader, 0); | ||||
|       } else if (reader->self) { | ||||
|         if (tv_dict_find(reader->self, type, -1) == NULL) { | ||||
|           list_T *data = buffer_to_tv_list(reader->buffer.ga_data, | ||||
|                                            (size_t)reader->buffer.ga_len); | ||||
|           tv_dict_add_list(reader->self, type, strlen(type), data); | ||||
|         } else { | ||||
|             // can't display error message now, defer it. | ||||
|             channel_incref(chan); | ||||
|             multiqueue_put(chan->events, on_buffered_error, 2, chan, type); | ||||
|         } | ||||
|         ga_clear(&reader->buffer); | ||||
|       } else { | ||||
|         abort(); | ||||
|       } | ||||
|     } else if (reader->cb.type != kCallbackNone) { | ||||
|       process_channel_event(chan, &reader->cb, type, reader, 0); | ||||
|     reader->eof = true; | ||||
|   } else { | ||||
|     if (chan->term) { | ||||
|       terminal_receive(chan->term, ptr, count); | ||||
|       terminal_flush_output(chan->term); | ||||
|     } | ||||
|  | ||||
|     rbuffer_consumed(buf, count); | ||||
|  | ||||
|     if (callback_reader_set(*reader)) { | ||||
|       ga_concat_len(&reader->buffer, ptr, count); | ||||
|     } | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   // The order here matters, the terminal must receive the data first because | ||||
|   // process_channel_event will modify the read buffer(convert NULs into NLs) | ||||
|   if (chan->term) { | ||||
|     terminal_receive(chan->term, ptr, count); | ||||
|     terminal_flush_output(chan->term); | ||||
|   } | ||||
|  | ||||
|   rbuffer_consumed(buf, count); | ||||
|  | ||||
|   if (callback_reader_set(*reader) || reader->buffered) { | ||||
|     // if buffer wasn't consumed, a pending callback is stalled. Aggregate the | ||||
|     // received data and avoid a "burst" of multiple callbacks. | ||||
|     bool buffer_set = reader->buffer.ga_len > 0; | ||||
|     ga_concat_len(&reader->buffer, ptr, count); | ||||
|     if (callback_reader_set(*reader) && !reader->buffered && !buffer_set) { | ||||
|       process_channel_event(chan, &reader->cb, type, reader, 0); | ||||
|     } | ||||
|   if (callback_reader_set(*reader)) { | ||||
|     schedule_channel_event(chan); | ||||
|   } | ||||
| } | ||||
|  | ||||
| static void on_buffered_error(void **args) | ||||
| /// schedule the necessary callbacks to be invoked as a deferred event | ||||
| static void schedule_channel_event(Channel *chan) | ||||
| { | ||||
|   if (!chan->callback_scheduled) { | ||||
|     if (!chan->callback_busy) { | ||||
|       multiqueue_put(chan->events, on_channel_event, 1, chan); | ||||
|       channel_incref(chan); | ||||
|     } | ||||
|     chan->callback_scheduled = true; | ||||
|   } | ||||
| } | ||||
|  | ||||
| static void on_channel_event(void **args) | ||||
| { | ||||
|   Channel *chan = (Channel *)args[0]; | ||||
|   const char *stream = (const char *)args[1]; | ||||
|   EMSG3(_(e_streamkey), stream, chan->id); | ||||
|  | ||||
|   chan->callback_busy = true; | ||||
|   chan->callback_scheduled = false; | ||||
|  | ||||
|   int exit_status = chan->exit_status; | ||||
|   channel_reader_callbacks(chan, &chan->on_data); | ||||
|   channel_reader_callbacks(chan, &chan->on_stderr); | ||||
|   if (exit_status > -1) { | ||||
|     channel_callback_call(chan, NULL); | ||||
|     chan->exit_status = -1; | ||||
|   } | ||||
|  | ||||
|   chan->callback_busy = false; | ||||
|   if (chan->callback_scheduled) { | ||||
|     // further callback was deferred to avoid recursion. | ||||
|     multiqueue_put(chan->events, on_channel_event, 1, chan); | ||||
|     channel_incref(chan); | ||||
|   } | ||||
|  | ||||
|   channel_decref(chan); | ||||
| } | ||||
|  | ||||
| void channel_reader_callbacks(Channel *chan, CallbackReader *reader) | ||||
| { | ||||
|   if (reader->buffered) { | ||||
|     if (reader->eof) { | ||||
|       if (reader->self) { | ||||
|         if (tv_dict_find(reader->self, reader->type, -1) == NULL) { | ||||
|           list_T *data = buffer_to_tv_list(reader->buffer.ga_data, | ||||
|                                            (size_t)reader->buffer.ga_len); | ||||
|           tv_dict_add_list(reader->self, reader->type, strlen(reader->type), | ||||
|                            data); | ||||
|         } else { | ||||
|           EMSG3(_(e_streamkey), reader->type, chan->id); | ||||
|         } | ||||
|       } else { | ||||
|         channel_callback_call(chan, reader); | ||||
|       } | ||||
|       reader->eof = false; | ||||
|     } | ||||
|   } else { | ||||
|     bool is_eof = reader->eof; | ||||
|     if (reader->buffer.ga_len > 0) { | ||||
|       channel_callback_call(chan, reader); | ||||
|     } | ||||
|     // if the stream reached eof, invoke extra callback with no data | ||||
|     if (is_eof) { | ||||
|       channel_callback_call(chan, reader); | ||||
|       reader->eof = false; | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  | ||||
| static void channel_process_exit_cb(Process *proc, int status, void *data) | ||||
| { | ||||
|   Channel *chan = data; | ||||
| @@ -637,45 +633,46 @@ static void channel_process_exit_cb(Process *proc, int status, void *data) | ||||
|  | ||||
|   // If process did not exit, we only closed the handle of a detached process. | ||||
|   bool exited = (status >= 0); | ||||
|   if (exited) { | ||||
|     process_channel_event(chan, &chan->on_exit, "exit", NULL, status); | ||||
|   if (exited && chan->on_exit.type != kCallbackNone) { | ||||
|     schedule_channel_event(chan); | ||||
|     chan->exit_status = status; | ||||
|   } | ||||
|  | ||||
|   channel_decref(chan); | ||||
| } | ||||
|  | ||||
| static void on_channel_event(void **args) | ||||
| static void channel_callback_call(Channel *chan, CallbackReader *reader) | ||||
| { | ||||
|   ChannelEvent *ev = (ChannelEvent *)args[0]; | ||||
|  | ||||
|   Callback *cb; | ||||
|   typval_T argv[4]; | ||||
|  | ||||
|   argv[0].v_type = VAR_NUMBER; | ||||
|   argv[0].v_lock = VAR_UNLOCKED; | ||||
|   argv[0].vval.v_number = (varnumber_T)ev->chan->id; | ||||
|   argv[0].vval.v_number = (varnumber_T)chan->id; | ||||
|  | ||||
|   if (ev->reader) { | ||||
|   if (reader) { | ||||
|     argv[1].v_type = VAR_LIST; | ||||
|     argv[1].v_lock = VAR_UNLOCKED; | ||||
|     argv[1].vval.v_list = buffer_to_tv_list(ev->reader->buffer.ga_data, | ||||
|                                             (size_t)ev->reader->buffer.ga_len); | ||||
|     argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data, | ||||
|                                             (size_t)reader->buffer.ga_len); | ||||
|     tv_list_ref(argv[1].vval.v_list); | ||||
|     ga_clear(&ev->reader->buffer); | ||||
|     ga_clear(&reader->buffer); | ||||
|     cb = &reader->cb; | ||||
|     argv[2].vval.v_string = (char_u *)reader->type; | ||||
|   } else { | ||||
|     argv[1].v_type = VAR_NUMBER; | ||||
|     argv[1].v_lock = VAR_UNLOCKED; | ||||
|     argv[1].vval.v_number = ev->status; | ||||
|     argv[1].vval.v_number = chan->exit_status; | ||||
|     cb = &chan->on_exit; | ||||
|     argv[2].vval.v_string = (char_u *)"exit"; | ||||
|   } | ||||
|  | ||||
|   argv[2].v_type = VAR_STRING; | ||||
|   argv[2].v_lock = VAR_UNLOCKED; | ||||
|   argv[2].vval.v_string = (uint8_t *)ev->type; | ||||
|  | ||||
|   typval_T rettv = TV_INITIAL_VALUE; | ||||
|   callback_call(ev->callback, 3, argv, &rettv); | ||||
|   callback_call(cb, 3, argv, &rettv); | ||||
|   tv_clear(&rettv); | ||||
|   channel_decref(ev->chan); | ||||
|   xfree(ev); | ||||
| } | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -42,13 +42,16 @@ typedef struct { | ||||
|   Callback cb; | ||||
|   dict_T *self; | ||||
|   garray_T buffer; | ||||
|   bool eof; | ||||
|   bool buffered; | ||||
|   const char *type; | ||||
| } CallbackReader; | ||||
|  | ||||
| #define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \ | ||||
|                                                 .self = NULL, \ | ||||
|                                                 .buffer = GA_EMPTY_INIT_VALUE, \ | ||||
|                                                 .buffered = false }) | ||||
|                                                 .buffered = false, \ | ||||
|                                                 .type = NULL }) | ||||
| static inline bool callback_reader_set(CallbackReader reader) | ||||
| { | ||||
|   return reader.cb.type != kCallbackNone || reader.self; | ||||
| @@ -73,9 +76,13 @@ struct Channel { | ||||
|   RpcState rpc; | ||||
|   Terminal *term; | ||||
|  | ||||
|   CallbackReader on_stdout; | ||||
|   CallbackReader on_data; | ||||
|   CallbackReader on_stderr; | ||||
|   Callback on_exit; | ||||
|   int exit_status; | ||||
|  | ||||
|   bool callback_busy; | ||||
|   bool callback_scheduled; | ||||
| }; | ||||
|  | ||||
| EXTERN PMap(uint64_t) *channels; | ||||
|   | ||||
| @@ -5165,7 +5165,7 @@ bool garbage_collect(bool testing) | ||||
|   { | ||||
|     Channel *data; | ||||
|     map_foreach_value(channels, data, { | ||||
|       set_ref_in_callback_reader(&data->on_stdout, copyID, NULL, NULL); | ||||
|       set_ref_in_callback_reader(&data->on_data, copyID, NULL, NULL); | ||||
|       set_ref_in_callback_reader(&data->on_stderr, copyID, NULL, NULL); | ||||
|       set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); | ||||
|     }) | ||||
|   | ||||
| @@ -439,16 +439,66 @@ describe('jobs', function() | ||||
|         call add(self.data, Normalize(a:data)) | ||||
|         sleep 200m | ||||
|       endfunction | ||||
|       function! d.on_exit(job, data, event) dict | ||||
|         let g:exit_data = copy(self.data) | ||||
|       endfunction | ||||
|       if has('win32') | ||||
|         let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)' | ||||
|       else | ||||
|         let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done'] | ||||
|       endif | ||||
|       call jobwait([jobstart(cmd, d)]) | ||||
|       let g:id = jobstart(cmd, d) | ||||
|       sleep 1500m | ||||
|       call jobwait([g:id]) | ||||
|     ]]) | ||||
|  | ||||
|     local expected = {'1', '2', '3', '4', '5', ''} | ||||
|     local chunks = eval('d.data') | ||||
|     -- check nothing was received after exit, including EOF | ||||
|     eq(eval('g:exit_data'), chunks) | ||||
|     local received = {''} | ||||
|     for i, chunk in ipairs(chunks) do | ||||
|       if i < #chunks then | ||||
|         -- if chunks got joined, a spurious [''] callback was not sent | ||||
|         neq({''}, chunk) | ||||
|       else | ||||
|         -- but EOF callback is still sent | ||||
|         eq({''}, chunk) | ||||
|       end | ||||
|       received[#received] = received[#received]..chunk[1] | ||||
|       for j = 2, #chunk do | ||||
|         received[#received+1] = chunk[j] | ||||
|       end | ||||
|     end | ||||
|     eq(expected, received) | ||||
|   end) | ||||
|  | ||||
|   it('does not invoke callbacks recursively', function() | ||||
|     source([[ | ||||
|       let d = {'data': []} | ||||
|       function! d.on_stdout(job, data, event) dict | ||||
|         " if callbacks were invoked recursively, this would cause on_stdout | ||||
|         " to be invoked recursively and the data reversed on the call stack | ||||
|         sleep 200m | ||||
|         call add(self.data, Normalize(a:data)) | ||||
|       endfunction | ||||
|       function! d.on_exit(job, data, event) dict | ||||
|         let g:exit_data = copy(self.data) | ||||
|       endfunction | ||||
|       if has('win32') | ||||
|         let cmd = 'for /L %I in (1,1,5) do @(echo %I& ping -n 2 127.0.0.1 > nul)' | ||||
|       else | ||||
|         let cmd = ['sh', '-c', 'for i in $(seq 1 5); do echo $i; sleep 0.1; done'] | ||||
|       endif | ||||
|       let g:id = jobstart(cmd, d) | ||||
|       sleep 1500m | ||||
|       call jobwait([g:id]) | ||||
|     ]]) | ||||
|  | ||||
|     local expected = {'1', '2', '3', '4', '5', ''} | ||||
|     local chunks = eval('d.data') | ||||
|     -- check nothing was received after exit, including EOF | ||||
|     eq(eval('g:exit_data'), chunks) | ||||
|     local received = {''} | ||||
|     for i, chunk in ipairs(chunks) do | ||||
|       if i < #chunks then | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Björn Linse
					Björn Linse