mirror of
				https://github.com/neovim/neovim.git
				synced 2025-10-26 12:27:24 +00:00 
			
		
		
		
	channels: refactor
This commit is contained in:
		| @@ -252,7 +252,7 @@ static void remote_ui_flush(UI *ui) | |||||||
| { | { | ||||||
|   UIData *data = ui->data; |   UIData *data = ui->data; | ||||||
|   if (data->buffer.size > 0) { |   if (data->buffer.size > 0) { | ||||||
|     channel_send_event(data->channel_id, "redraw", data->buffer); |     rpc_send_event(data->channel_id, "redraw", data->buffer); | ||||||
|     data->buffer = (Array)ARRAY_DICT_INIT; |     data->buffer = (Array)ARRAY_DICT_INIT; | ||||||
|   } |   } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -721,7 +721,7 @@ void nvim_subscribe(uint64_t channel_id, String event) | |||||||
|   char e[METHOD_MAXLEN + 1]; |   char e[METHOD_MAXLEN + 1]; | ||||||
|   memcpy(e, event.data, length); |   memcpy(e, event.data, length); | ||||||
|   e[length] = NUL; |   e[length] = NUL; | ||||||
|   channel_subscribe(channel_id, e); |   rpc_subscribe(channel_id, e); | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Unsubscribes to event broadcasts | /// Unsubscribes to event broadcasts | ||||||
| @@ -737,7 +737,7 @@ void nvim_unsubscribe(uint64_t channel_id, String event) | |||||||
|   char e[METHOD_MAXLEN + 1]; |   char e[METHOD_MAXLEN + 1]; | ||||||
|   memcpy(e, event.data, length); |   memcpy(e, event.data, length); | ||||||
|   e[length] = NUL; |   e[length] = NUL; | ||||||
|   channel_unsubscribe(channel_id, e); |   rpc_unsubscribe(channel_id, e); | ||||||
| } | } | ||||||
|  |  | ||||||
| Integer nvim_get_color_by_name(String name) | Integer nvim_get_color_by_name(String name) | ||||||
|   | |||||||
							
								
								
									
										61
									
								
								src/nvim/channel.c
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										61
									
								
								src/nvim/channel.c
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,61 @@ | |||||||
|  | // This is an open source non-commercial project. Dear PVS-Studio, please check | ||||||
|  | // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com | ||||||
|  |  | ||||||
|  | #include "nvim/api/ui.h" | ||||||
|  | #include "nvim/channel.h" | ||||||
|  | #include "nvim/msgpack_rpc/channel.h" | ||||||
|  |  | ||||||
|  | PMap(uint64_t) *channels = NULL; | ||||||
|  |  | ||||||
|  | #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||||
|  | # include "channel.c.generated.h" | ||||||
|  | #endif | ||||||
|  | /// Teardown the module | ||||||
|  | void channel_teardown(void) | ||||||
|  | { | ||||||
|  |   if (!channels) { | ||||||
|  |     return; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   Channel *channel; | ||||||
|  |  | ||||||
|  |   map_foreach_value(channels, channel, { | ||||||
|  |     (void)channel;  // close_channel(channel); | ||||||
|  |   }); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Initializes the module | ||||||
|  | void channel_init(void) | ||||||
|  | { | ||||||
|  |   channels = pmap_new(uint64_t)(); | ||||||
|  |   rpc_init(); | ||||||
|  |   remote_ui_init(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void channel_incref(Channel *channel) | ||||||
|  | { | ||||||
|  |   channel->refcount++; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void channel_decref(Channel *channel) | ||||||
|  | { | ||||||
|  |   if (!(--channel->refcount)) { | ||||||
|  |     multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel); | ||||||
|  |   } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static void free_channel_event(void **argv) | ||||||
|  | { | ||||||
|  |   Channel *channel = argv[0]; | ||||||
|  |   if (channel->is_rpc) { | ||||||
|  |     rpc_free(channel); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   callback_free(&channel->on_stdout); | ||||||
|  |   callback_free(&channel->on_stderr); | ||||||
|  |   callback_free(&channel->on_exit); | ||||||
|  |  | ||||||
|  |   pmap_del(uint64_t)(channels, channel->id); | ||||||
|  |   multiqueue_free(channel->events); | ||||||
|  |   xfree(channel); | ||||||
|  | } | ||||||
							
								
								
									
										120
									
								
								src/nvim/channel.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										120
									
								
								src/nvim/channel.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,120 @@ | |||||||
|  | #ifndef NVIM_CHANNEL_H | ||||||
|  | #define NVIM_CHANNEL_H | ||||||
|  |  | ||||||
|  | #include "nvim/main.h" | ||||||
|  | #include "nvim/event/socket.h" | ||||||
|  | #include "nvim/event/process.h" | ||||||
|  | #include "nvim/os/pty_process.h" | ||||||
|  | #include "nvim/event/libuv_process.h" | ||||||
|  | #include "nvim/eval/typval.h" | ||||||
|  | #include "nvim/msgpack_rpc/channel_defs.h" | ||||||
|  |  | ||||||
|  | typedef enum { | ||||||
|  |   kChannelStreamProc, | ||||||
|  |   kChannelStreamSocket, | ||||||
|  |   kChannelStreamStdio, | ||||||
|  |   kChannelStreamInternal | ||||||
|  | } ChannelStreamType; | ||||||
|  |  | ||||||
|  | typedef struct { | ||||||
|  |   Stream in; | ||||||
|  |   Stream out; | ||||||
|  | } StdioPair; | ||||||
|  |  | ||||||
|  | // typedef struct { | ||||||
|  | //   Callback on_out; | ||||||
|  | //   Callback on_close; | ||||||
|  | //   Garray buffer; | ||||||
|  | //   bool buffering; | ||||||
|  | // } CallbackReader | ||||||
|  |  | ||||||
|  | #define CallbackReader Callback | ||||||
|  |  | ||||||
|  | struct Channel { | ||||||
|  |   uint64_t id; | ||||||
|  |   size_t refcount; | ||||||
|  |   MultiQueue *events; | ||||||
|  |  | ||||||
|  |   ChannelStreamType streamtype; | ||||||
|  |   union { | ||||||
|  |     Process proc; | ||||||
|  |     LibuvProcess uv; | ||||||
|  |     PtyProcess pty; | ||||||
|  |     Stream socket; | ||||||
|  |     StdioPair stdio; | ||||||
|  |   } stream; | ||||||
|  |  | ||||||
|  |   bool is_rpc; | ||||||
|  |   RpcState rpc; | ||||||
|  |   Terminal *term; | ||||||
|  |  | ||||||
|  |   CallbackReader on_stdout; | ||||||
|  |   CallbackReader on_stderr; | ||||||
|  |   Callback on_exit; | ||||||
|  |  | ||||||
|  |   varnumber_T *status_ptr; // TODO: refactor? | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | EXTERN PMap(uint64_t) *channels; | ||||||
|  |  | ||||||
|  | #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||||
|  | # include "channel.h.generated.h" | ||||||
|  | #endif | ||||||
|  |  | ||||||
|  | static inline Channel *channel_alloc(ChannelStreamType type) | ||||||
|  | { | ||||||
|  |   Channel *chan = xcalloc(1, sizeof(*chan)); | ||||||
|  |   chan->id = next_chan_id++; | ||||||
|  |   chan->events = multiqueue_new_child(main_loop.events); | ||||||
|  |   chan->refcount = 1; | ||||||
|  |   chan->streamtype = type; | ||||||
|  |   pmap_put(uint64_t)(channels, chan->id, chan); | ||||||
|  |   return chan; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// @returns Channel with the id or NULL if not found | ||||||
|  | static inline Channel *find_channel(uint64_t id) | ||||||
|  | { | ||||||
|  |   return pmap_get(uint64_t)(channels, id); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static inline Stream *channel_instream(Channel *chan) | ||||||
|  |   FUNC_ATTR_NONNULL_ALL | ||||||
|  | { | ||||||
|  |   switch (chan->streamtype) { | ||||||
|  |     case kChannelStreamProc: | ||||||
|  |       return &chan->stream.proc.in; | ||||||
|  |  | ||||||
|  |     case kChannelStreamSocket: | ||||||
|  |       return &chan->stream.socket; | ||||||
|  |  | ||||||
|  |     case kChannelStreamStdio: | ||||||
|  |       return &chan->stream.stdio.in; | ||||||
|  |  | ||||||
|  |     case kChannelStreamInternal: | ||||||
|  |       abort(); | ||||||
|  |   } | ||||||
|  |   abort(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | static inline Stream *channel_outstream(Channel *chan) | ||||||
|  |   FUNC_ATTR_NONNULL_ALL | ||||||
|  | { | ||||||
|  |   switch (chan->streamtype) { | ||||||
|  |     case kChannelStreamProc: | ||||||
|  |       return &chan->stream.proc.out; | ||||||
|  |  | ||||||
|  |     case kChannelStreamSocket: | ||||||
|  |       return &chan->stream.socket; | ||||||
|  |  | ||||||
|  |     case kChannelStreamStdio: | ||||||
|  |       return &chan->stream.stdio.out; | ||||||
|  |  | ||||||
|  |     case kChannelStreamInternal: | ||||||
|  |       abort(); | ||||||
|  |   } | ||||||
|  |   abort(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | #endif  // NVIM_CHANNEL_H | ||||||
							
								
								
									
										314
									
								
								src/nvim/eval.c
									
									
									
									
									
								
							
							
						
						
									
										314
									
								
								src/nvim/eval.c
									
									
									
									
									
								
							| @@ -24,6 +24,7 @@ | |||||||
| #endif | #endif | ||||||
| #include "nvim/eval.h" | #include "nvim/eval.h" | ||||||
| #include "nvim/buffer.h" | #include "nvim/buffer.h" | ||||||
|  | #include "nvim/channel.h" | ||||||
| #include "nvim/charset.h" | #include "nvim/charset.h" | ||||||
| #include "nvim/cursor.h" | #include "nvim/cursor.h" | ||||||
| #include "nvim/diff.h" | #include "nvim/diff.h" | ||||||
| @@ -437,29 +438,12 @@ static ScopeDictDictItem vimvars_var; | |||||||
| #define vimvarht  vimvardict.dv_hashtab | #define vimvarht  vimvardict.dv_hashtab | ||||||
|  |  | ||||||
| typedef struct { | typedef struct { | ||||||
|   union { |   Channel *data; | ||||||
|     LibuvProcess uv; |  | ||||||
|     PtyProcess pty; |  | ||||||
|   } proc; |  | ||||||
|   Stream in, out, err;  // Initialized in common_job_start(). |  | ||||||
|   Terminal *term; |  | ||||||
|   bool stopped; |  | ||||||
|   bool exited; |  | ||||||
|   bool rpc; |  | ||||||
|   int refcount; |  | ||||||
|   Callback on_stdout, on_stderr, on_exit; |  | ||||||
|   varnumber_T *status_ptr; |  | ||||||
|   uint64_t id; |  | ||||||
|   MultiQueue *events; |  | ||||||
| } TerminalJobData; |  | ||||||
|  |  | ||||||
| typedef struct { |  | ||||||
|   TerminalJobData *data; |  | ||||||
|   Callback *callback; |   Callback *callback; | ||||||
|   const char *type; |   const char *type; | ||||||
|   list_T *received; |   list_T *received; | ||||||
|   int status; |   int status; | ||||||
| } JobEvent; | } ChannelEvent; | ||||||
|  |  | ||||||
| typedef struct { | typedef struct { | ||||||
|   TimeWatcher tw; |   TimeWatcher tw; | ||||||
| @@ -513,7 +497,6 @@ typedef enum { | |||||||
| #define FNE_INCL_BR     1       /* find_name_end(): include [] in name */ | #define FNE_INCL_BR     1       /* find_name_end(): include [] in name */ | ||||||
| #define FNE_CHECK_START 2       /* find_name_end(): check name starts with | #define FNE_CHECK_START 2       /* find_name_end(): check name starts with | ||||||
|                                    valid character */ |                                    valid character */ | ||||||
| static PMap(uint64_t) *jobs = NULL; |  | ||||||
|  |  | ||||||
| static uint64_t last_timer_id = 0; | static uint64_t last_timer_id = 0; | ||||||
| static PMap(uint64_t) *timers = NULL; | static PMap(uint64_t) *timers = NULL; | ||||||
| @@ -556,7 +539,6 @@ void eval_init(void) | |||||||
| { | { | ||||||
|   vimvars[VV_VERSION].vv_nr = VIM_VERSION_100; |   vimvars[VV_VERSION].vv_nr = VIM_VERSION_100; | ||||||
|  |  | ||||||
|   jobs = pmap_new(uint64_t)(); |  | ||||||
|   timers = pmap_new(uint64_t)(); |   timers = pmap_new(uint64_t)(); | ||||||
|   struct vimvar   *p; |   struct vimvar   *p; | ||||||
|  |  | ||||||
| @@ -5141,8 +5123,8 @@ bool garbage_collect(bool testing) | |||||||
|  |  | ||||||
|   // Jobs |   // Jobs | ||||||
|   { |   { | ||||||
|     TerminalJobData *data; |     Channel *data; | ||||||
|     map_foreach_value(jobs, data, { |     map_foreach_value(channels, data, { | ||||||
|       set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); |       set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); | ||||||
|       set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); |       set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); | ||||||
|       set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); |       set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); | ||||||
| @@ -11433,24 +11415,23 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); |   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||||
|   if (!data) { |   if (!data) { | ||||||
|     EMSG(_(e_invjob)); |  | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   Process *proc = (Process *)&data->proc; |   Process *proc = (Process *)&data->stream.proc; | ||||||
|  |  | ||||||
|   if (argvars[1].v_type == VAR_STRING) { |   if (argvars[1].v_type == VAR_STRING) { | ||||||
|     char *stream = (char *)argvars[1].vval.v_string; |     char *stream = (char *)argvars[1].vval.v_string; | ||||||
|     if (!strcmp(stream, "stdin")) { |     if (!strcmp(stream, "stdin")) { | ||||||
|       if (data->rpc) { |       if (data->is_rpc) { | ||||||
|         EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); |         EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); | ||||||
|       } else { |       } else { | ||||||
|         process_close_in(proc); |         process_close_in(proc); | ||||||
|       } |       } | ||||||
|     } else if (!strcmp(stream, "stdout")) { |     } else if (!strcmp(stream, "stdout")) { | ||||||
|       if (data->rpc) { |       if (data->is_rpc) { | ||||||
|         EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); |         EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); | ||||||
|       } else { |       } else { | ||||||
|         process_close_out(proc); |         process_close_out(proc); | ||||||
| @@ -11458,7 +11439,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|     } else if (!strcmp(stream, "stderr")) { |     } else if (!strcmp(stream, "stderr")) { | ||||||
|       process_close_err(proc); |       process_close_err(proc); | ||||||
|     } else if (!strcmp(stream, "rpc")) { |     } else if (!strcmp(stream, "rpc")) { | ||||||
|       if (data->rpc) { |       if (data->is_rpc) { | ||||||
|         channel_close(data->id); |         channel_close(data->id); | ||||||
|       } else { |       } else { | ||||||
|         EMSG(_("Invalid job stream: Not an rpc job")); |         EMSG(_("Invalid job stream: Not an rpc job")); | ||||||
| @@ -11467,13 +11448,13 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|       EMSG2(_("Invalid job stream \"%s\""), stream); |       EMSG2(_("Invalid job stream \"%s\""), stream); | ||||||
|     } |     } | ||||||
|   } else { |   } else { | ||||||
|     if (data->rpc) { |     if (data->is_rpc) { | ||||||
|       channel_close(data->id); |       channel_close(data->id); | ||||||
|       process_close_err(proc); |       process_close_err(proc); | ||||||
|     } else { |     } else { | ||||||
|       process_close_streams(proc); |       process_close_streams(proc); | ||||||
|       if (proc->type == kProcessTypePty) { |       if (proc->type == kProcessTypePty) { | ||||||
|         pty_process_close_master(&data->proc.pty); |         pty_process_close_master(&data->stream.pty); | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| @@ -11494,13 +11475,12 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); |   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||||
|   if (!data) { |   if (!data) { | ||||||
|     EMSG(_(e_invjob)); |  | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   Process *proc = (Process *)&data->proc; |   Process *proc = (Process *)&data->stream.proc; | ||||||
|   rettv->vval.v_number = proc->pid; |   rettv->vval.v_number = proc->pid; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -11521,18 +11501,19 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); |   Channel *data = find_channel(argvars[0].vval.v_number); | ||||||
|   if (!data) { |   if (!data) { | ||||||
|     EMSG(_(e_invjob)); |     EMSG(_(e_invchan)); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (((Process *)&data->proc)->in->closed) { |   Stream *in = channel_instream(data); | ||||||
|  |   if (in->closed) { | ||||||
|     EMSG(_("Can't send data to the job: stdin is closed")); |     EMSG(_("Can't send data to the job: stdin is closed")); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (data->rpc) { |   if (data->is_rpc) { | ||||||
|     EMSG(_("Can't send raw data to rpc channel")); |     EMSG(_("Can't send raw data to rpc channel")); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
| @@ -11546,7 +11527,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); |   WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); | ||||||
|   rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf); |   rettv->vval.v_number = wstream_write(in, buf); | ||||||
| } | } | ||||||
|  |  | ||||||
| // "jobresize(job, width, height)" function | // "jobresize(job, width, height)" function | ||||||
| @@ -11567,19 +11548,17 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|  |  | ||||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); |   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||||
|   if (!data) { |   if (!data) { | ||||||
|     EMSG(_(e_invjob)); |  | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (data->proc.uv.process.type != kProcessTypePty) { |   if (data->stream.proc.type != kProcessTypePty) { | ||||||
|     EMSG(_(e_jobnotpty)); |     EMSG(_(e_channotpty)); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, |   pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, argvars[2].vval.v_number); | ||||||
|       argvars[2].vval.v_number); |  | ||||||
|   rettv->vval.v_number = 1; |   rettv->vval.v_number = 1; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -11697,31 +11676,25 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|     } |     } | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, |   Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, | ||||||
|                                           pty, rpc, detach, cwd); |                                           pty, rpc, detach, cwd); | ||||||
|   Process *proc = (Process *)&data->proc; |  | ||||||
|  |  | ||||||
|   if (pty) { |   if (pty) { | ||||||
|  |     PtyProcess *pty = &data->stream.pty; | ||||||
|     uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); |     uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); | ||||||
|     if (width > 0) { |     if (width > 0) { | ||||||
|       data->proc.pty.width = width; |       pty->width = width; | ||||||
|     } |     } | ||||||
|     uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); |     uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); | ||||||
|     if (height > 0) { |     if (height > 0) { | ||||||
|       data->proc.pty.height = height; |       pty->height = height; | ||||||
|     } |     } | ||||||
|     char *term = tv_dict_get_string(job_opts, "TERM", true); |     char *term = tv_dict_get_string(job_opts, "TERM", true); | ||||||
|     if (term) { |     if (term) { | ||||||
|       data->proc.pty.term_name = term; |       pty->term_name = term; | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (!rpc && on_stdout.type == kCallbackNone) { |  | ||||||
|     proc->out = NULL; |  | ||||||
|   } |  | ||||||
|   if (on_stderr.type == kCallbackNone) { |  | ||||||
|     proc->err = NULL; |  | ||||||
|   } |  | ||||||
|   common_job_start(data, rettv); |   common_job_start(data, rettv); | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -11742,14 +11715,12 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|  |  | ||||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); |   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||||
|   if (!data) { |   if (!data) { | ||||||
|     EMSG(_(e_invjob)); |  | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   process_stop((Process *)&data->proc); |   process_stop((Process *)&data->stream.proc); | ||||||
|   data->stopped = true; |  | ||||||
|   rettv->vval.v_number = 1; |   rettv->vval.v_number = 1; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -11778,9 +11749,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   // is used to represent an invalid job id, -2 is for a interrupted job and |   // is used to represent an invalid job id, -2 is for a interrupted job and | ||||||
|   // -1 for jobs that were skipped or timed out. |   // -1 for jobs that were skipped or timed out. | ||||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { |   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||||
|     TerminalJobData *data = NULL; |     Channel *data = NULL; | ||||||
|     if (arg->li_tv.v_type != VAR_NUMBER |     if (arg->li_tv.v_type != VAR_NUMBER | ||||||
|         || !(data = find_job(arg->li_tv.vval.v_number))) { |         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||||
|       tv_list_append_number(rv, -3); |       tv_list_append_number(rv, -3); | ||||||
|     } else { |     } else { | ||||||
|       // append the list item and set the status pointer so we'll collect the |       // append the list item and set the status pointer so we'll collect the | ||||||
| @@ -11802,16 +11773,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { |   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||||
|     TerminalJobData *data = NULL; |     Channel *data = NULL; | ||||||
|     if (remaining == 0) { |     if (remaining == 0) { | ||||||
|       // timed out |       // timed out | ||||||
|       break; |       break; | ||||||
|     } |     } | ||||||
|     if (arg->li_tv.v_type != VAR_NUMBER |     if (arg->li_tv.v_type != VAR_NUMBER | ||||||
|         || !(data = find_job(arg->li_tv.vval.v_number))) { |         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||||
|       continue; |       continue; | ||||||
|     } |     } | ||||||
|     int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); |     int status = process_wait((Process *)&data->stream.proc, remaining, waiting_jobs); | ||||||
|     if (status < 0) { |     if (status < 0) { | ||||||
|       // interrupted or timed out, skip remaining jobs. |       // interrupted or timed out, skip remaining jobs. | ||||||
|       if (status == -2) { |       if (status == -2) { | ||||||
| @@ -11832,9 +11803,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { |   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||||
|     TerminalJobData *data = NULL; |     Channel *data = NULL; | ||||||
|     if (arg->li_tv.v_type != VAR_NUMBER |     if (arg->li_tv.v_type != VAR_NUMBER | ||||||
|         || !(data = find_job(arg->li_tv.vval.v_number))) { |         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||||
|       continue; |       continue; | ||||||
|     } |     } | ||||||
|     // remove the status pointer because the list may be freed before the |     // remove the status pointer because the list may be freed before the | ||||||
| @@ -11844,9 +11815,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|  |  | ||||||
|   // restore the parent queue for any jobs still alive |   // restore the parent queue for any jobs still alive | ||||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { |   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||||
|     TerminalJobData *data = NULL; |     Channel *data = NULL; | ||||||
|     if (arg->li_tv.v_type != VAR_NUMBER |     if (arg->li_tv.v_type != VAR_NUMBER | ||||||
|         || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { |         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||||
|       continue; |       continue; | ||||||
|     } |     } | ||||||
|     // restore the parent queue for the job |     // restore the parent queue for the job | ||||||
| @@ -13803,9 +13774,8 @@ static void f_rpcnotify(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|     ADD(args, vim_to_object(tv)); |     ADD(args, vim_to_object(tv)); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (!channel_send_event((uint64_t)argvars[0].vval.v_number, |   if (!rpc_send_event((uint64_t)argvars[0].vval.v_number, | ||||||
|                           tv_get_string(&argvars[1]), |                       tv_get_string(&argvars[1]), args)) { | ||||||
|                           args)) { |  | ||||||
|     EMSG2(_(e_invarg2), "Channel doesn't exist"); |     EMSG2(_(e_invarg2), "Channel doesn't exist"); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
| @@ -13870,10 +13840,8 @@ static void f_rpcrequest(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|  |  | ||||||
|  |  | ||||||
|   Error err = ERROR_INIT; |   Error err = ERROR_INIT; | ||||||
|   Object result = channel_send_call((uint64_t)argvars[0].vval.v_number, |   Object result = rpc_send_call((uint64_t)argvars[0].vval.v_number, | ||||||
|                                     tv_get_string(&argvars[1]), |                                 tv_get_string(&argvars[1]), args, &err); | ||||||
|                                     args, |  | ||||||
|                                     &err); |  | ||||||
|  |  | ||||||
|   if (l_provider_call_nesting) { |   if (l_provider_call_nesting) { | ||||||
|     current_SID = save_current_SID; |     current_SID = save_current_SID; | ||||||
| @@ -13954,7 +13922,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   // The last item of argv must be NULL |   // The last item of argv must be NULL | ||||||
|   argv[i] = NULL; |   argv[i] = NULL; | ||||||
|  |  | ||||||
|   TerminalJobData *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, |   Channel *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, | ||||||
|                                           CALLBACK_NONE, false, true, false, |                                           CALLBACK_NONE, false, true, false, | ||||||
|                                           NULL); |                                           NULL); | ||||||
|   common_job_start(data, rettv); |   common_job_start(data, rettv); | ||||||
| @@ -13977,10 +13945,11 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   // if called with a job, stop it, else closes the channel |   // if called with a job, stop it, else closes the channel | ||||||
|   if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { |   uint64_t id = argvars[0].vval.v_number; | ||||||
|  |   if (find_job(id, false)) { // FIXME | ||||||
|     f_jobstop(argvars, rettv, NULL); |     f_jobstop(argvars, rettv, NULL); | ||||||
|   } else { |   } else { | ||||||
|     rettv->vval.v_number = channel_close(argvars[0].vval.v_number); |     rettv->vval.v_number = channel_close(id); | ||||||
|   } |   } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -16689,11 +16658,11 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); |   uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); | ||||||
|   TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, |   Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, | ||||||
|                                           true, false, false, cwd); |                                           true, false, false, cwd); | ||||||
|   data->proc.pty.width = term_width; |   data->stream.pty.width = term_width; | ||||||
|   data->proc.pty.height = curwin->w_height; |   data->stream.pty.height = curwin->w_height; | ||||||
|   data->proc.pty.term_name = xstrdup("xterm-256color"); |   data->stream.pty.term_name = xstrdup("xterm-256color"); | ||||||
|   if (!common_job_start(data, rettv)) { |   if (!common_job_start(data, rettv)) { | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
| @@ -16705,7 +16674,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|   topts.resize_cb = term_resize; |   topts.resize_cb = term_resize; | ||||||
|   topts.close_cb = term_close; |   topts.close_cb = term_close; | ||||||
|  |  | ||||||
|   int pid = data->proc.pty.process.pid; |   int pid = data->stream.pty.process.pid; | ||||||
|  |  | ||||||
|   char buf[1024]; |   char buf[1024]; | ||||||
|   // format the title with the pid to conform with the term:// URI |   // format the title with the pid to conform with the term:// URI | ||||||
| @@ -16725,7 +16694,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) | |||||||
|  |  | ||||||
|   Terminal *term = terminal_open(topts); |   Terminal *term = terminal_open(topts); | ||||||
|   data->term = term; |   data->term = term; | ||||||
|   data->refcount++; |   channel_incref(data); | ||||||
|  |  | ||||||
|   return; |   return; | ||||||
| } | } | ||||||
| @@ -16760,30 +16729,6 @@ bool callback_from_typval(Callback *const callback, typval_T *const arg) | |||||||
|   return true; |   return true; | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Unref/free callback |  | ||||||
| void callback_free(Callback *const callback) |  | ||||||
|   FUNC_ATTR_NONNULL_ALL |  | ||||||
| { |  | ||||||
|   switch (callback->type) { |  | ||||||
|     case kCallbackFuncref: { |  | ||||||
|       func_unref(callback->data.funcref); |  | ||||||
|       xfree(callback->data.funcref); |  | ||||||
|       break; |  | ||||||
|     } |  | ||||||
|     case kCallbackPartial: { |  | ||||||
|       partial_unref(callback->data.partial); |  | ||||||
|       break; |  | ||||||
|     } |  | ||||||
|     case kCallbackNone: { |  | ||||||
|       break; |  | ||||||
|     } |  | ||||||
|     default: { |  | ||||||
|       abort(); |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
|   callback->type = kCallbackNone; |  | ||||||
| } |  | ||||||
|  |  | ||||||
| bool callback_call(Callback *const callback, const int argcount_in, | bool callback_call(Callback *const callback, const int argcount_in, | ||||||
|                    typval_T *const argvars_in, typval_T *const rettv) |                    typval_T *const argvars_in, typval_T *const rettv) | ||||||
|   FUNC_ATTR_NONNULL_ALL |   FUNC_ATTR_NONNULL_ALL | ||||||
| @@ -22402,7 +22347,7 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, | |||||||
|   return ret; |   return ret; | ||||||
| } | } | ||||||
|  |  | ||||||
| static inline TerminalJobData *common_job_init(char **argv, | static inline Channel *common_job_init(char **argv, | ||||||
|                                                Callback on_stdout, |                                                Callback on_stdout, | ||||||
|                                                Callback on_stderr, |                                                Callback on_stderr, | ||||||
|                                                Callback on_exit, |                                                Callback on_exit, | ||||||
| @@ -22411,25 +22356,18 @@ static inline TerminalJobData *common_job_init(char **argv, | |||||||
|                                                bool detach, |                                                bool detach, | ||||||
|                                                const char *cwd) |                                                const char *cwd) | ||||||
| { | { | ||||||
|   TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); |   Channel *data = channel_alloc(kChannelStreamProc); | ||||||
|   data->stopped = false; |  | ||||||
|   data->on_stdout = on_stdout; |   data->on_stdout = on_stdout; | ||||||
|   data->on_stderr = on_stderr; |   data->on_stderr = on_stderr; | ||||||
|   data->on_exit = on_exit; |   data->on_exit = on_exit; | ||||||
|   data->events = multiqueue_new_child(main_loop.events); |   data->is_rpc = rpc; | ||||||
|   data->rpc = rpc; |  | ||||||
|   if (pty) { |   if (pty) { | ||||||
|     data->proc.pty = pty_process_init(&main_loop, data); |     data->stream.pty = pty_process_init(&main_loop, data); | ||||||
|   } else { |   } else { | ||||||
|     data->proc.uv = libuv_process_init(&main_loop, data); |     data->stream.uv = libuv_process_init(&main_loop, data); | ||||||
|   } |   } | ||||||
|   Process *proc = (Process *)&data->proc; |   Process *proc = (Process *)&data->stream.proc; | ||||||
|   proc->argv = argv; |   proc->argv = argv; | ||||||
|   proc->in = &data->in; |  | ||||||
|   proc->out = &data->out; |  | ||||||
|   if (!pty) { |  | ||||||
|     proc->err = &data->err; |  | ||||||
|   } |  | ||||||
|   proc->cb = eval_job_process_exit_cb; |   proc->cb = eval_job_process_exit_cb; | ||||||
|   proc->events = data->events; |   proc->events = data->events; | ||||||
|   proc->detach = detach; |   proc->detach = detach; | ||||||
| @@ -22456,80 +22394,66 @@ static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, | |||||||
|   return false; |   return false; | ||||||
| } | } | ||||||
|  |  | ||||||
| static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) | static inline bool common_job_start(Channel *data, typval_T *rettv) | ||||||
| { | { | ||||||
|   Process *proc = (Process *)&data->proc; |   Process *proc = (Process *)&data->stream.proc; | ||||||
|   if (proc->type == kProcessTypePty && proc->detach) { |   if (proc->type == kProcessTypePty && proc->detach) { | ||||||
|     EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); |     EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); | ||||||
|     xfree(data->proc.pty.term_name); |     xfree(data->stream.pty.term_name); | ||||||
|     shell_free_argv(proc->argv); |     shell_free_argv(proc->argv); | ||||||
|     free_term_job_data_event((void **)&data); |     channel_decref(data); | ||||||
|     return false; |     return false; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   data->id = next_chan_id++; |  | ||||||
|   pmap_put(uint64_t)(jobs, data->id, data); |  | ||||||
|  |  | ||||||
|   data->refcount++; |   data->refcount++; | ||||||
|   char *cmd = xstrdup(proc->argv[0]); |   char *cmd = xstrdup(proc->argv[0]); | ||||||
|   int status = process_spawn(proc); |   bool has_out, has_err; | ||||||
|  |   if (proc->type == kProcessTypePty) { | ||||||
|  |     has_out = true; | ||||||
|  |     has_err = false; | ||||||
|  |   } else { | ||||||
|  |     has_out = data->is_rpc || data->on_stdout.type != kCallbackNone; | ||||||
|  |     has_err = data->on_stderr.type != kCallbackNone; | ||||||
|  |   } | ||||||
|  |   int status = process_spawn(proc, true, has_out, has_err); | ||||||
|   if (status) { |   if (status) { | ||||||
|     EMSG3(_(e_jobspawn), os_strerror(status), cmd); |     EMSG3(_(e_jobspawn), os_strerror(status), cmd); | ||||||
|     xfree(cmd); |     xfree(cmd); | ||||||
|     if (proc->type == kProcessTypePty) { |     if (proc->type == kProcessTypePty) { | ||||||
|       xfree(data->proc.pty.term_name); |       xfree(data->stream.pty.term_name); | ||||||
|     } |     } | ||||||
|     rettv->vval.v_number = proc->status; |     rettv->vval.v_number = proc->status; | ||||||
|     term_job_data_decref(data); |     channel_decref(data); | ||||||
|     return false; |     return false; | ||||||
|   } |   } | ||||||
|   xfree(cmd); |   xfree(cmd); | ||||||
|  |  | ||||||
|  |  | ||||||
|   if (data->rpc) { |   if (data->is_rpc) { | ||||||
|     // the rpc channel takes over the in and out streams |     // the rpc takes over the in and out streams | ||||||
|     channel_from_process(proc, data->id); |     rpc_start(data); | ||||||
|   } else { |   } else { | ||||||
|     wstream_init(proc->in, 0); |     wstream_init(&proc->in, 0); | ||||||
|     if (proc->out) { |     if (has_out) { | ||||||
|       rstream_init(proc->out, 0); |       rstream_init(&proc->out, 0); | ||||||
|       rstream_start(proc->out, on_job_stdout, data); |       rstream_start(&proc->out, on_job_stdout, data); | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->err) { |   if (has_err) { | ||||||
|     rstream_init(proc->err, 0); |     rstream_init(&proc->err, 0); | ||||||
|     rstream_start(proc->err, on_job_stderr, data); |     rstream_start(&proc->err, on_job_stderr, data); | ||||||
|   } |   } | ||||||
|   rettv->vval.v_number = data->id; |   rettv->vval.v_number = data->id; | ||||||
|   return true; |   return true; | ||||||
| } | } | ||||||
|  |  | ||||||
| static inline void free_term_job_data_event(void **argv) |  | ||||||
| { |  | ||||||
|   TerminalJobData *data = argv[0]; |  | ||||||
|   callback_free(&data->on_stdout); |  | ||||||
|   callback_free(&data->on_stderr); |  | ||||||
|   callback_free(&data->on_exit); |  | ||||||
|  |  | ||||||
|   multiqueue_free(data->events); |  | ||||||
|   pmap_del(uint64_t)(jobs, data->id); |  | ||||||
|   xfree(data); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| static inline void free_term_job_data(TerminalJobData *data) |  | ||||||
| { |  | ||||||
|   // data->queue may still be used after this function returns(process_wait), so |  | ||||||
|   // only free in the next event loop iteration |  | ||||||
|   multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // vimscript job callbacks must be executed on Nvim main loop | // vimscript job callbacks must be executed on Nvim main loop | ||||||
| static inline void process_job_event(TerminalJobData *data, Callback *callback, | static inline void process_job_event(Channel *data, Callback *callback, | ||||||
|                                      const char *type, char *buf, size_t count, |                                      const char *type, char *buf, size_t count, | ||||||
|                                      int status) |                                      int status) | ||||||
| { | { | ||||||
|   JobEvent event_data; |   ChannelEvent event_data; | ||||||
|   event_data.received = NULL; |   event_data.received = NULL; | ||||||
|   if (buf) { |   if (buf) { | ||||||
|     event_data.received = tv_list_alloc(); |     event_data.received = tv_list_alloc(); | ||||||
| @@ -22566,18 +22490,18 @@ static inline void process_job_event(TerminalJobData *data, Callback *callback, | |||||||
| static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, | static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, | ||||||
|     void *job, bool eof) |     void *job, bool eof) | ||||||
| { | { | ||||||
|   TerminalJobData *data = job; |   Channel *data = job; | ||||||
|   on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); |   on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, | static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, | ||||||
|     void *job, bool eof) |     void *job, bool eof) | ||||||
| { | { | ||||||
|   TerminalJobData *data = job; |   Channel *data = job; | ||||||
|   on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); |   on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, | static void on_job_output(Stream *stream, Channel *data, RBuffer *buf, | ||||||
|                           size_t count, bool eof, Callback *callback, |                           size_t count, bool eof, Callback *callback, | ||||||
|                           const char *type) |                           const char *type) | ||||||
| { | { | ||||||
| @@ -22604,14 +22528,14 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, | |||||||
|  |  | ||||||
| static void eval_job_process_exit_cb(Process *proc, int status, void *d) | static void eval_job_process_exit_cb(Process *proc, int status, void *d) | ||||||
| { | { | ||||||
|   TerminalJobData *data = d; |   Channel *data = d; | ||||||
|   if (data->term && !data->exited) { |   if (data->term && !data->stream.proc.exited) { | ||||||
|     data->exited = true; |     data->stream.proc.exited = true; | ||||||
|     char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; |     char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; | ||||||
|     snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); |     snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); | ||||||
|     terminal_close(data->term, msg); |     terminal_close(data->term, msg); | ||||||
|   } |   } | ||||||
|   if (data->rpc) { |   if (data->is_rpc) { | ||||||
|     channel_process_exit(data->id, status); |     channel_process_exit(data->id, status); | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -22621,58 +22545,51 @@ static void eval_job_process_exit_cb(Process *proc, int status, void *d) | |||||||
|  |  | ||||||
|   process_job_event(data, &data->on_exit, "exit", NULL, 0, status); |   process_job_event(data, &data->on_exit, "exit", NULL, 0, status); | ||||||
|  |  | ||||||
|   term_job_data_decref(data); |   channel_decref(data); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void term_write(char *buf, size_t size, void *d) | static void term_write(char *buf, size_t size, void *d) | ||||||
| { | { | ||||||
|   TerminalJobData *job = d; |   Channel *job = d; | ||||||
|   if (job->in.closed) { |   if (job->stream.proc.in.closed) { | ||||||
|     // If the backing stream was closed abruptly, there may be write events |     // If the backing stream was closed abruptly, there may be write events | ||||||
|     // ahead of the terminal close event. Just ignore the writes. |     // ahead of the terminal close event. Just ignore the writes. | ||||||
|     ILOG("write failed: stream is closed"); |     ILOG("write failed: stream is closed"); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|   WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); |   WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); | ||||||
|   wstream_write(&job->in, wbuf); |   wstream_write(&job->stream.proc.in, wbuf); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void term_resize(uint16_t width, uint16_t height, void *d) | static void term_resize(uint16_t width, uint16_t height, void *d) | ||||||
| { | { | ||||||
|   TerminalJobData *data = d; |   Channel *data = d; | ||||||
|   pty_process_resize(&data->proc.pty, width, height); |   pty_process_resize(&data->stream.pty, width, height); | ||||||
| } | } | ||||||
|  |  | ||||||
| static inline void term_delayed_free(void **argv) | static inline void term_delayed_free(void **argv) | ||||||
| { | { | ||||||
|   TerminalJobData *j = argv[0]; |   Channel *j = argv[0]; | ||||||
|   if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) { |   if (j->stream.proc.in.pending_reqs || j->stream.proc.out.pending_reqs) { | ||||||
|     multiqueue_put(j->events, term_delayed_free, 1, j); |     multiqueue_put(j->events, term_delayed_free, 1, j); | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   terminal_destroy(j->term); |   terminal_destroy(j->term); | ||||||
|   term_job_data_decref(j); |   channel_decref(j); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void term_close(void *d) | static void term_close(void *d) | ||||||
| { | { | ||||||
|   TerminalJobData *data = d; |   Channel *data = d; | ||||||
|   if (!data->exited) { |   if (!data->stream.proc.exited) { | ||||||
|     data->exited = true; |     data->stream.proc.exited = true; | ||||||
|     process_stop((Process *)&data->proc); |     process_stop((Process *)&data->stream.proc); | ||||||
|   } |   } | ||||||
|   multiqueue_put(data->events, term_delayed_free, 1, data); |   multiqueue_put(data->events, term_delayed_free, 1, data); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void term_job_data_decref(TerminalJobData *data) | static void on_job_event(ChannelEvent *ev) | ||||||
| { |  | ||||||
|   if (!(--data->refcount)) { |  | ||||||
|     free_term_job_data(data); |  | ||||||
|   } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| static void on_job_event(JobEvent *ev) |  | ||||||
| { | { | ||||||
|   if (!ev->callback) { |   if (!ev->callback) { | ||||||
|     return; |     return; | ||||||
| @@ -22704,15 +22621,24 @@ static void on_job_event(JobEvent *ev) | |||||||
|   tv_clear(&rettv); |   tv_clear(&rettv); | ||||||
| } | } | ||||||
|  |  | ||||||
| static TerminalJobData *find_job(uint64_t id) | static Channel *find_job(uint64_t id, bool show_error) | ||||||
| { | { | ||||||
|   TerminalJobData *data = pmap_get(uint64_t)(jobs, id); |   Channel *data = find_channel(id); | ||||||
|   if (!data || data->stopped) { |   if (!data || data->streamtype != kChannelStreamProc | ||||||
|  |       || process_is_stopped(&data->stream.proc)) { | ||||||
|  |     if (show_error) { | ||||||
|  |       if (data && data->streamtype != kChannelStreamProc) { | ||||||
|  |         EMSG(_(e_invchanjob)); | ||||||
|  |       } else { | ||||||
|  |         EMSG(_(e_invchan)); | ||||||
|  |       } | ||||||
|  |     } | ||||||
|     return NULL; |     return NULL; | ||||||
|   } |   } | ||||||
|   return data; |   return data; | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) | static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) | ||||||
| { | { | ||||||
|   if (check_restricted() || check_secure()) { |   if (check_restricted() || check_secure()) { | ||||||
|   | |||||||
| @@ -847,6 +847,30 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2) | |||||||
|   return false; |   return false; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /// Unref/free callback | ||||||
|  | void callback_free(Callback *const callback) | ||||||
|  |   FUNC_ATTR_NONNULL_ALL | ||||||
|  | { | ||||||
|  |   switch (callback->type) { | ||||||
|  |     case kCallbackFuncref: { | ||||||
|  |       func_unref(callback->data.funcref); | ||||||
|  |       xfree(callback->data.funcref); | ||||||
|  |       break; | ||||||
|  |     } | ||||||
|  |     case kCallbackPartial: { | ||||||
|  |       partial_unref(callback->data.partial); | ||||||
|  |       break; | ||||||
|  |     } | ||||||
|  |     case kCallbackNone: { | ||||||
|  |       break; | ||||||
|  |     } | ||||||
|  |     default: { | ||||||
|  |       abort(); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   callback->type = kCallbackNone; | ||||||
|  | } | ||||||
|  |  | ||||||
| /// Remove watcher from a dictionary | /// Remove watcher from a dictionary | ||||||
| /// | /// | ||||||
| /// @param  dict  Dictionary to remove watcher from. | /// @param  dict  Dictionary to remove watcher from. | ||||||
|   | |||||||
| @@ -46,22 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc) | |||||||
|   uvproc->uvstdio[2].flags = UV_IGNORE; |   uvproc->uvstdio[2].flags = UV_IGNORE; | ||||||
|   uvproc->uv.data = proc; |   uvproc->uv.data = proc; | ||||||
|  |  | ||||||
|   if (proc->in) { |   if (!proc->in.closed) { | ||||||
|     uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; |     uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; | ||||||
|     uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t, |     uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t, | ||||||
|                                                  &proc->in->uv.pipe); |                                                  &proc->in.uv.pipe); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->out) { |   if (!proc->out.closed) { | ||||||
|     uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; |     uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; | ||||||
|     uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t, |     uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t, | ||||||
|                                                  &proc->out->uv.pipe); |                                                  &proc->out.uv.pipe); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->err) { |   if (!proc->err.closed) { | ||||||
|     uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; |     uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; | ||||||
|     uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t, |     uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t, | ||||||
|                                                  &proc->err->uv.pipe); |                                                  &proc->err.uv.pipe); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   int status; |   int status; | ||||||
|   | |||||||
| @@ -27,26 +27,33 @@ | |||||||
|  |  | ||||||
| #define CLOSE_PROC_STREAM(proc, stream) \ | #define CLOSE_PROC_STREAM(proc, stream) \ | ||||||
|   do { \ |   do { \ | ||||||
|     if (proc->stream && !proc->stream->closed) { \ |     if (!proc->stream.closed) { \ | ||||||
|       stream_close(proc->stream, NULL, NULL); \ |       stream_close(&proc->stream, NULL, NULL); \ | ||||||
|     } \ |     } \ | ||||||
|   } while (0) |   } while (0) | ||||||
|  |  | ||||||
| static bool process_is_tearing_down = false; | static bool process_is_tearing_down = false; | ||||||
|  |  | ||||||
| /// @returns zero on success, or negative error code | /// @returns zero on success, or negative error code | ||||||
| int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL | int process_spawn(Process *proc, bool in, bool out, bool err) | ||||||
|  |   FUNC_ATTR_NONNULL_ALL | ||||||
| { | { | ||||||
|   if (proc->in) { |   if (in) { | ||||||
|     uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0); |     uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0); | ||||||
|  |   } else { | ||||||
|  |     proc->in.closed = true; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->out) { |   if (out) { | ||||||
|     uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0); |     uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); | ||||||
|  |   } else { | ||||||
|  |     proc->out.closed = true; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->err) { |   if (err) { | ||||||
|     uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0); |     uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); | ||||||
|  |   } else { | ||||||
|  |     proc->err.closed = true; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   int status; |   int status; | ||||||
| @@ -62,14 +69,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (status) { |   if (status) { | ||||||
|     if (proc->in) { |     if (in) { | ||||||
|       uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL); |       uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); | ||||||
|     } |     } | ||||||
|     if (proc->out) { |     if (out) { | ||||||
|       uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL); |       uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); | ||||||
|     } |     } | ||||||
|     if (proc->err) { |     if (err) { | ||||||
|       uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL); |       uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     if (proc->type == kProcessTypeUv) { |     if (proc->type == kProcessTypeUv) { | ||||||
| @@ -82,30 +89,30 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL | |||||||
|     return status; |     return status; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->in) { |   if (in) { | ||||||
|     stream_init(NULL, proc->in, -1, |     stream_init(NULL, &proc->in, -1, | ||||||
|                 STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe)); |                 STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe)); | ||||||
|     proc->in->events = proc->events; |     proc->in.events = proc->events; | ||||||
|     proc->in->internal_data = proc; |     proc->in.internal_data = proc; | ||||||
|     proc->in->internal_close_cb = on_process_stream_close; |     proc->in.internal_close_cb = on_process_stream_close; | ||||||
|     proc->refcount++; |     proc->refcount++; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->out) { |   if (out) { | ||||||
|     stream_init(NULL, proc->out, -1, |     stream_init(NULL, &proc->out, -1, | ||||||
|                 STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe)); |                 STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe)); | ||||||
|     proc->out->events = proc->events; |     proc->out.events = proc->events; | ||||||
|     proc->out->internal_data = proc; |     proc->out.internal_data = proc; | ||||||
|     proc->out->internal_close_cb = on_process_stream_close; |     proc->out.internal_close_cb = on_process_stream_close; | ||||||
|     proc->refcount++; |     proc->refcount++; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->err) { |   if (err) { | ||||||
|     stream_init(NULL, proc->err, -1, |     stream_init(NULL, &proc->err, -1, | ||||||
|                 STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe)); |                 STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe)); | ||||||
|     proc->err->events = proc->events; |     proc->err.events = proc->events; | ||||||
|     proc->err->internal_data = proc; |     proc->err.internal_data = proc; | ||||||
|     proc->err->internal_close_cb = on_process_stream_close; |     proc->err.internal_close_cb = on_process_stream_close; | ||||||
|     proc->refcount++; |     proc->refcount++; | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -395,8 +402,8 @@ static void process_close_handles(void **argv) | |||||||
| { | { | ||||||
|   Process *proc = argv[0]; |   Process *proc = argv[0]; | ||||||
|  |  | ||||||
|   flush_stream(proc, proc->out); |   flush_stream(proc, &proc->out); | ||||||
|   flush_stream(proc, proc->err); |   flush_stream(proc, &proc->err); | ||||||
|  |  | ||||||
|   process_close_streams(proc); |   process_close_streams(proc); | ||||||
|   process_close(proc); |   process_close(proc); | ||||||
|   | |||||||
| @@ -23,13 +23,15 @@ struct process { | |||||||
|   uint64_t stopped_time; |   uint64_t stopped_time; | ||||||
|   const char *cwd; |   const char *cwd; | ||||||
|   char **argv; |   char **argv; | ||||||
|   Stream *in, *out, *err; |   Stream in, out, err; | ||||||
|   process_exit_cb cb; |   process_exit_cb cb; | ||||||
|   internal_process_cb internal_exit_cb, internal_close_cb; |   internal_process_cb internal_exit_cb, internal_close_cb; | ||||||
|  |   bool exited; // TODO: redundant | ||||||
|   bool closed, detach; |   bool closed, detach; | ||||||
|   MultiQueue *events; |   MultiQueue *events; | ||||||
| }; | }; | ||||||
|  |  | ||||||
|  |  | ||||||
| static inline Process process_init(Loop *loop, ProcessType type, void *data) | static inline Process process_init(Loop *loop, ProcessType type, void *data) | ||||||
| { | { | ||||||
|   return (Process) { |   return (Process) { | ||||||
| @@ -43,9 +45,9 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) | |||||||
|     .stopped_time = 0, |     .stopped_time = 0, | ||||||
|     .cwd = NULL, |     .cwd = NULL, | ||||||
|     .argv = NULL, |     .argv = NULL, | ||||||
|     .in = NULL, |     .in = { .closed = false }, | ||||||
|     .out = NULL, |     .out = { .closed = false }, | ||||||
|     .err = NULL, |     .err = { .closed = false }, | ||||||
|     .cb = NULL, |     .cb = NULL, | ||||||
|     .closed = false, |     .closed = false, | ||||||
|     .internal_close_cb = NULL, |     .internal_close_cb = NULL, | ||||||
| @@ -54,6 +56,11 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) | |||||||
|   }; |   }; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static inline bool process_is_stopped(Process *proc) | ||||||
|  | { | ||||||
|  |   return proc->stopped_time != 0; | ||||||
|  | } | ||||||
|  |  | ||||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||||
| # include "event/process.h.generated.h" | # include "event/process.h.generated.h" | ||||||
| #endif | #endif | ||||||
|   | |||||||
| @@ -33,6 +33,7 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status); | |||||||
| typedef void (*stream_close_cb)(Stream *stream, void *data); | typedef void (*stream_close_cb)(Stream *stream, void *data); | ||||||
|  |  | ||||||
| struct stream { | struct stream { | ||||||
|  |   bool closed; | ||||||
|   union { |   union { | ||||||
|     uv_pipe_t pipe; |     uv_pipe_t pipe; | ||||||
|     uv_tcp_t tcp; |     uv_tcp_t tcp; | ||||||
| @@ -52,7 +53,6 @@ struct stream { | |||||||
|   size_t maxmem; |   size_t maxmem; | ||||||
|   size_t pending_reqs; |   size_t pending_reqs; | ||||||
|   size_t num_bytes; |   size_t num_bytes; | ||||||
|   bool closed; |  | ||||||
|   MultiQueue *events; |   MultiQueue *events; | ||||||
| }; | }; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1074,11 +1074,12 @@ EXTERN char_u e_invexpr2[] INIT(= N_("E15: Invalid expression: %s")); | |||||||
| EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range")); | EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range")); | ||||||
| EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command")); | EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command")); | ||||||
| EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory")); | EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory")); | ||||||
| EXTERN char_u e_invjob[] INIT(= N_("E900: Invalid job id")); | EXTERN char_u e_invchan[] INIT(= N_("E900: Invalid channel id")); | ||||||
|  | EXTERN char_u e_invchanjob[] INIT(= N_("E900: Invalid channel id: not a job")); | ||||||
| EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full")); | EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full")); | ||||||
| EXTERN char_u e_jobspawn[] INIT(= N_( | EXTERN char_u e_jobspawn[] INIT(= N_( | ||||||
|         "E903: Process failed to start: %s: \"%s\"")); |     "E903: Process failed to start: %s: \"%s\"")); | ||||||
| EXTERN char_u e_jobnotpty[] INIT(= N_("E904: Job is not connected to a pty")); | EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty")); | ||||||
| EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\"")); | EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\"")); | ||||||
| EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s")); | EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s")); | ||||||
| EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number")); | EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number")); | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ | |||||||
| #include "nvim/api/private/helpers.h" | #include "nvim/api/private/helpers.h" | ||||||
| #include "nvim/api/vim.h" | #include "nvim/api/vim.h" | ||||||
| #include "nvim/api/ui.h" | #include "nvim/api/ui.h" | ||||||
|  | #include "nvim/channel.h" | ||||||
| #include "nvim/msgpack_rpc/channel.h" | #include "nvim/msgpack_rpc/channel.h" | ||||||
| #include "nvim/msgpack_rpc/server.h" | #include "nvim/msgpack_rpc/server.h" | ||||||
| #include "nvim/event/loop.h" | #include "nvim/event/loop.h" | ||||||
| @@ -40,47 +41,6 @@ | |||||||
| #define log_server_msg(...) | #define log_server_msg(...) | ||||||
| #endif | #endif | ||||||
|  |  | ||||||
| typedef enum { |  | ||||||
|   kChannelTypeSocket, |  | ||||||
|   kChannelTypeProc, |  | ||||||
|   kChannelTypeStdio, |  | ||||||
|   kChannelTypeInternal |  | ||||||
| } ChannelType; |  | ||||||
|  |  | ||||||
| typedef struct { |  | ||||||
|   uint64_t request_id; |  | ||||||
|   bool returned, errored; |  | ||||||
|   Object result; |  | ||||||
| } ChannelCallFrame; |  | ||||||
|  |  | ||||||
| typedef struct { |  | ||||||
|   uint64_t id; |  | ||||||
|   size_t refcount; |  | ||||||
|   PMap(cstr_t) *subscribed_events; |  | ||||||
|   bool closed; |  | ||||||
|   ChannelType type; |  | ||||||
|   msgpack_unpacker *unpacker; |  | ||||||
|   union { |  | ||||||
|     Stream stream;  // bidirectional (socket) |  | ||||||
|     Process *proc; |  | ||||||
|     struct { |  | ||||||
|       Stream in; |  | ||||||
|       Stream out; |  | ||||||
|     } std; |  | ||||||
|   } data; |  | ||||||
|   uint64_t next_request_id; |  | ||||||
|   kvec_t(ChannelCallFrame *) call_stack; |  | ||||||
|   MultiQueue *events; |  | ||||||
| } Channel; |  | ||||||
|  |  | ||||||
| typedef struct { |  | ||||||
|   Channel *channel; |  | ||||||
|   MsgpackRpcRequestHandler handler; |  | ||||||
|   Array args; |  | ||||||
|   uint64_t request_id; |  | ||||||
| } RequestEvent; |  | ||||||
|  |  | ||||||
| static PMap(uint64_t) *channels = NULL; |  | ||||||
| static PMap(cstr_t) *event_strings = NULL; | static PMap(cstr_t) *event_strings = NULL; | ||||||
| static msgpack_sbuffer out_buffer; | static msgpack_sbuffer out_buffer; | ||||||
|  |  | ||||||
| @@ -88,50 +48,32 @@ static msgpack_sbuffer out_buffer; | |||||||
| # include "msgpack_rpc/channel.c.generated.h" | # include "msgpack_rpc/channel.c.generated.h" | ||||||
| #endif | #endif | ||||||
|  |  | ||||||
| /// Initializes the module | void rpc_init(void) | ||||||
| void channel_init(void) |  | ||||||
| { | { | ||||||
|   ch_before_blocking_events = multiqueue_new_child(main_loop.events); |   ch_before_blocking_events = multiqueue_new_child(main_loop.events); | ||||||
|   channels = pmap_new(uint64_t)(); |  | ||||||
|   event_strings = pmap_new(cstr_t)(); |   event_strings = pmap_new(cstr_t)(); | ||||||
|   msgpack_sbuffer_init(&out_buffer); |   msgpack_sbuffer_init(&out_buffer); | ||||||
|   remote_ui_init(); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Teardown the module |  | ||||||
| void channel_teardown(void) | void rpc_start(Channel *channel) | ||||||
| { | { | ||||||
|   if (!channels) { |   channel->is_rpc = true; | ||||||
|     return; |   RpcState *rpc = &channel->rpc; | ||||||
|   } |   rpc->closed = false; | ||||||
|  |   rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); | ||||||
|  |   rpc->subscribed_events = pmap_new(cstr_t)(); | ||||||
|  |   rpc->next_request_id = 1; | ||||||
|  |   kv_init(rpc->call_stack); | ||||||
|  |  | ||||||
|   Channel *channel; |   Stream *in = channel_instream(channel); | ||||||
|  |   Stream *out = channel_outstream(channel); | ||||||
|  |  | ||||||
|   map_foreach_value(channels, channel, { |   DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); | ||||||
|     close_channel(channel); |  | ||||||
|   }); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| /// Creates an API channel by starting a process and connecting to its |   wstream_init(in, 0); | ||||||
| /// stdin/stdout. stderr is handled by the job infrastructure. |   rstream_init(out, CHANNEL_BUFFER_SIZE); | ||||||
| /// |   rstream_start(out, receive_msgpack, channel); | ||||||
| /// @param argv The argument vector for the process. [consumed] |  | ||||||
| /// @return The channel id (> 0), on success. |  | ||||||
| ///         0, on error. |  | ||||||
| uint64_t channel_from_process(Process *proc, uint64_t id) |  | ||||||
| { |  | ||||||
|   Channel *channel = register_channel(kChannelTypeProc, id, proc->events); |  | ||||||
|   incref(channel);  // process channels are only closed by the exit_cb |  | ||||||
|   channel->data.proc = proc; |  | ||||||
|  |  | ||||||
|   wstream_init(proc->in, 0); |  | ||||||
|   rstream_init(proc->out, 0); |  | ||||||
|   rstream_start(proc->out, receive_msgpack, channel); |  | ||||||
|  |  | ||||||
|   DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, |  | ||||||
|        proc->out); |  | ||||||
|  |  | ||||||
|   return channel->id; |  | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Creates an API channel from a tcp/pipe socket connection | /// Creates an API channel from a tcp/pipe socket connection | ||||||
| @@ -139,19 +81,15 @@ uint64_t channel_from_process(Process *proc, uint64_t id) | |||||||
| /// @param watcher The SocketWatcher ready to accept the connection | /// @param watcher The SocketWatcher ready to accept the connection | ||||||
| void channel_from_connection(SocketWatcher *watcher) | void channel_from_connection(SocketWatcher *watcher) | ||||||
| { | { | ||||||
|   Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); |   Channel *channel = channel_alloc(kChannelStreamSocket); | ||||||
|   socket_watcher_accept(watcher, &channel->data.stream); |   socket_watcher_accept(watcher, &channel->stream.socket); | ||||||
|   incref(channel);  // close channel only after the stream is closed |   channel_incref(channel);  // close channel only after the stream is closed | ||||||
|   channel->data.stream.internal_close_cb = close_cb; |   channel->stream.socket.internal_close_cb = close_cb; | ||||||
|   channel->data.stream.internal_data = channel; |   channel->stream.socket.internal_data = channel; | ||||||
|   wstream_init(&channel->data.stream, 0); |   rpc_start(channel); | ||||||
|   rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); |  | ||||||
|   rstream_start(&channel->data.stream, receive_msgpack, channel); |  | ||||||
|  |  | ||||||
|   DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, |  | ||||||
|        &channel->data.stream); |  | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /// TODO: move to eval.c, also support bytes | ||||||
| uint64_t channel_connect(bool tcp, const char *address, | uint64_t channel_connect(bool tcp, const char *address, | ||||||
|                          int timeout, const char **error) |                          int timeout, const char **error) | ||||||
| { | { | ||||||
| @@ -165,34 +103,40 @@ uint64_t channel_connect(bool tcp, const char *address, | |||||||
|     xfree(path); |     xfree(path); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); |   Channel *channel = channel_alloc(kChannelStreamSocket); | ||||||
|   if (!socket_connect(&main_loop, &channel->data.stream, |   if (!socket_connect(&main_loop, &channel->stream.socket, | ||||||
|                       tcp, address, timeout, error)) { |                       tcp, address, timeout, error)) { | ||||||
|     decref(channel); |     channel_decref(channel); | ||||||
|     return 0; |     return 0; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   incref(channel);  // close channel only after the stream is closed |   channel_incref(channel);  // close channel only after the stream is closed | ||||||
|   channel->data.stream.internal_close_cb = close_cb; |   channel->stream.socket.internal_close_cb = close_cb; | ||||||
|   channel->data.stream.internal_data = channel; |   channel->stream.socket.internal_data = channel; | ||||||
|   wstream_init(&channel->data.stream, 0); |   rpc_start(channel); | ||||||
|   rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); |  | ||||||
|   rstream_start(&channel->data.stream, receive_msgpack, channel); |  | ||||||
|   return channel->id; |   return channel->id; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | static Channel *find_rpc_channel(uint64_t id) | ||||||
|  | { | ||||||
|  |   Channel *chan = find_channel(id); | ||||||
|  |   if (!chan || !chan->is_rpc || chan->rpc.closed) { | ||||||
|  |     return NULL; | ||||||
|  |   } | ||||||
|  |   return chan; | ||||||
|  | } | ||||||
|  |  | ||||||
| /// Publishes an event to a channel. | /// Publishes an event to a channel. | ||||||
| /// | /// | ||||||
| /// @param id Channel id. 0 means "broadcast to all subscribed channels" | /// @param id Channel id. 0 means "broadcast to all subscribed channels" | ||||||
| /// @param name Event name (application-defined) | /// @param name Event name (application-defined) | ||||||
| /// @param args Array of event arguments | /// @param args Array of event arguments | ||||||
| /// @return True if the event was sent successfully, false otherwise. | /// @return True if the event was sent successfully, false otherwise. | ||||||
| bool channel_send_event(uint64_t id, const char *name, Array args) | bool rpc_send_event(uint64_t id, const char *name, Array args) | ||||||
| { | { | ||||||
|   Channel *channel = NULL; |   Channel *channel = NULL; | ||||||
|  |  | ||||||
|   if (id && (!(channel = pmap_get(uint64_t)(channels, id)) |   if (id && (!(channel = find_rpc_channel(id)))) { | ||||||
|             || channel->closed)) { |  | ||||||
|     api_free_array(args); |     api_free_array(args); | ||||||
|     return false; |     return false; | ||||||
|   } |   } | ||||||
| @@ -213,29 +157,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args) | |||||||
| /// @param args Array with method arguments | /// @param args Array with method arguments | ||||||
| /// @param[out] error True if the return value is an error | /// @param[out] error True if the return value is an error | ||||||
| /// @return Whatever the remote method returned | /// @return Whatever the remote method returned | ||||||
| Object channel_send_call(uint64_t id, | Object rpc_send_call(uint64_t id, | ||||||
|                          const char *method_name, |                      const char *method_name, | ||||||
|                          Array args, |                      Array args, | ||||||
|                          Error *err) |                      Error *err) | ||||||
| { | { | ||||||
|   Channel *channel = NULL; |   Channel *channel = NULL; | ||||||
|  |  | ||||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { |   if (!(channel = find_rpc_channel(id))) { | ||||||
|     api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); |     api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); | ||||||
|     api_free_array(args); |     api_free_array(args); | ||||||
|     return NIL; |     return NIL; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   incref(channel); |   channel_incref(channel); | ||||||
|   uint64_t request_id = channel->next_request_id++; |   RpcState *rpc = &channel->rpc; | ||||||
|  |   uint64_t request_id = rpc->next_request_id++; | ||||||
|   // Send the msgpack-rpc request |   // Send the msgpack-rpc request | ||||||
|   send_request(channel, request_id, method_name, args); |   send_request(channel, request_id, method_name, args); | ||||||
|  |  | ||||||
|   // Push the frame |   // Push the frame | ||||||
|   ChannelCallFrame frame = { request_id, false, false, NIL }; |   ChannelCallFrame frame = { request_id, false, false, NIL }; | ||||||
|   kv_push(channel->call_stack, &frame); |   kv_push(rpc->call_stack, &frame); | ||||||
|   LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); |   LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); | ||||||
|   (void)kv_pop(channel->call_stack); |   (void)kv_pop(rpc->call_stack); | ||||||
|  |  | ||||||
|   if (frame.errored) { |   if (frame.errored) { | ||||||
|     if (frame.result.type == kObjectTypeString) { |     if (frame.result.type == kObjectTypeString) { | ||||||
| @@ -260,7 +205,7 @@ Object channel_send_call(uint64_t id, | |||||||
|     api_free_object(frame.result); |     api_free_object(frame.result); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   decref(channel); |   channel_decref(channel); | ||||||
|  |  | ||||||
|   return frame.errored ? NIL : frame.result; |   return frame.errored ? NIL : frame.result; | ||||||
| } | } | ||||||
| @@ -269,11 +214,11 @@ Object channel_send_call(uint64_t id, | |||||||
| /// | /// | ||||||
| /// @param id The channel id | /// @param id The channel id | ||||||
| /// @param event The event type string | /// @param event The event type string | ||||||
| void channel_subscribe(uint64_t id, char *event) | void rpc_subscribe(uint64_t id, char *event) | ||||||
| { | { | ||||||
|   Channel *channel; |   Channel *channel; | ||||||
|  |  | ||||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { |   if (!(channel = find_rpc_channel(id))) { | ||||||
|     abort(); |     abort(); | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -284,18 +229,18 @@ void channel_subscribe(uint64_t id, char *event) | |||||||
|     pmap_put(cstr_t)(event_strings, event_string, event_string); |     pmap_put(cstr_t)(event_strings, event_string, event_string); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); |   pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Unsubscribes to event broadcasts | /// Unsubscribes to event broadcasts | ||||||
| /// | /// | ||||||
| /// @param id The channel id | /// @param id The channel id | ||||||
| /// @param event The event type string | /// @param event The event type string | ||||||
| void channel_unsubscribe(uint64_t id, char *event) | void rpc_unsubscribe(uint64_t id, char *event) | ||||||
| { | { | ||||||
|   Channel *channel; |   Channel *channel; | ||||||
|  |  | ||||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { |   if (!(channel = find_rpc_channel(id))) { | ||||||
|     abort(); |     abort(); | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -310,7 +255,7 @@ bool channel_close(uint64_t id) | |||||||
| { | { | ||||||
|   Channel *channel; |   Channel *channel; | ||||||
|  |  | ||||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { |   if (!(channel = find_rpc_channel(id))) { | ||||||
|     return false; |     return false; | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -322,24 +267,22 @@ bool channel_close(uint64_t id) | |||||||
| /// Neovim | /// Neovim | ||||||
| void channel_from_stdio(void) | void channel_from_stdio(void) | ||||||
| { | { | ||||||
|   Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); |   Channel *channel = channel_alloc(kChannelStreamStdio); | ||||||
|   incref(channel);  // stdio channels are only closed on exit |   channel_incref(channel);  // stdio channels are only closed on exit | ||||||
|   // read stream |   // read stream | ||||||
|   rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); |   rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); | ||||||
|   rstream_start(&channel->data.std.in, receive_msgpack, channel); |   wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); | ||||||
|   // write stream |  | ||||||
|   wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); |  | ||||||
|  |  | ||||||
|   DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, |   rpc_start(channel); | ||||||
|        &channel->data.std.in, &channel->data.std.out); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Creates a loopback channel. This is used to avoid deadlock | /// Creates a loopback channel. This is used to avoid deadlock | ||||||
| /// when an instance connects to its own named pipe. | /// when an instance connects to its own named pipe. | ||||||
| uint64_t channel_create_internal(void) | uint64_t channel_create_internal(void) | ||||||
| { | { | ||||||
|   Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); |   Channel *channel = channel_alloc(kChannelStreamInternal); | ||||||
|   incref(channel);  // internal channel lives until process exit |   channel_incref(channel);  // internal channel lives until process exit | ||||||
|  |   rpc_start(channel); | ||||||
|   return channel->id; |   return channel->id; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -347,8 +290,8 @@ void channel_process_exit(uint64_t id, int status) | |||||||
| { | { | ||||||
|   Channel *channel = pmap_get(uint64_t)(channels, id); |   Channel *channel = pmap_get(uint64_t)(channels, id); | ||||||
|  |  | ||||||
|   channel->closed = true; |   // channel_decref(channel); remove?? | ||||||
|   decref(channel); |   channel->rpc.closed = true; | ||||||
| } | } | ||||||
|  |  | ||||||
| // rstream.c:read_event() invokes this as stream->read_cb(). | // rstream.c:read_event() invokes this as stream->read_cb(). | ||||||
| @@ -356,7 +299,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, | |||||||
|                             void *data, bool eof) |                             void *data, bool eof) | ||||||
| { | { | ||||||
|   Channel *channel = data; |   Channel *channel = data; | ||||||
|   incref(channel); |   channel_incref(channel); | ||||||
|  |  | ||||||
|   if (eof) { |   if (eof) { | ||||||
|     close_channel(channel); |     close_channel(channel); | ||||||
| @@ -367,30 +310,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, | |||||||
|     goto end; |     goto end; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) |  | ||||||
|       || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { |  | ||||||
|     char buf[256]; |  | ||||||
|     snprintf(buf, sizeof(buf), |  | ||||||
|              "ch %" PRIu64 ": stream closed unexpectedly. " |  | ||||||
|              "closing channel", |  | ||||||
|              channel->id); |  | ||||||
|     call_set_error(channel, buf, WARN_LOG_LEVEL); |  | ||||||
|     goto end; |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   size_t count = rbuffer_size(rbuf); |   size_t count = rbuffer_size(rbuf); | ||||||
|   DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", |   DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", | ||||||
|        channel->id, count, stream); |        channel->id, count, stream); | ||||||
|  |  | ||||||
|   // Feed the unpacker with data |   // Feed the unpacker with data | ||||||
|   msgpack_unpacker_reserve_buffer(channel->unpacker, count); |   msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); | ||||||
|   rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); |   rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); | ||||||
|   msgpack_unpacker_buffer_consumed(channel->unpacker, count); |   msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); | ||||||
|  |  | ||||||
|   parse_msgpack(channel); |   parse_msgpack(channel); | ||||||
|  |  | ||||||
| end: | end: | ||||||
|   decref(channel); |   channel_decref(channel); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void parse_msgpack(Channel *channel) | static void parse_msgpack(Channel *channel) | ||||||
| @@ -400,8 +332,8 @@ static void parse_msgpack(Channel *channel) | |||||||
|   msgpack_unpack_return result; |   msgpack_unpack_return result; | ||||||
|  |  | ||||||
|   // Deserialize everything we can. |   // Deserialize everything we can. | ||||||
|   while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == |   while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == | ||||||
|       MSGPACK_UNPACK_SUCCESS) { |          MSGPACK_UNPACK_SUCCESS) { | ||||||
|     bool is_response = is_rpc_response(&unpacked.data); |     bool is_response = is_rpc_response(&unpacked.data); | ||||||
|     log_client_msg(channel->id, !is_response, unpacked.data); |     log_client_msg(channel->id, !is_response, unpacked.data); | ||||||
|  |  | ||||||
| @@ -427,7 +359,7 @@ static void parse_msgpack(Channel *channel) | |||||||
|   if (result == MSGPACK_UNPACK_NOMEM_ERROR) { |   if (result == MSGPACK_UNPACK_NOMEM_ERROR) { | ||||||
|     mch_errmsg(e_outofmem); |     mch_errmsg(e_outofmem); | ||||||
|     mch_errmsg("\n"); |     mch_errmsg("\n"); | ||||||
|     decref(channel); |     channel_decref(channel); | ||||||
|     preserve_exit(); |     preserve_exit(); | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @@ -492,7 +424,7 @@ static void handle_request(Channel *channel, msgpack_object *request) | |||||||
|   evdata->handler = handler; |   evdata->handler = handler; | ||||||
|   evdata->args = args; |   evdata->args = args; | ||||||
|   evdata->request_id = request_id; |   evdata->request_id = request_id; | ||||||
|   incref(channel); |   channel_incref(channel); | ||||||
|   if (handler.async) { |   if (handler.async) { | ||||||
|     bool is_get_mode = handler.fn == handle_nvim_get_mode; |     bool is_get_mode = handler.fn == handle_nvim_get_mode; | ||||||
|  |  | ||||||
| @@ -530,66 +462,30 @@ static void on_request_event(void **argv) | |||||||
|     api_free_object(result); |     api_free_object(result); | ||||||
|   } |   } | ||||||
|   api_free_array(args); |   api_free_array(args); | ||||||
|   decref(channel); |   channel_decref(channel); | ||||||
|   xfree(e); |   xfree(e); | ||||||
|   api_clear_error(&error); |   api_clear_error(&error); | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Returns the Stream that a Channel writes to. |  | ||||||
| static Stream *chan_wstream(Channel *chan) |  | ||||||
| { |  | ||||||
|   switch (chan->type) { |  | ||||||
|     case kChannelTypeSocket: |  | ||||||
|       return &chan->data.stream; |  | ||||||
|     case kChannelTypeProc: |  | ||||||
|       return chan->data.proc->in; |  | ||||||
|     case kChannelTypeStdio: |  | ||||||
|       return &chan->data.std.out; |  | ||||||
|     case kChannelTypeInternal: |  | ||||||
|       return NULL; |  | ||||||
|   } |  | ||||||
|   abort(); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| /// Returns the Stream that a Channel reads from. |  | ||||||
| static Stream *chan_rstream(Channel *chan) |  | ||||||
| { |  | ||||||
|   switch (chan->type) { |  | ||||||
|     case kChannelTypeSocket: |  | ||||||
|       return &chan->data.stream; |  | ||||||
|     case kChannelTypeProc: |  | ||||||
|       return chan->data.proc->out; |  | ||||||
|     case kChannelTypeStdio: |  | ||||||
|       return &chan->data.std.in; |  | ||||||
|     case kChannelTypeInternal: |  | ||||||
|       return NULL; |  | ||||||
|   } |  | ||||||
|   abort(); |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| static bool channel_write(Channel *channel, WBuffer *buffer) | static bool channel_write(Channel *channel, WBuffer *buffer) | ||||||
| { | { | ||||||
|   bool success = false; |   bool success; | ||||||
|  |  | ||||||
|   if (channel->closed) { |   if (channel->rpc.closed) { | ||||||
|     wstream_release_wbuffer(buffer); |     wstream_release_wbuffer(buffer); | ||||||
|     return false; |     return false; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   switch (channel->type) { |   if (channel->streamtype == kChannelStreamInternal) { | ||||||
|     case kChannelTypeSocket: |     channel_incref(channel); | ||||||
|     case kChannelTypeProc: |     CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); | ||||||
|     case kChannelTypeStdio: |     success = true; | ||||||
|       success = wstream_write(chan_wstream(channel), buffer); |   } else { | ||||||
|       break; |     Stream *in = channel_instream(channel); | ||||||
|     case kChannelTypeInternal: |     success = wstream_write(in, buffer); | ||||||
|       incref(channel); |  | ||||||
|       CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); |  | ||||||
|       success = true; |  | ||||||
|       break; |  | ||||||
|   } |   } | ||||||
|  |  | ||||||
|  |  | ||||||
|   if (!success) { |   if (!success) { | ||||||
|     // If the write failed for any reason, close the channel |     // If the write failed for any reason, close the channel | ||||||
|     char buf[256]; |     char buf[256]; | ||||||
| @@ -609,14 +505,14 @@ static void internal_read_event(void **argv) | |||||||
|   Channel *channel = argv[0]; |   Channel *channel = argv[0]; | ||||||
|   WBuffer *buffer = argv[1]; |   WBuffer *buffer = argv[1]; | ||||||
|  |  | ||||||
|   msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); |   msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); | ||||||
|   memcpy(msgpack_unpacker_buffer(channel->unpacker), |   memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), | ||||||
|          buffer->data, buffer->size); |          buffer->data, buffer->size); | ||||||
|   msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); |   msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); | ||||||
|  |  | ||||||
|   parse_msgpack(channel); |   parse_msgpack(channel); | ||||||
|  |  | ||||||
|   decref(channel); |   channel_decref(channel); | ||||||
|   wstream_release_wbuffer(buffer); |   wstream_release_wbuffer(buffer); | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -665,7 +561,8 @@ static void broadcast_event(const char *name, Array args) | |||||||
|   Channel *channel; |   Channel *channel; | ||||||
|  |  | ||||||
|   map_foreach_value(channels, channel, { |   map_foreach_value(channels, channel, { | ||||||
|     if (pmap_has(cstr_t)(channel->subscribed_events, name)) { |     if (channel->is_rpc | ||||||
|  |         && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { | ||||||
|       kv_push(subscribed, channel); |       kv_push(subscribed, channel); | ||||||
|     } |     } | ||||||
|   }); |   }); | ||||||
| @@ -695,10 +592,11 @@ end: | |||||||
| static void unsubscribe(Channel *channel, char *event) | static void unsubscribe(Channel *channel, char *event) | ||||||
| { | { | ||||||
|   char *event_string = pmap_get(cstr_t)(event_strings, event); |   char *event_string = pmap_get(cstr_t)(event_strings, event); | ||||||
|   pmap_del(cstr_t)(channel->subscribed_events, event_string); |   pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); | ||||||
|  |  | ||||||
|   map_foreach_value(channels, channel, { |   map_foreach_value(channels, channel, { | ||||||
|     if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { |     if (channel->is_rpc | ||||||
|  |         && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { | ||||||
|       return; |       return; | ||||||
|     } |     } | ||||||
|   }); |   }); | ||||||
| @@ -709,86 +607,65 @@ static void unsubscribe(Channel *channel, char *event) | |||||||
| } | } | ||||||
|  |  | ||||||
| /// Close the channel streams/process and free the channel resources. | /// Close the channel streams/process and free the channel resources. | ||||||
|  | /// TODO: move to channel.h | ||||||
| static void close_channel(Channel *channel) | static void close_channel(Channel *channel) | ||||||
| { | { | ||||||
|   if (channel->closed) { |   if (channel->rpc.closed) { | ||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   channel->closed = true; |   channel->rpc.closed = true; | ||||||
|  |  | ||||||
|   switch (channel->type) { |   switch (channel->streamtype) { | ||||||
|     case kChannelTypeSocket: |     case kChannelStreamSocket: | ||||||
|       stream_close(&channel->data.stream, NULL, NULL); |       stream_close(&channel->stream.socket, NULL, NULL); | ||||||
|       break; |       break; | ||||||
|     case kChannelTypeProc: |     case kChannelStreamProc: | ||||||
|       // Only close the rpc channel part, |       // Only close the rpc channel part, | ||||||
|       // there could be an error message on the stderr stream |       // there could be an error message on the stderr stream | ||||||
|       process_close_in(channel->data.proc); |       process_close_in(&channel->stream.proc); | ||||||
|       process_close_out(channel->data.proc); |       process_close_out(&channel->stream.proc); | ||||||
|       break; |       break; | ||||||
|     case kChannelTypeStdio: |     case kChannelStreamStdio: | ||||||
|       stream_close(&channel->data.std.in, NULL, NULL); |       stream_close(&channel->stream.stdio.in, NULL, NULL); | ||||||
|       stream_close(&channel->data.std.out, NULL, NULL); |       stream_close(&channel->stream.stdio.out, NULL, NULL); | ||||||
|       multiqueue_put(main_loop.fast_events, exit_event, 1, channel); |       multiqueue_put(main_loop.fast_events, exit_event, 1, channel); | ||||||
|       return; |       return; | ||||||
|     case kChannelTypeInternal: |     case kChannelStreamInternal: | ||||||
|       // nothing to free. |       // nothing to free. | ||||||
|       break; |       break; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   decref(channel); |   channel_decref(channel); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void exit_event(void **argv) | static void exit_event(void **argv) | ||||||
| { | { | ||||||
|   decref(argv[0]); |   channel_decref(argv[0]); | ||||||
|  |  | ||||||
|   if (!exiting) { |   if (!exiting) { | ||||||
|     mch_exit(0); |     mch_exit(0); | ||||||
|   } |   } | ||||||
| } | } | ||||||
|  |  | ||||||
| static void free_channel(Channel *channel) | void rpc_free(Channel *channel) | ||||||
| { | { | ||||||
|   remote_ui_disconnect(channel->id); |   remote_ui_disconnect(channel->id); | ||||||
|   pmap_del(uint64_t)(channels, channel->id); |   msgpack_unpacker_free(channel->rpc.unpacker); | ||||||
|   msgpack_unpacker_free(channel->unpacker); |  | ||||||
|  |  | ||||||
|   // Unsubscribe from all events |   // Unsubscribe from all events | ||||||
|   char *event_string; |   char *event_string; | ||||||
|   map_foreach_value(channel->subscribed_events, event_string, { |   map_foreach_value(channel->rpc.subscribed_events, event_string, { | ||||||
|     unsubscribe(channel, event_string); |     unsubscribe(channel, event_string); | ||||||
|   }); |   }); | ||||||
|  |  | ||||||
|   pmap_free(cstr_t)(channel->subscribed_events); |   pmap_free(cstr_t)(channel->rpc.subscribed_events); | ||||||
|   kv_destroy(channel->call_stack); |   kv_destroy(channel->rpc.call_stack); | ||||||
|   if (channel->type != kChannelTypeProc) { |  | ||||||
|     multiqueue_free(channel->events); |  | ||||||
|   } |  | ||||||
|   xfree(channel); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| static void close_cb(Stream *stream, void *data) | static void close_cb(Stream *stream, void *data) | ||||||
| { | { | ||||||
|   decref(data); |   channel_decref(data); | ||||||
| } |  | ||||||
|  |  | ||||||
| static Channel *register_channel(ChannelType type, uint64_t id, |  | ||||||
|                                  MultiQueue *events) |  | ||||||
| { |  | ||||||
|   Channel *rv = xmalloc(sizeof(Channel)); |  | ||||||
|   rv->events = events ? events : multiqueue_new_child(main_loop.events); |  | ||||||
|   rv->type = type; |  | ||||||
|   rv->refcount = 1; |  | ||||||
|   rv->closed = false; |  | ||||||
|   rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); |  | ||||||
|   rv->id = id > 0 ? id : next_chan_id++; |  | ||||||
|   rv->subscribed_events = pmap_new(cstr_t)(); |  | ||||||
|   rv->next_request_id = 1; |  | ||||||
|   kv_init(rv->call_stack); |  | ||||||
|   pmap_put(uint64_t)(channels, rv->id, rv); |  | ||||||
|   return rv; |  | ||||||
| } | } | ||||||
|  |  | ||||||
| static bool is_rpc_response(msgpack_object *obj) | static bool is_rpc_response(msgpack_object *obj) | ||||||
| @@ -803,15 +680,18 @@ static bool is_rpc_response(msgpack_object *obj) | |||||||
| static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) | static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) | ||||||
| { | { | ||||||
|   uint64_t response_id = obj->via.array.ptr[1].via.u64; |   uint64_t response_id = obj->via.array.ptr[1].via.u64; | ||||||
|  |   if (kv_size(channel->rpc.call_stack) == 0) { | ||||||
|  |     return false; | ||||||
|  |   } | ||||||
|  |  | ||||||
|   // Must be equal to the frame at the stack's bottom |   // Must be equal to the frame at the stack's bottom | ||||||
|   return kv_size(channel->call_stack) && response_id |   ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); | ||||||
|     == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; |   return response_id == frame->request_id; | ||||||
| } | } | ||||||
|  |  | ||||||
| static void complete_call(msgpack_object *obj, Channel *channel) | static void complete_call(msgpack_object *obj, Channel *channel) | ||||||
| { | { | ||||||
|   ChannelCallFrame *frame = kv_A(channel->call_stack, |   ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); | ||||||
|                              kv_size(channel->call_stack) - 1); |  | ||||||
|   frame->returned = true; |   frame->returned = true; | ||||||
|   frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; |   frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; | ||||||
|  |  | ||||||
| @@ -825,8 +705,8 @@ static void complete_call(msgpack_object *obj, Channel *channel) | |||||||
| static void call_set_error(Channel *channel, char *msg, int loglevel) | static void call_set_error(Channel *channel, char *msg, int loglevel) | ||||||
| { | { | ||||||
|   LOG(loglevel, "RPC: %s", msg); |   LOG(loglevel, "RPC: %s", msg); | ||||||
|   for (size_t i = 0; i < kv_size(channel->call_stack); i++) { |   for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { | ||||||
|     ChannelCallFrame *frame = kv_A(channel->call_stack, i); |     ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); | ||||||
|     frame->returned = true; |     frame->returned = true; | ||||||
|     frame->errored = true; |     frame->errored = true; | ||||||
|     api_free_object(frame->result); |     api_free_object(frame->result); | ||||||
| @@ -875,18 +755,6 @@ static WBuffer *serialize_response(uint64_t channel_id, | |||||||
|   return rv; |   return rv; | ||||||
| } | } | ||||||
|  |  | ||||||
| static void incref(Channel *channel) |  | ||||||
| { |  | ||||||
|   channel->refcount++; |  | ||||||
| } |  | ||||||
|  |  | ||||||
| static void decref(Channel *channel) |  | ||||||
| { |  | ||||||
|   if (!(--channel->refcount)) { |  | ||||||
|     free_channel(channel); |  | ||||||
|   } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL | #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL | ||||||
| #define REQ "[request]  " | #define REQ "[request]  " | ||||||
| #define RES "[response] " | #define RES "[response] " | ||||||
|   | |||||||
| @@ -8,6 +8,7 @@ | |||||||
| #include "nvim/event/socket.h" | #include "nvim/event/socket.h" | ||||||
| #include "nvim/event/process.h" | #include "nvim/event/process.h" | ||||||
| #include "nvim/vim.h" | #include "nvim/vim.h" | ||||||
|  | #include "nvim/channel.h" | ||||||
|  |  | ||||||
| #define METHOD_MAXLEN 512 | #define METHOD_MAXLEN 512 | ||||||
|  |  | ||||||
| @@ -16,6 +17,7 @@ | |||||||
| ///       of os_inchar(), so they are processed "just-in-time". | ///       of os_inchar(), so they are processed "just-in-time". | ||||||
| MultiQueue *ch_before_blocking_events; | MultiQueue *ch_before_blocking_events; | ||||||
|  |  | ||||||
|  |  | ||||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||||
| # include "msgpack_rpc/channel.h.generated.h" | # include "msgpack_rpc/channel.h.generated.h" | ||||||
| #endif | #endif | ||||||
|   | |||||||
							
								
								
									
										36
									
								
								src/nvim/msgpack_rpc/channel_defs.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								src/nvim/msgpack_rpc/channel_defs.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | |||||||
|  | #ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H | ||||||
|  | #define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H | ||||||
|  |  | ||||||
|  | #include <stdbool.h> | ||||||
|  | #include <uv.h> | ||||||
|  | #include <msgpack.h> | ||||||
|  |  | ||||||
|  | #include "nvim/api/private/defs.h" | ||||||
|  | #include "nvim/event/socket.h" | ||||||
|  | #include "nvim/event/process.h" | ||||||
|  | #include "nvim/vim.h" | ||||||
|  |  | ||||||
|  | typedef struct Channel Channel; | ||||||
|  |  | ||||||
|  | typedef struct { | ||||||
|  |   uint64_t request_id; | ||||||
|  |   bool returned, errored; | ||||||
|  |   Object result; | ||||||
|  | } ChannelCallFrame; | ||||||
|  |  | ||||||
|  | typedef struct { | ||||||
|  |   Channel *channel; | ||||||
|  |   MsgpackRpcRequestHandler handler; | ||||||
|  |   Array args; | ||||||
|  |   uint64_t request_id; | ||||||
|  | } RequestEvent; | ||||||
|  |  | ||||||
|  | typedef struct { | ||||||
|  |   PMap(cstr_t) *subscribed_events; | ||||||
|  |   bool closed; | ||||||
|  |   msgpack_unpacker *unpacker; | ||||||
|  |   uint64_t next_request_id; | ||||||
|  |   kvec_t(ChannelCallFrame *) call_stack; | ||||||
|  | } RpcState; | ||||||
|  |  | ||||||
|  | #endif  // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H | ||||||
| @@ -47,7 +47,7 @@ int pty_process_spawn(PtyProcess *ptyproc) | |||||||
|  |  | ||||||
|   int status = 0;  // zero or negative error code (libuv convention) |   int status = 0;  // zero or negative error code (libuv convention) | ||||||
|   Process *proc = (Process *)ptyproc; |   Process *proc = (Process *)ptyproc; | ||||||
|   assert(!proc->err); |   assert(proc->err.closed); | ||||||
|   uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); |   uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); | ||||||
|   ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 }; |   ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 }; | ||||||
|   uv_disable_stdio_inheritance(); |   uv_disable_stdio_inheritance(); | ||||||
| @@ -83,12 +83,12 @@ int pty_process_spawn(PtyProcess *ptyproc) | |||||||
|     goto error; |     goto error; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->in |   if (!proc->in.closed | ||||||
|       && (status = set_duplicating_descriptor(master, &proc->in->uv.pipe))) { |       && (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) { | ||||||
|     goto error; |     goto error; | ||||||
|   } |   } | ||||||
|   if (proc->out |   if (!proc->out.closed | ||||||
|       && (status = set_duplicating_descriptor(master, &proc->out->uv.pipe))) { |       && (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) { | ||||||
|     goto error; |     goto error; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -44,7 +44,7 @@ int pty_process_spawn(PtyProcess *ptyproc) | |||||||
|   wchar_t *cwd = NULL; |   wchar_t *cwd = NULL; | ||||||
|   const char *emsg = NULL; |   const char *emsg = NULL; | ||||||
|  |  | ||||||
|   assert(!proc->err); |   assert(proc->err.closed); | ||||||
|  |  | ||||||
|   cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err); |   cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err); | ||||||
|   if (cfg == NULL) { |   if (cfg == NULL) { | ||||||
| @@ -71,20 +71,20 @@ int pty_process_spawn(PtyProcess *ptyproc) | |||||||
|     goto cleanup; |     goto cleanup; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->in != NULL) { |   if (!proc->in.closed) { | ||||||
|     in_req = xmalloc(sizeof(uv_connect_t)); |     in_req = xmalloc(sizeof(uv_connect_t)); | ||||||
|     uv_pipe_connect( |     uv_pipe_connect( | ||||||
|         in_req, |         in_req, | ||||||
|         &proc->in->uv.pipe, |         &proc->in.uv.pipe, | ||||||
|         in_name, |         in_name, | ||||||
|         pty_process_connect_cb); |         pty_process_connect_cb); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   if (proc->out != NULL) { |   if (!proc->out.closed) { | ||||||
|     out_req = xmalloc(sizeof(uv_connect_t)); |     out_req = xmalloc(sizeof(uv_connect_t)); | ||||||
|     uv_pipe_connect( |     uv_pipe_connect( | ||||||
|         out_req, |         out_req, | ||||||
|         &proc->out->uv.pipe, |         &proc->out.uv.pipe, | ||||||
|         out_name, |         out_name, | ||||||
|         pty_process_connect_cb); |         pty_process_connect_cb); | ||||||
|   } |   } | ||||||
| @@ -228,7 +228,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer) | |||||||
|   PtyProcess *ptyproc = wait_eof_timer->data; |   PtyProcess *ptyproc = wait_eof_timer->data; | ||||||
|   Process *proc = (Process *)ptyproc; |   Process *proc = (Process *)ptyproc; | ||||||
|  |  | ||||||
|   if (!proc->out || !uv_is_readable(proc->out->uvstream)) { |   if (proc->out.closed || !uv_is_readable(proc->out.uvstream)) { | ||||||
|     uv_timer_stop(&ptyproc->wait_eof_timer); |     uv_timer_stop(&ptyproc->wait_eof_timer); | ||||||
|     pty_process_finish2(ptyproc); |     pty_process_finish2(ptyproc); | ||||||
|   } |   } | ||||||
|   | |||||||
| @@ -207,16 +207,12 @@ static int do_os_system(char **argv, | |||||||
|   char prog[MAXPATHL]; |   char prog[MAXPATHL]; | ||||||
|   xstrlcpy(prog, argv[0], MAXPATHL); |   xstrlcpy(prog, argv[0], MAXPATHL); | ||||||
|  |  | ||||||
|   Stream in, out, err; |  | ||||||
|   LibuvProcess uvproc = libuv_process_init(&main_loop, &buf); |   LibuvProcess uvproc = libuv_process_init(&main_loop, &buf); | ||||||
|   Process *proc = &uvproc.process; |   Process *proc = &uvproc.process; | ||||||
|   MultiQueue *events = multiqueue_new_child(main_loop.events); |   MultiQueue *events = multiqueue_new_child(main_loop.events); | ||||||
|   proc->events = events; |   proc->events = events; | ||||||
|   proc->argv = argv; |   proc->argv = argv; | ||||||
|   proc->in = input != NULL ? &in : NULL; |   int status = process_spawn(proc, input != NULL, true, true); | ||||||
|   proc->out = &out; |  | ||||||
|   proc->err = &err; |  | ||||||
|   int status = process_spawn(proc); |  | ||||||
|   if (status) { |   if (status) { | ||||||
|     loop_poll_events(&main_loop, 0); |     loop_poll_events(&main_loop, 0); | ||||||
|     // Failed, probably 'shell' is not executable. |     // Failed, probably 'shell' is not executable. | ||||||
| @@ -236,27 +232,27 @@ static int do_os_system(char **argv, | |||||||
|   // streams while there's still data in the OS buffer (due to the process |   // streams while there's still data in the OS buffer (due to the process | ||||||
|   // exiting before all data is read). |   // exiting before all data is read). | ||||||
|   if (input != NULL) { |   if (input != NULL) { | ||||||
|     proc->in->events = NULL; |     proc->in.events = NULL; | ||||||
|     wstream_init(proc->in, 0); |     wstream_init(&proc->in, 0); | ||||||
|   } |   } | ||||||
|   proc->out->events = NULL; |   proc->out.events = NULL; | ||||||
|   rstream_init(proc->out, 0); |   rstream_init(&proc->out, 0); | ||||||
|   rstream_start(proc->out, data_cb, &buf); |   rstream_start(&proc->out, data_cb, &buf); | ||||||
|   proc->err->events = NULL; |   proc->err.events = NULL; | ||||||
|   rstream_init(proc->err, 0); |   rstream_init(&proc->err, 0); | ||||||
|   rstream_start(proc->err, data_cb, &buf); |   rstream_start(&proc->err, data_cb, &buf); | ||||||
|  |  | ||||||
|   // write the input, if any |   // write the input, if any | ||||||
|   if (input) { |   if (input) { | ||||||
|     WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL); |     WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL); | ||||||
|  |  | ||||||
|     if (!wstream_write(&in, input_buffer)) { |     if (!wstream_write(&proc->in, input_buffer)) { | ||||||
|       // couldn't write, stop the process and tell the user about it |       // couldn't write, stop the process and tell the user about it | ||||||
|       process_stop(proc); |       process_stop(proc); | ||||||
|       return -1; |       return -1; | ||||||
|     } |     } | ||||||
|     // close the input stream after everything is written |     // close the input stream after everything is written | ||||||
|     wstream_set_write_cb(&in, shell_write_cb, NULL); |     wstream_set_write_cb(&proc->in, shell_write_cb, NULL); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   // Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the |   // Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Björn Linse
					Björn Linse