mirror of
				https://github.com/neovim/neovim.git
				synced 2025-10-26 12:27:24 +00:00 
			
		
		
		
	channels: refactor
This commit is contained in:
		| @@ -252,7 +252,7 @@ static void remote_ui_flush(UI *ui) | ||||
| { | ||||
|   UIData *data = ui->data; | ||||
|   if (data->buffer.size > 0) { | ||||
|     channel_send_event(data->channel_id, "redraw", data->buffer); | ||||
|     rpc_send_event(data->channel_id, "redraw", data->buffer); | ||||
|     data->buffer = (Array)ARRAY_DICT_INIT; | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -721,7 +721,7 @@ void nvim_subscribe(uint64_t channel_id, String event) | ||||
|   char e[METHOD_MAXLEN + 1]; | ||||
|   memcpy(e, event.data, length); | ||||
|   e[length] = NUL; | ||||
|   channel_subscribe(channel_id, e); | ||||
|   rpc_subscribe(channel_id, e); | ||||
| } | ||||
|  | ||||
| /// Unsubscribes to event broadcasts | ||||
| @@ -737,7 +737,7 @@ void nvim_unsubscribe(uint64_t channel_id, String event) | ||||
|   char e[METHOD_MAXLEN + 1]; | ||||
|   memcpy(e, event.data, length); | ||||
|   e[length] = NUL; | ||||
|   channel_unsubscribe(channel_id, e); | ||||
|   rpc_unsubscribe(channel_id, e); | ||||
| } | ||||
|  | ||||
| Integer nvim_get_color_by_name(String name) | ||||
|   | ||||
							
								
								
									
										61
									
								
								src/nvim/channel.c
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										61
									
								
								src/nvim/channel.c
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,61 @@ | ||||
| // This is an open source non-commercial project. Dear PVS-Studio, please check | ||||
| // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com | ||||
|  | ||||
| #include "nvim/api/ui.h" | ||||
| #include "nvim/channel.h" | ||||
| #include "nvim/msgpack_rpc/channel.h" | ||||
|  | ||||
| PMap(uint64_t) *channels = NULL; | ||||
|  | ||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||
| # include "channel.c.generated.h" | ||||
| #endif | ||||
| /// Teardown the module | ||||
| void channel_teardown(void) | ||||
| { | ||||
|   if (!channels) { | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   Channel *channel; | ||||
|  | ||||
|   map_foreach_value(channels, channel, { | ||||
|     (void)channel;  // close_channel(channel); | ||||
|   }); | ||||
| } | ||||
|  | ||||
| /// Initializes the module | ||||
| void channel_init(void) | ||||
| { | ||||
|   channels = pmap_new(uint64_t)(); | ||||
|   rpc_init(); | ||||
|   remote_ui_init(); | ||||
| } | ||||
|  | ||||
| void channel_incref(Channel *channel) | ||||
| { | ||||
|   channel->refcount++; | ||||
| } | ||||
|  | ||||
| void channel_decref(Channel *channel) | ||||
| { | ||||
|   if (!(--channel->refcount)) { | ||||
|     multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel); | ||||
|   } | ||||
| } | ||||
|  | ||||
| static void free_channel_event(void **argv) | ||||
| { | ||||
|   Channel *channel = argv[0]; | ||||
|   if (channel->is_rpc) { | ||||
|     rpc_free(channel); | ||||
|   } | ||||
|  | ||||
|   callback_free(&channel->on_stdout); | ||||
|   callback_free(&channel->on_stderr); | ||||
|   callback_free(&channel->on_exit); | ||||
|  | ||||
|   pmap_del(uint64_t)(channels, channel->id); | ||||
|   multiqueue_free(channel->events); | ||||
|   xfree(channel); | ||||
| } | ||||
							
								
								
									
										120
									
								
								src/nvim/channel.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										120
									
								
								src/nvim/channel.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,120 @@ | ||||
| #ifndef NVIM_CHANNEL_H | ||||
| #define NVIM_CHANNEL_H | ||||
|  | ||||
| #include "nvim/main.h" | ||||
| #include "nvim/event/socket.h" | ||||
| #include "nvim/event/process.h" | ||||
| #include "nvim/os/pty_process.h" | ||||
| #include "nvim/event/libuv_process.h" | ||||
| #include "nvim/eval/typval.h" | ||||
| #include "nvim/msgpack_rpc/channel_defs.h" | ||||
|  | ||||
| typedef enum { | ||||
|   kChannelStreamProc, | ||||
|   kChannelStreamSocket, | ||||
|   kChannelStreamStdio, | ||||
|   kChannelStreamInternal | ||||
| } ChannelStreamType; | ||||
|  | ||||
| typedef struct { | ||||
|   Stream in; | ||||
|   Stream out; | ||||
| } StdioPair; | ||||
|  | ||||
| // typedef struct { | ||||
| //   Callback on_out; | ||||
| //   Callback on_close; | ||||
| //   Garray buffer; | ||||
| //   bool buffering; | ||||
| // } CallbackReader | ||||
|  | ||||
| #define CallbackReader Callback | ||||
|  | ||||
| struct Channel { | ||||
|   uint64_t id; | ||||
|   size_t refcount; | ||||
|   MultiQueue *events; | ||||
|  | ||||
|   ChannelStreamType streamtype; | ||||
|   union { | ||||
|     Process proc; | ||||
|     LibuvProcess uv; | ||||
|     PtyProcess pty; | ||||
|     Stream socket; | ||||
|     StdioPair stdio; | ||||
|   } stream; | ||||
|  | ||||
|   bool is_rpc; | ||||
|   RpcState rpc; | ||||
|   Terminal *term; | ||||
|  | ||||
|   CallbackReader on_stdout; | ||||
|   CallbackReader on_stderr; | ||||
|   Callback on_exit; | ||||
|  | ||||
|   varnumber_T *status_ptr; // TODO: refactor? | ||||
| }; | ||||
|  | ||||
| EXTERN PMap(uint64_t) *channels; | ||||
|  | ||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||
| # include "channel.h.generated.h" | ||||
| #endif | ||||
|  | ||||
| static inline Channel *channel_alloc(ChannelStreamType type) | ||||
| { | ||||
|   Channel *chan = xcalloc(1, sizeof(*chan)); | ||||
|   chan->id = next_chan_id++; | ||||
|   chan->events = multiqueue_new_child(main_loop.events); | ||||
|   chan->refcount = 1; | ||||
|   chan->streamtype = type; | ||||
|   pmap_put(uint64_t)(channels, chan->id, chan); | ||||
|   return chan; | ||||
| } | ||||
|  | ||||
| /// @returns Channel with the id or NULL if not found | ||||
| static inline Channel *find_channel(uint64_t id) | ||||
| { | ||||
|   return pmap_get(uint64_t)(channels, id); | ||||
| } | ||||
|  | ||||
| static inline Stream *channel_instream(Channel *chan) | ||||
|   FUNC_ATTR_NONNULL_ALL | ||||
| { | ||||
|   switch (chan->streamtype) { | ||||
|     case kChannelStreamProc: | ||||
|       return &chan->stream.proc.in; | ||||
|  | ||||
|     case kChannelStreamSocket: | ||||
|       return &chan->stream.socket; | ||||
|  | ||||
|     case kChannelStreamStdio: | ||||
|       return &chan->stream.stdio.in; | ||||
|  | ||||
|     case kChannelStreamInternal: | ||||
|       abort(); | ||||
|   } | ||||
|   abort(); | ||||
| } | ||||
|  | ||||
| static inline Stream *channel_outstream(Channel *chan) | ||||
|   FUNC_ATTR_NONNULL_ALL | ||||
| { | ||||
|   switch (chan->streamtype) { | ||||
|     case kChannelStreamProc: | ||||
|       return &chan->stream.proc.out; | ||||
|  | ||||
|     case kChannelStreamSocket: | ||||
|       return &chan->stream.socket; | ||||
|  | ||||
|     case kChannelStreamStdio: | ||||
|       return &chan->stream.stdio.out; | ||||
|  | ||||
|     case kChannelStreamInternal: | ||||
|       abort(); | ||||
|   } | ||||
|   abort(); | ||||
| } | ||||
|  | ||||
|  | ||||
| #endif  // NVIM_CHANNEL_H | ||||
							
								
								
									
										314
									
								
								src/nvim/eval.c
									
									
									
									
									
								
							
							
						
						
									
										314
									
								
								src/nvim/eval.c
									
									
									
									
									
								
							| @@ -24,6 +24,7 @@ | ||||
| #endif | ||||
| #include "nvim/eval.h" | ||||
| #include "nvim/buffer.h" | ||||
| #include "nvim/channel.h" | ||||
| #include "nvim/charset.h" | ||||
| #include "nvim/cursor.h" | ||||
| #include "nvim/diff.h" | ||||
| @@ -437,29 +438,12 @@ static ScopeDictDictItem vimvars_var; | ||||
| #define vimvarht  vimvardict.dv_hashtab | ||||
|  | ||||
| typedef struct { | ||||
|   union { | ||||
|     LibuvProcess uv; | ||||
|     PtyProcess pty; | ||||
|   } proc; | ||||
|   Stream in, out, err;  // Initialized in common_job_start(). | ||||
|   Terminal *term; | ||||
|   bool stopped; | ||||
|   bool exited; | ||||
|   bool rpc; | ||||
|   int refcount; | ||||
|   Callback on_stdout, on_stderr, on_exit; | ||||
|   varnumber_T *status_ptr; | ||||
|   uint64_t id; | ||||
|   MultiQueue *events; | ||||
| } TerminalJobData; | ||||
|  | ||||
| typedef struct { | ||||
|   TerminalJobData *data; | ||||
|   Channel *data; | ||||
|   Callback *callback; | ||||
|   const char *type; | ||||
|   list_T *received; | ||||
|   int status; | ||||
| } JobEvent; | ||||
| } ChannelEvent; | ||||
|  | ||||
| typedef struct { | ||||
|   TimeWatcher tw; | ||||
| @@ -513,7 +497,6 @@ typedef enum { | ||||
| #define FNE_INCL_BR     1       /* find_name_end(): include [] in name */ | ||||
| #define FNE_CHECK_START 2       /* find_name_end(): check name starts with | ||||
|                                    valid character */ | ||||
| static PMap(uint64_t) *jobs = NULL; | ||||
|  | ||||
| static uint64_t last_timer_id = 0; | ||||
| static PMap(uint64_t) *timers = NULL; | ||||
| @@ -556,7 +539,6 @@ void eval_init(void) | ||||
| { | ||||
|   vimvars[VV_VERSION].vv_nr = VIM_VERSION_100; | ||||
|  | ||||
|   jobs = pmap_new(uint64_t)(); | ||||
|   timers = pmap_new(uint64_t)(); | ||||
|   struct vimvar   *p; | ||||
|  | ||||
| @@ -5141,8 +5123,8 @@ bool garbage_collect(bool testing) | ||||
|  | ||||
|   // Jobs | ||||
|   { | ||||
|     TerminalJobData *data; | ||||
|     map_foreach_value(jobs, data, { | ||||
|     Channel *data; | ||||
|     map_foreach_value(channels, data, { | ||||
|       set_ref_in_callback(&data->on_stdout, copyID, NULL, NULL); | ||||
|       set_ref_in_callback(&data->on_stderr, copyID, NULL, NULL); | ||||
|       set_ref_in_callback(&data->on_exit, copyID, NULL, NULL); | ||||
| @@ -11433,24 +11415,23 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); | ||||
|   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||
|   if (!data) { | ||||
|     EMSG(_(e_invjob)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   Process *proc = (Process *)&data->proc; | ||||
|   Process *proc = (Process *)&data->stream.proc; | ||||
|  | ||||
|   if (argvars[1].v_type == VAR_STRING) { | ||||
|     char *stream = (char *)argvars[1].vval.v_string; | ||||
|     if (!strcmp(stream, "stdin")) { | ||||
|       if (data->rpc) { | ||||
|       if (data->is_rpc) { | ||||
|         EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); | ||||
|       } else { | ||||
|         process_close_in(proc); | ||||
|       } | ||||
|     } else if (!strcmp(stream, "stdout")) { | ||||
|       if (data->rpc) { | ||||
|       if (data->is_rpc) { | ||||
|         EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')")); | ||||
|       } else { | ||||
|         process_close_out(proc); | ||||
| @@ -11458,7 +11439,7 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|     } else if (!strcmp(stream, "stderr")) { | ||||
|       process_close_err(proc); | ||||
|     } else if (!strcmp(stream, "rpc")) { | ||||
|       if (data->rpc) { | ||||
|       if (data->is_rpc) { | ||||
|         channel_close(data->id); | ||||
|       } else { | ||||
|         EMSG(_("Invalid job stream: Not an rpc job")); | ||||
| @@ -11467,13 +11448,13 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|       EMSG2(_("Invalid job stream \"%s\""), stream); | ||||
|     } | ||||
|   } else { | ||||
|     if (data->rpc) { | ||||
|     if (data->is_rpc) { | ||||
|       channel_close(data->id); | ||||
|       process_close_err(proc); | ||||
|     } else { | ||||
|       process_close_streams(proc); | ||||
|       if (proc->type == kProcessTypePty) { | ||||
|         pty_process_close_master(&data->proc.pty); | ||||
|         pty_process_close_master(&data->stream.pty); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| @@ -11494,13 +11475,12 @@ static void f_jobpid(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); | ||||
|   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||
|   if (!data) { | ||||
|     EMSG(_(e_invjob)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   Process *proc = (Process *)&data->proc; | ||||
|   Process *proc = (Process *)&data->stream.proc; | ||||
|   rettv->vval.v_number = proc->pid; | ||||
| } | ||||
|  | ||||
| @@ -11521,18 +11501,19 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); | ||||
|   Channel *data = find_channel(argvars[0].vval.v_number); | ||||
|   if (!data) { | ||||
|     EMSG(_(e_invjob)); | ||||
|     EMSG(_(e_invchan)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   if (((Process *)&data->proc)->in->closed) { | ||||
|   Stream *in = channel_instream(data); | ||||
|   if (in->closed) { | ||||
|     EMSG(_("Can't send data to the job: stdin is closed")); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   if (data->rpc) { | ||||
|   if (data->is_rpc) { | ||||
|     EMSG(_("Can't send raw data to rpc channel")); | ||||
|     return; | ||||
|   } | ||||
| @@ -11546,7 +11527,7 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|   WBuffer *buf = wstream_new_buffer(input, input_len, 1, xfree); | ||||
|   rettv->vval.v_number = wstream_write(data->proc.uv.process.in, buf); | ||||
|   rettv->vval.v_number = wstream_write(in, buf); | ||||
| } | ||||
|  | ||||
| // "jobresize(job, width, height)" function | ||||
| @@ -11567,19 +11548,17 @@ static void f_jobresize(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|  | ||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); | ||||
|   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||
|   if (!data) { | ||||
|     EMSG(_(e_invjob)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   if (data->proc.uv.process.type != kProcessTypePty) { | ||||
|     EMSG(_(e_jobnotpty)); | ||||
|   if (data->stream.proc.type != kProcessTypePty) { | ||||
|     EMSG(_(e_channotpty)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   pty_process_resize(&data->proc.pty, argvars[1].vval.v_number, | ||||
|       argvars[2].vval.v_number); | ||||
|   pty_process_resize(&data->stream.pty, argvars[1].vval.v_number, argvars[2].vval.v_number); | ||||
|   rettv->vval.v_number = 1; | ||||
| } | ||||
|  | ||||
| @@ -11697,31 +11676,25 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, | ||||
|   Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, | ||||
|                                           pty, rpc, detach, cwd); | ||||
|   Process *proc = (Process *)&data->proc; | ||||
|  | ||||
|   if (pty) { | ||||
|     PtyProcess *pty = &data->stream.pty; | ||||
|     uint16_t width = (uint16_t)tv_dict_get_number(job_opts, "width"); | ||||
|     if (width > 0) { | ||||
|       data->proc.pty.width = width; | ||||
|       pty->width = width; | ||||
|     } | ||||
|     uint16_t height = (uint16_t)tv_dict_get_number(job_opts, "height"); | ||||
|     if (height > 0) { | ||||
|       data->proc.pty.height = height; | ||||
|       pty->height = height; | ||||
|     } | ||||
|     char *term = tv_dict_get_string(job_opts, "TERM", true); | ||||
|     if (term) { | ||||
|       data->proc.pty.term_name = term; | ||||
|       pty->term_name = term; | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   if (!rpc && on_stdout.type == kCallbackNone) { | ||||
|     proc->out = NULL; | ||||
|   } | ||||
|   if (on_stderr.type == kCallbackNone) { | ||||
|     proc->err = NULL; | ||||
|   } | ||||
|   common_job_start(data, rettv); | ||||
| } | ||||
|  | ||||
| @@ -11742,14 +11715,12 @@ static void f_jobstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|  | ||||
|   TerminalJobData *data = find_job(argvars[0].vval.v_number); | ||||
|   Channel *data = find_job(argvars[0].vval.v_number, true); | ||||
|   if (!data) { | ||||
|     EMSG(_(e_invjob)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   process_stop((Process *)&data->proc); | ||||
|   data->stopped = true; | ||||
|   process_stop((Process *)&data->stream.proc); | ||||
|   rettv->vval.v_number = 1; | ||||
| } | ||||
|  | ||||
| @@ -11778,9 +11749,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   // is used to represent an invalid job id, -2 is for a interrupted job and | ||||
|   // -1 for jobs that were skipped or timed out. | ||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||
|     TerminalJobData *data = NULL; | ||||
|     Channel *data = NULL; | ||||
|     if (arg->li_tv.v_type != VAR_NUMBER | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number))) { | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||
|       tv_list_append_number(rv, -3); | ||||
|     } else { | ||||
|       // append the list item and set the status pointer so we'll collect the | ||||
| @@ -11802,16 +11773,16 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||
|     TerminalJobData *data = NULL; | ||||
|     Channel *data = NULL; | ||||
|     if (remaining == 0) { | ||||
|       // timed out | ||||
|       break; | ||||
|     } | ||||
|     if (arg->li_tv.v_type != VAR_NUMBER | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number))) { | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||
|       continue; | ||||
|     } | ||||
|     int status = process_wait((Process *)&data->proc, remaining, waiting_jobs); | ||||
|     int status = process_wait((Process *)&data->stream.proc, remaining, waiting_jobs); | ||||
|     if (status < 0) { | ||||
|       // interrupted or timed out, skip remaining jobs. | ||||
|       if (status == -2) { | ||||
| @@ -11832,9 +11803,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||
|     TerminalJobData *data = NULL; | ||||
|     Channel *data = NULL; | ||||
|     if (arg->li_tv.v_type != VAR_NUMBER | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number))) { | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||
|       continue; | ||||
|     } | ||||
|     // remove the status pointer because the list may be freed before the | ||||
| @@ -11844,9 +11815,9 @@ static void f_jobwait(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|  | ||||
|   // restore the parent queue for any jobs still alive | ||||
|   for (listitem_T *arg = args->lv_first; arg != NULL; arg = arg->li_next) { | ||||
|     TerminalJobData *data = NULL; | ||||
|     Channel *data = NULL; | ||||
|     if (arg->li_tv.v_type != VAR_NUMBER | ||||
|         || !(data = pmap_get(uint64_t)(jobs, arg->li_tv.vval.v_number))) { | ||||
|         || !(data = find_job(arg->li_tv.vval.v_number, false))) { | ||||
|       continue; | ||||
|     } | ||||
|     // restore the parent queue for the job | ||||
| @@ -13803,9 +13774,8 @@ static void f_rpcnotify(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|     ADD(args, vim_to_object(tv)); | ||||
|   } | ||||
|  | ||||
|   if (!channel_send_event((uint64_t)argvars[0].vval.v_number, | ||||
|                           tv_get_string(&argvars[1]), | ||||
|                           args)) { | ||||
|   if (!rpc_send_event((uint64_t)argvars[0].vval.v_number, | ||||
|                       tv_get_string(&argvars[1]), args)) { | ||||
|     EMSG2(_(e_invarg2), "Channel doesn't exist"); | ||||
|     return; | ||||
|   } | ||||
| @@ -13870,10 +13840,8 @@ static void f_rpcrequest(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|  | ||||
|  | ||||
|   Error err = ERROR_INIT; | ||||
|   Object result = channel_send_call((uint64_t)argvars[0].vval.v_number, | ||||
|                                     tv_get_string(&argvars[1]), | ||||
|                                     args, | ||||
|                                     &err); | ||||
|   Object result = rpc_send_call((uint64_t)argvars[0].vval.v_number, | ||||
|                                 tv_get_string(&argvars[1]), args, &err); | ||||
|  | ||||
|   if (l_provider_call_nesting) { | ||||
|     current_SID = save_current_SID; | ||||
| @@ -13954,7 +13922,7 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   // The last item of argv must be NULL | ||||
|   argv[i] = NULL; | ||||
|  | ||||
|   TerminalJobData *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, | ||||
|   Channel *data = common_job_init(argv, CALLBACK_NONE, CALLBACK_NONE, | ||||
|                                           CALLBACK_NONE, false, true, false, | ||||
|                                           NULL); | ||||
|   common_job_start(data, rettv); | ||||
| @@ -13977,10 +13945,11 @@ static void f_rpcstop(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|   // if called with a job, stop it, else closes the channel | ||||
|   if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) { | ||||
|   uint64_t id = argvars[0].vval.v_number; | ||||
|   if (find_job(id, false)) { // FIXME | ||||
|     f_jobstop(argvars, rettv, NULL); | ||||
|   } else { | ||||
|     rettv->vval.v_number = channel_close(argvars[0].vval.v_number); | ||||
|     rettv->vval.v_number = channel_close(id); | ||||
|   } | ||||
| } | ||||
|  | ||||
| @@ -16689,11 +16658,11 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   } | ||||
|  | ||||
|   uint16_t term_width = MAX(0, curwin->w_width - win_col_off(curwin)); | ||||
|   TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit, | ||||
|   Channel *data = common_job_init(argv, on_stdout, on_stderr, on_exit, | ||||
|                                           true, false, false, cwd); | ||||
|   data->proc.pty.width = term_width; | ||||
|   data->proc.pty.height = curwin->w_height; | ||||
|   data->proc.pty.term_name = xstrdup("xterm-256color"); | ||||
|   data->stream.pty.width = term_width; | ||||
|   data->stream.pty.height = curwin->w_height; | ||||
|   data->stream.pty.term_name = xstrdup("xterm-256color"); | ||||
|   if (!common_job_start(data, rettv)) { | ||||
|     return; | ||||
|   } | ||||
| @@ -16705,7 +16674,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|   topts.resize_cb = term_resize; | ||||
|   topts.close_cb = term_close; | ||||
|  | ||||
|   int pid = data->proc.pty.process.pid; | ||||
|   int pid = data->stream.pty.process.pid; | ||||
|  | ||||
|   char buf[1024]; | ||||
|   // format the title with the pid to conform with the term:// URI | ||||
| @@ -16725,7 +16694,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv, FunPtr fptr) | ||||
|  | ||||
|   Terminal *term = terminal_open(topts); | ||||
|   data->term = term; | ||||
|   data->refcount++; | ||||
|   channel_incref(data); | ||||
|  | ||||
|   return; | ||||
| } | ||||
| @@ -16760,30 +16729,6 @@ bool callback_from_typval(Callback *const callback, typval_T *const arg) | ||||
|   return true; | ||||
| } | ||||
|  | ||||
| /// Unref/free callback | ||||
| void callback_free(Callback *const callback) | ||||
|   FUNC_ATTR_NONNULL_ALL | ||||
| { | ||||
|   switch (callback->type) { | ||||
|     case kCallbackFuncref: { | ||||
|       func_unref(callback->data.funcref); | ||||
|       xfree(callback->data.funcref); | ||||
|       break; | ||||
|     } | ||||
|     case kCallbackPartial: { | ||||
|       partial_unref(callback->data.partial); | ||||
|       break; | ||||
|     } | ||||
|     case kCallbackNone: { | ||||
|       break; | ||||
|     } | ||||
|     default: { | ||||
|       abort(); | ||||
|     } | ||||
|   } | ||||
|   callback->type = kCallbackNone; | ||||
| } | ||||
|  | ||||
| bool callback_call(Callback *const callback, const int argcount_in, | ||||
|                    typval_T *const argvars_in, typval_T *const rettv) | ||||
|   FUNC_ATTR_NONNULL_ALL | ||||
| @@ -22402,7 +22347,7 @@ char_u *do_string_sub(char_u *str, char_u *pat, char_u *sub, | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| static inline TerminalJobData *common_job_init(char **argv, | ||||
| static inline Channel *common_job_init(char **argv, | ||||
|                                                Callback on_stdout, | ||||
|                                                Callback on_stderr, | ||||
|                                                Callback on_exit, | ||||
| @@ -22411,25 +22356,18 @@ static inline TerminalJobData *common_job_init(char **argv, | ||||
|                                                bool detach, | ||||
|                                                const char *cwd) | ||||
| { | ||||
|   TerminalJobData *data = xcalloc(1, sizeof(TerminalJobData)); | ||||
|   data->stopped = false; | ||||
|   Channel *data = channel_alloc(kChannelStreamProc); | ||||
|   data->on_stdout = on_stdout; | ||||
|   data->on_stderr = on_stderr; | ||||
|   data->on_exit = on_exit; | ||||
|   data->events = multiqueue_new_child(main_loop.events); | ||||
|   data->rpc = rpc; | ||||
|   data->is_rpc = rpc; | ||||
|   if (pty) { | ||||
|     data->proc.pty = pty_process_init(&main_loop, data); | ||||
|     data->stream.pty = pty_process_init(&main_loop, data); | ||||
|   } else { | ||||
|     data->proc.uv = libuv_process_init(&main_loop, data); | ||||
|     data->stream.uv = libuv_process_init(&main_loop, data); | ||||
|   } | ||||
|   Process *proc = (Process *)&data->proc; | ||||
|   Process *proc = (Process *)&data->stream.proc; | ||||
|   proc->argv = argv; | ||||
|   proc->in = &data->in; | ||||
|   proc->out = &data->out; | ||||
|   if (!pty) { | ||||
|     proc->err = &data->err; | ||||
|   } | ||||
|   proc->cb = eval_job_process_exit_cb; | ||||
|   proc->events = data->events; | ||||
|   proc->detach = detach; | ||||
| @@ -22456,80 +22394,66 @@ static inline bool common_job_callbacks(dict_T *vopts, Callback *on_stdout, | ||||
|   return false; | ||||
| } | ||||
|  | ||||
| static inline bool common_job_start(TerminalJobData *data, typval_T *rettv) | ||||
| static inline bool common_job_start(Channel *data, typval_T *rettv) | ||||
| { | ||||
|   Process *proc = (Process *)&data->proc; | ||||
|   Process *proc = (Process *)&data->stream.proc; | ||||
|   if (proc->type == kProcessTypePty && proc->detach) { | ||||
|     EMSG2(_(e_invarg2), "terminal/pty job cannot be detached"); | ||||
|     xfree(data->proc.pty.term_name); | ||||
|     xfree(data->stream.pty.term_name); | ||||
|     shell_free_argv(proc->argv); | ||||
|     free_term_job_data_event((void **)&data); | ||||
|     channel_decref(data); | ||||
|     return false; | ||||
|   } | ||||
|  | ||||
|   data->id = next_chan_id++; | ||||
|   pmap_put(uint64_t)(jobs, data->id, data); | ||||
|  | ||||
|   data->refcount++; | ||||
|   char *cmd = xstrdup(proc->argv[0]); | ||||
|   int status = process_spawn(proc); | ||||
|   bool has_out, has_err; | ||||
|   if (proc->type == kProcessTypePty) { | ||||
|     has_out = true; | ||||
|     has_err = false; | ||||
|   } else { | ||||
|     has_out = data->is_rpc || data->on_stdout.type != kCallbackNone; | ||||
|     has_err = data->on_stderr.type != kCallbackNone; | ||||
|   } | ||||
|   int status = process_spawn(proc, true, has_out, has_err); | ||||
|   if (status) { | ||||
|     EMSG3(_(e_jobspawn), os_strerror(status), cmd); | ||||
|     xfree(cmd); | ||||
|     if (proc->type == kProcessTypePty) { | ||||
|       xfree(data->proc.pty.term_name); | ||||
|       xfree(data->stream.pty.term_name); | ||||
|     } | ||||
|     rettv->vval.v_number = proc->status; | ||||
|     term_job_data_decref(data); | ||||
|     channel_decref(data); | ||||
|     return false; | ||||
|   } | ||||
|   xfree(cmd); | ||||
|  | ||||
|  | ||||
|   if (data->rpc) { | ||||
|     // the rpc channel takes over the in and out streams | ||||
|     channel_from_process(proc, data->id); | ||||
|   if (data->is_rpc) { | ||||
|     // the rpc takes over the in and out streams | ||||
|     rpc_start(data); | ||||
|   } else { | ||||
|     wstream_init(proc->in, 0); | ||||
|     if (proc->out) { | ||||
|       rstream_init(proc->out, 0); | ||||
|       rstream_start(proc->out, on_job_stdout, data); | ||||
|     wstream_init(&proc->in, 0); | ||||
|     if (has_out) { | ||||
|       rstream_init(&proc->out, 0); | ||||
|       rstream_start(&proc->out, on_job_stdout, data); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   if (proc->err) { | ||||
|     rstream_init(proc->err, 0); | ||||
|     rstream_start(proc->err, on_job_stderr, data); | ||||
|   if (has_err) { | ||||
|     rstream_init(&proc->err, 0); | ||||
|     rstream_start(&proc->err, on_job_stderr, data); | ||||
|   } | ||||
|   rettv->vval.v_number = data->id; | ||||
|   return true; | ||||
| } | ||||
|  | ||||
| static inline void free_term_job_data_event(void **argv) | ||||
| { | ||||
|   TerminalJobData *data = argv[0]; | ||||
|   callback_free(&data->on_stdout); | ||||
|   callback_free(&data->on_stderr); | ||||
|   callback_free(&data->on_exit); | ||||
|  | ||||
|   multiqueue_free(data->events); | ||||
|   pmap_del(uint64_t)(jobs, data->id); | ||||
|   xfree(data); | ||||
| } | ||||
|  | ||||
| static inline void free_term_job_data(TerminalJobData *data) | ||||
| { | ||||
|   // data->queue may still be used after this function returns(process_wait), so | ||||
|   // only free in the next event loop iteration | ||||
|   multiqueue_put(main_loop.fast_events, free_term_job_data_event, 1, data); | ||||
| } | ||||
|  | ||||
| // vimscript job callbacks must be executed on Nvim main loop | ||||
| static inline void process_job_event(TerminalJobData *data, Callback *callback, | ||||
| static inline void process_job_event(Channel *data, Callback *callback, | ||||
|                                      const char *type, char *buf, size_t count, | ||||
|                                      int status) | ||||
| { | ||||
|   JobEvent event_data; | ||||
|   ChannelEvent event_data; | ||||
|   event_data.received = NULL; | ||||
|   if (buf) { | ||||
|     event_data.received = tv_list_alloc(); | ||||
| @@ -22566,18 +22490,18 @@ static inline void process_job_event(TerminalJobData *data, Callback *callback, | ||||
| static void on_job_stdout(Stream *stream, RBuffer *buf, size_t count, | ||||
|     void *job, bool eof) | ||||
| { | ||||
|   TerminalJobData *data = job; | ||||
|   Channel *data = job; | ||||
|   on_job_output(stream, job, buf, count, eof, &data->on_stdout, "stdout"); | ||||
| } | ||||
|  | ||||
| static void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, | ||||
|     void *job, bool eof) | ||||
| { | ||||
|   TerminalJobData *data = job; | ||||
|   Channel *data = job; | ||||
|   on_job_output(stream, job, buf, count, eof, &data->on_stderr, "stderr"); | ||||
| } | ||||
|  | ||||
| static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, | ||||
| static void on_job_output(Stream *stream, Channel *data, RBuffer *buf, | ||||
|                           size_t count, bool eof, Callback *callback, | ||||
|                           const char *type) | ||||
| { | ||||
| @@ -22604,14 +22528,14 @@ static void on_job_output(Stream *stream, TerminalJobData *data, RBuffer *buf, | ||||
|  | ||||
| static void eval_job_process_exit_cb(Process *proc, int status, void *d) | ||||
| { | ||||
|   TerminalJobData *data = d; | ||||
|   if (data->term && !data->exited) { | ||||
|     data->exited = true; | ||||
|   Channel *data = d; | ||||
|   if (data->term && !data->stream.proc.exited) { | ||||
|     data->stream.proc.exited = true; | ||||
|     char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN]; | ||||
|     snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status); | ||||
|     terminal_close(data->term, msg); | ||||
|   } | ||||
|   if (data->rpc) { | ||||
|   if (data->is_rpc) { | ||||
|     channel_process_exit(data->id, status); | ||||
|   } | ||||
|  | ||||
| @@ -22621,58 +22545,51 @@ static void eval_job_process_exit_cb(Process *proc, int status, void *d) | ||||
|  | ||||
|   process_job_event(data, &data->on_exit, "exit", NULL, 0, status); | ||||
|  | ||||
|   term_job_data_decref(data); | ||||
|   channel_decref(data); | ||||
| } | ||||
|  | ||||
| static void term_write(char *buf, size_t size, void *d) | ||||
| { | ||||
|   TerminalJobData *job = d; | ||||
|   if (job->in.closed) { | ||||
|   Channel *job = d; | ||||
|   if (job->stream.proc.in.closed) { | ||||
|     // If the backing stream was closed abruptly, there may be write events | ||||
|     // ahead of the terminal close event. Just ignore the writes. | ||||
|     ILOG("write failed: stream is closed"); | ||||
|     return; | ||||
|   } | ||||
|   WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); | ||||
|   wstream_write(&job->in, wbuf); | ||||
|   wstream_write(&job->stream.proc.in, wbuf); | ||||
| } | ||||
|  | ||||
| static void term_resize(uint16_t width, uint16_t height, void *d) | ||||
| { | ||||
|   TerminalJobData *data = d; | ||||
|   pty_process_resize(&data->proc.pty, width, height); | ||||
|   Channel *data = d; | ||||
|   pty_process_resize(&data->stream.pty, width, height); | ||||
| } | ||||
|  | ||||
| static inline void term_delayed_free(void **argv) | ||||
| { | ||||
|   TerminalJobData *j = argv[0]; | ||||
|   if (j->in.pending_reqs || j->out.pending_reqs || j->err.pending_reqs) { | ||||
|   Channel *j = argv[0]; | ||||
|   if (j->stream.proc.in.pending_reqs || j->stream.proc.out.pending_reqs) { | ||||
|     multiqueue_put(j->events, term_delayed_free, 1, j); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   terminal_destroy(j->term); | ||||
|   term_job_data_decref(j); | ||||
|   channel_decref(j); | ||||
| } | ||||
|  | ||||
| static void term_close(void *d) | ||||
| { | ||||
|   TerminalJobData *data = d; | ||||
|   if (!data->exited) { | ||||
|     data->exited = true; | ||||
|     process_stop((Process *)&data->proc); | ||||
|   Channel *data = d; | ||||
|   if (!data->stream.proc.exited) { | ||||
|     data->stream.proc.exited = true; | ||||
|     process_stop((Process *)&data->stream.proc); | ||||
|   } | ||||
|   multiqueue_put(data->events, term_delayed_free, 1, data); | ||||
| } | ||||
|  | ||||
| static void term_job_data_decref(TerminalJobData *data) | ||||
| { | ||||
|   if (!(--data->refcount)) { | ||||
|     free_term_job_data(data); | ||||
|   } | ||||
| } | ||||
|  | ||||
| static void on_job_event(JobEvent *ev) | ||||
| static void on_job_event(ChannelEvent *ev) | ||||
| { | ||||
|   if (!ev->callback) { | ||||
|     return; | ||||
| @@ -22704,15 +22621,24 @@ static void on_job_event(JobEvent *ev) | ||||
|   tv_clear(&rettv); | ||||
| } | ||||
|  | ||||
| static TerminalJobData *find_job(uint64_t id) | ||||
| static Channel *find_job(uint64_t id, bool show_error) | ||||
| { | ||||
|   TerminalJobData *data = pmap_get(uint64_t)(jobs, id); | ||||
|   if (!data || data->stopped) { | ||||
|   Channel *data = find_channel(id); | ||||
|   if (!data || data->streamtype != kChannelStreamProc | ||||
|       || process_is_stopped(&data->stream.proc)) { | ||||
|     if (show_error) { | ||||
|       if (data && data->streamtype != kChannelStreamProc) { | ||||
|         EMSG(_(e_invchanjob)); | ||||
|       } else { | ||||
|         EMSG(_(e_invchan)); | ||||
|       } | ||||
|     } | ||||
|     return NULL; | ||||
|   } | ||||
|   return data; | ||||
| } | ||||
|  | ||||
|  | ||||
| static void script_host_eval(char *name, typval_T *argvars, typval_T *rettv) | ||||
| { | ||||
|   if (check_restricted() || check_secure()) { | ||||
|   | ||||
| @@ -847,6 +847,30 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2) | ||||
|   return false; | ||||
| } | ||||
|  | ||||
| /// Unref/free callback | ||||
| void callback_free(Callback *const callback) | ||||
|   FUNC_ATTR_NONNULL_ALL | ||||
| { | ||||
|   switch (callback->type) { | ||||
|     case kCallbackFuncref: { | ||||
|       func_unref(callback->data.funcref); | ||||
|       xfree(callback->data.funcref); | ||||
|       break; | ||||
|     } | ||||
|     case kCallbackPartial: { | ||||
|       partial_unref(callback->data.partial); | ||||
|       break; | ||||
|     } | ||||
|     case kCallbackNone: { | ||||
|       break; | ||||
|     } | ||||
|     default: { | ||||
|       abort(); | ||||
|     } | ||||
|   } | ||||
|   callback->type = kCallbackNone; | ||||
| } | ||||
|  | ||||
| /// Remove watcher from a dictionary | ||||
| /// | ||||
| /// @param  dict  Dictionary to remove watcher from. | ||||
|   | ||||
| @@ -46,22 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc) | ||||
|   uvproc->uvstdio[2].flags = UV_IGNORE; | ||||
|   uvproc->uv.data = proc; | ||||
|  | ||||
|   if (proc->in) { | ||||
|   if (!proc->in.closed) { | ||||
|     uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; | ||||
|     uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t, | ||||
|                                                  &proc->in->uv.pipe); | ||||
|                                                  &proc->in.uv.pipe); | ||||
|   } | ||||
|  | ||||
|   if (proc->out) { | ||||
|   if (!proc->out.closed) { | ||||
|     uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; | ||||
|     uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t, | ||||
|                                                  &proc->out->uv.pipe); | ||||
|                                                  &proc->out.uv.pipe); | ||||
|   } | ||||
|  | ||||
|   if (proc->err) { | ||||
|   if (!proc->err.closed) { | ||||
|     uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE; | ||||
|     uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t, | ||||
|                                                  &proc->err->uv.pipe); | ||||
|                                                  &proc->err.uv.pipe); | ||||
|   } | ||||
|  | ||||
|   int status; | ||||
|   | ||||
| @@ -27,26 +27,33 @@ | ||||
|  | ||||
| #define CLOSE_PROC_STREAM(proc, stream) \ | ||||
|   do { \ | ||||
|     if (proc->stream && !proc->stream->closed) { \ | ||||
|       stream_close(proc->stream, NULL, NULL); \ | ||||
|     if (!proc->stream.closed) { \ | ||||
|       stream_close(&proc->stream, NULL, NULL); \ | ||||
|     } \ | ||||
|   } while (0) | ||||
|  | ||||
| static bool process_is_tearing_down = false; | ||||
|  | ||||
| /// @returns zero on success, or negative error code | ||||
| int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL | ||||
| int process_spawn(Process *proc, bool in, bool out, bool err) | ||||
|   FUNC_ATTR_NONNULL_ALL | ||||
| { | ||||
|   if (proc->in) { | ||||
|     uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0); | ||||
|   if (in) { | ||||
|     uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0); | ||||
|   } else { | ||||
|     proc->in.closed = true; | ||||
|   } | ||||
|  | ||||
|   if (proc->out) { | ||||
|     uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0); | ||||
|   if (out) { | ||||
|     uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0); | ||||
|   } else { | ||||
|     proc->out.closed = true; | ||||
|   } | ||||
|  | ||||
|   if (proc->err) { | ||||
|     uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0); | ||||
|   if (err) { | ||||
|     uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0); | ||||
|   } else { | ||||
|     proc->err.closed = true; | ||||
|   } | ||||
|  | ||||
|   int status; | ||||
| @@ -62,14 +69,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL | ||||
|   } | ||||
|  | ||||
|   if (status) { | ||||
|     if (proc->in) { | ||||
|       uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL); | ||||
|     if (in) { | ||||
|       uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL); | ||||
|     } | ||||
|     if (proc->out) { | ||||
|       uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL); | ||||
|     if (out) { | ||||
|       uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL); | ||||
|     } | ||||
|     if (proc->err) { | ||||
|       uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL); | ||||
|     if (err) { | ||||
|       uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL); | ||||
|     } | ||||
|  | ||||
|     if (proc->type == kProcessTypeUv) { | ||||
| @@ -82,30 +89,30 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL | ||||
|     return status; | ||||
|   } | ||||
|  | ||||
|   if (proc->in) { | ||||
|     stream_init(NULL, proc->in, -1, | ||||
|                 STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe)); | ||||
|     proc->in->events = proc->events; | ||||
|     proc->in->internal_data = proc; | ||||
|     proc->in->internal_close_cb = on_process_stream_close; | ||||
|   if (in) { | ||||
|     stream_init(NULL, &proc->in, -1, | ||||
|                 STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe)); | ||||
|     proc->in.events = proc->events; | ||||
|     proc->in.internal_data = proc; | ||||
|     proc->in.internal_close_cb = on_process_stream_close; | ||||
|     proc->refcount++; | ||||
|   } | ||||
|  | ||||
|   if (proc->out) { | ||||
|     stream_init(NULL, proc->out, -1, | ||||
|                 STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe)); | ||||
|     proc->out->events = proc->events; | ||||
|     proc->out->internal_data = proc; | ||||
|     proc->out->internal_close_cb = on_process_stream_close; | ||||
|   if (out) { | ||||
|     stream_init(NULL, &proc->out, -1, | ||||
|                 STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe)); | ||||
|     proc->out.events = proc->events; | ||||
|     proc->out.internal_data = proc; | ||||
|     proc->out.internal_close_cb = on_process_stream_close; | ||||
|     proc->refcount++; | ||||
|   } | ||||
|  | ||||
|   if (proc->err) { | ||||
|     stream_init(NULL, proc->err, -1, | ||||
|                 STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe)); | ||||
|     proc->err->events = proc->events; | ||||
|     proc->err->internal_data = proc; | ||||
|     proc->err->internal_close_cb = on_process_stream_close; | ||||
|   if (err) { | ||||
|     stream_init(NULL, &proc->err, -1, | ||||
|                 STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe)); | ||||
|     proc->err.events = proc->events; | ||||
|     proc->err.internal_data = proc; | ||||
|     proc->err.internal_close_cb = on_process_stream_close; | ||||
|     proc->refcount++; | ||||
|   } | ||||
|  | ||||
| @@ -395,8 +402,8 @@ static void process_close_handles(void **argv) | ||||
| { | ||||
|   Process *proc = argv[0]; | ||||
|  | ||||
|   flush_stream(proc, proc->out); | ||||
|   flush_stream(proc, proc->err); | ||||
|   flush_stream(proc, &proc->out); | ||||
|   flush_stream(proc, &proc->err); | ||||
|  | ||||
|   process_close_streams(proc); | ||||
|   process_close(proc); | ||||
|   | ||||
| @@ -23,13 +23,15 @@ struct process { | ||||
|   uint64_t stopped_time; | ||||
|   const char *cwd; | ||||
|   char **argv; | ||||
|   Stream *in, *out, *err; | ||||
|   Stream in, out, err; | ||||
|   process_exit_cb cb; | ||||
|   internal_process_cb internal_exit_cb, internal_close_cb; | ||||
|   bool exited; // TODO: redundant | ||||
|   bool closed, detach; | ||||
|   MultiQueue *events; | ||||
| }; | ||||
|  | ||||
|  | ||||
| static inline Process process_init(Loop *loop, ProcessType type, void *data) | ||||
| { | ||||
|   return (Process) { | ||||
| @@ -43,9 +45,9 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) | ||||
|     .stopped_time = 0, | ||||
|     .cwd = NULL, | ||||
|     .argv = NULL, | ||||
|     .in = NULL, | ||||
|     .out = NULL, | ||||
|     .err = NULL, | ||||
|     .in = { .closed = false }, | ||||
|     .out = { .closed = false }, | ||||
|     .err = { .closed = false }, | ||||
|     .cb = NULL, | ||||
|     .closed = false, | ||||
|     .internal_close_cb = NULL, | ||||
| @@ -54,6 +56,11 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data) | ||||
|   }; | ||||
| } | ||||
|  | ||||
| static inline bool process_is_stopped(Process *proc) | ||||
| { | ||||
|   return proc->stopped_time != 0; | ||||
| } | ||||
|  | ||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||
| # include "event/process.h.generated.h" | ||||
| #endif | ||||
|   | ||||
| @@ -33,6 +33,7 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status); | ||||
| typedef void (*stream_close_cb)(Stream *stream, void *data); | ||||
|  | ||||
| struct stream { | ||||
|   bool closed; | ||||
|   union { | ||||
|     uv_pipe_t pipe; | ||||
|     uv_tcp_t tcp; | ||||
| @@ -52,7 +53,6 @@ struct stream { | ||||
|   size_t maxmem; | ||||
|   size_t pending_reqs; | ||||
|   size_t num_bytes; | ||||
|   bool closed; | ||||
|   MultiQueue *events; | ||||
| }; | ||||
|  | ||||
|   | ||||
| @@ -1074,11 +1074,12 @@ EXTERN char_u e_invexpr2[] INIT(= N_("E15: Invalid expression: %s")); | ||||
| EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range")); | ||||
| EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command")); | ||||
| EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory")); | ||||
| EXTERN char_u e_invjob[] INIT(= N_("E900: Invalid job id")); | ||||
| EXTERN char_u e_invchan[] INIT(= N_("E900: Invalid channel id")); | ||||
| EXTERN char_u e_invchanjob[] INIT(= N_("E900: Invalid channel id: not a job")); | ||||
| EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full")); | ||||
| EXTERN char_u e_jobspawn[] INIT(= N_( | ||||
|         "E903: Process failed to start: %s: \"%s\"")); | ||||
| EXTERN char_u e_jobnotpty[] INIT(= N_("E904: Job is not connected to a pty")); | ||||
|     "E903: Process failed to start: %s: \"%s\"")); | ||||
| EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty")); | ||||
| EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\"")); | ||||
| EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s")); | ||||
| EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number")); | ||||
|   | ||||
| @@ -11,6 +11,7 @@ | ||||
| #include "nvim/api/private/helpers.h" | ||||
| #include "nvim/api/vim.h" | ||||
| #include "nvim/api/ui.h" | ||||
| #include "nvim/channel.h" | ||||
| #include "nvim/msgpack_rpc/channel.h" | ||||
| #include "nvim/msgpack_rpc/server.h" | ||||
| #include "nvim/event/loop.h" | ||||
| @@ -40,47 +41,6 @@ | ||||
| #define log_server_msg(...) | ||||
| #endif | ||||
|  | ||||
| typedef enum { | ||||
|   kChannelTypeSocket, | ||||
|   kChannelTypeProc, | ||||
|   kChannelTypeStdio, | ||||
|   kChannelTypeInternal | ||||
| } ChannelType; | ||||
|  | ||||
| typedef struct { | ||||
|   uint64_t request_id; | ||||
|   bool returned, errored; | ||||
|   Object result; | ||||
| } ChannelCallFrame; | ||||
|  | ||||
| typedef struct { | ||||
|   uint64_t id; | ||||
|   size_t refcount; | ||||
|   PMap(cstr_t) *subscribed_events; | ||||
|   bool closed; | ||||
|   ChannelType type; | ||||
|   msgpack_unpacker *unpacker; | ||||
|   union { | ||||
|     Stream stream;  // bidirectional (socket) | ||||
|     Process *proc; | ||||
|     struct { | ||||
|       Stream in; | ||||
|       Stream out; | ||||
|     } std; | ||||
|   } data; | ||||
|   uint64_t next_request_id; | ||||
|   kvec_t(ChannelCallFrame *) call_stack; | ||||
|   MultiQueue *events; | ||||
| } Channel; | ||||
|  | ||||
| typedef struct { | ||||
|   Channel *channel; | ||||
|   MsgpackRpcRequestHandler handler; | ||||
|   Array args; | ||||
|   uint64_t request_id; | ||||
| } RequestEvent; | ||||
|  | ||||
| static PMap(uint64_t) *channels = NULL; | ||||
| static PMap(cstr_t) *event_strings = NULL; | ||||
| static msgpack_sbuffer out_buffer; | ||||
|  | ||||
| @@ -88,50 +48,32 @@ static msgpack_sbuffer out_buffer; | ||||
| # include "msgpack_rpc/channel.c.generated.h" | ||||
| #endif | ||||
|  | ||||
| /// Initializes the module | ||||
| void channel_init(void) | ||||
| void rpc_init(void) | ||||
| { | ||||
|   ch_before_blocking_events = multiqueue_new_child(main_loop.events); | ||||
|   channels = pmap_new(uint64_t)(); | ||||
|   event_strings = pmap_new(cstr_t)(); | ||||
|   msgpack_sbuffer_init(&out_buffer); | ||||
|   remote_ui_init(); | ||||
| } | ||||
|  | ||||
| /// Teardown the module | ||||
| void channel_teardown(void) | ||||
|  | ||||
| void rpc_start(Channel *channel) | ||||
| { | ||||
|   if (!channels) { | ||||
|     return; | ||||
|   } | ||||
|   channel->is_rpc = true; | ||||
|   RpcState *rpc = &channel->rpc; | ||||
|   rpc->closed = false; | ||||
|   rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); | ||||
|   rpc->subscribed_events = pmap_new(cstr_t)(); | ||||
|   rpc->next_request_id = 1; | ||||
|   kv_init(rpc->call_stack); | ||||
|  | ||||
|   Channel *channel; | ||||
|   Stream *in = channel_instream(channel); | ||||
|   Stream *out = channel_outstream(channel); | ||||
|  | ||||
|   map_foreach_value(channels, channel, { | ||||
|     close_channel(channel); | ||||
|   }); | ||||
| } | ||||
|   DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out); | ||||
|  | ||||
| /// Creates an API channel by starting a process and connecting to its | ||||
| /// stdin/stdout. stderr is handled by the job infrastructure. | ||||
| /// | ||||
| /// @param argv The argument vector for the process. [consumed] | ||||
| /// @return The channel id (> 0), on success. | ||||
| ///         0, on error. | ||||
| uint64_t channel_from_process(Process *proc, uint64_t id) | ||||
| { | ||||
|   Channel *channel = register_channel(kChannelTypeProc, id, proc->events); | ||||
|   incref(channel);  // process channels are only closed by the exit_cb | ||||
|   channel->data.proc = proc; | ||||
|  | ||||
|   wstream_init(proc->in, 0); | ||||
|   rstream_init(proc->out, 0); | ||||
|   rstream_start(proc->out, receive_msgpack, channel); | ||||
|  | ||||
|   DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in, | ||||
|        proc->out); | ||||
|  | ||||
|   return channel->id; | ||||
|   wstream_init(in, 0); | ||||
|   rstream_init(out, CHANNEL_BUFFER_SIZE); | ||||
|   rstream_start(out, receive_msgpack, channel); | ||||
| } | ||||
|  | ||||
| /// Creates an API channel from a tcp/pipe socket connection | ||||
| @@ -139,19 +81,15 @@ uint64_t channel_from_process(Process *proc, uint64_t id) | ||||
| /// @param watcher The SocketWatcher ready to accept the connection | ||||
| void channel_from_connection(SocketWatcher *watcher) | ||||
| { | ||||
|   Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); | ||||
|   socket_watcher_accept(watcher, &channel->data.stream); | ||||
|   incref(channel);  // close channel only after the stream is closed | ||||
|   channel->data.stream.internal_close_cb = close_cb; | ||||
|   channel->data.stream.internal_data = channel; | ||||
|   wstream_init(&channel->data.stream, 0); | ||||
|   rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); | ||||
|   rstream_start(&channel->data.stream, receive_msgpack, channel); | ||||
|  | ||||
|   DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id, | ||||
|        &channel->data.stream); | ||||
|   Channel *channel = channel_alloc(kChannelStreamSocket); | ||||
|   socket_watcher_accept(watcher, &channel->stream.socket); | ||||
|   channel_incref(channel);  // close channel only after the stream is closed | ||||
|   channel->stream.socket.internal_close_cb = close_cb; | ||||
|   channel->stream.socket.internal_data = channel; | ||||
|   rpc_start(channel); | ||||
| } | ||||
|  | ||||
| /// TODO: move to eval.c, also support bytes | ||||
| uint64_t channel_connect(bool tcp, const char *address, | ||||
|                          int timeout, const char **error) | ||||
| { | ||||
| @@ -165,34 +103,40 @@ uint64_t channel_connect(bool tcp, const char *address, | ||||
|     xfree(path); | ||||
|   } | ||||
|  | ||||
|   Channel *channel = register_channel(kChannelTypeSocket, 0, NULL); | ||||
|   if (!socket_connect(&main_loop, &channel->data.stream, | ||||
|   Channel *channel = channel_alloc(kChannelStreamSocket); | ||||
|   if (!socket_connect(&main_loop, &channel->stream.socket, | ||||
|                       tcp, address, timeout, error)) { | ||||
|     decref(channel); | ||||
|     channel_decref(channel); | ||||
|     return 0; | ||||
|   } | ||||
|  | ||||
|   incref(channel);  // close channel only after the stream is closed | ||||
|   channel->data.stream.internal_close_cb = close_cb; | ||||
|   channel->data.stream.internal_data = channel; | ||||
|   wstream_init(&channel->data.stream, 0); | ||||
|   rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE); | ||||
|   rstream_start(&channel->data.stream, receive_msgpack, channel); | ||||
|   channel_incref(channel);  // close channel only after the stream is closed | ||||
|   channel->stream.socket.internal_close_cb = close_cb; | ||||
|   channel->stream.socket.internal_data = channel; | ||||
|   rpc_start(channel); | ||||
|   return channel->id; | ||||
| } | ||||
|  | ||||
| static Channel *find_rpc_channel(uint64_t id) | ||||
| { | ||||
|   Channel *chan = find_channel(id); | ||||
|   if (!chan || !chan->is_rpc || chan->rpc.closed) { | ||||
|     return NULL; | ||||
|   } | ||||
|   return chan; | ||||
| } | ||||
|  | ||||
| /// Publishes an event to a channel. | ||||
| /// | ||||
| /// @param id Channel id. 0 means "broadcast to all subscribed channels" | ||||
| /// @param name Event name (application-defined) | ||||
| /// @param args Array of event arguments | ||||
| /// @return True if the event was sent successfully, false otherwise. | ||||
| bool channel_send_event(uint64_t id, const char *name, Array args) | ||||
| bool rpc_send_event(uint64_t id, const char *name, Array args) | ||||
| { | ||||
|   Channel *channel = NULL; | ||||
|  | ||||
|   if (id && (!(channel = pmap_get(uint64_t)(channels, id)) | ||||
|             || channel->closed)) { | ||||
|   if (id && (!(channel = find_rpc_channel(id)))) { | ||||
|     api_free_array(args); | ||||
|     return false; | ||||
|   } | ||||
| @@ -213,29 +157,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args) | ||||
| /// @param args Array with method arguments | ||||
| /// @param[out] error True if the return value is an error | ||||
| /// @return Whatever the remote method returned | ||||
| Object channel_send_call(uint64_t id, | ||||
|                          const char *method_name, | ||||
|                          Array args, | ||||
|                          Error *err) | ||||
| Object rpc_send_call(uint64_t id, | ||||
|                      const char *method_name, | ||||
|                      Array args, | ||||
|                      Error *err) | ||||
| { | ||||
|   Channel *channel = NULL; | ||||
|  | ||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { | ||||
|   if (!(channel = find_rpc_channel(id))) { | ||||
|     api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); | ||||
|     api_free_array(args); | ||||
|     return NIL; | ||||
|   } | ||||
|  | ||||
|   incref(channel); | ||||
|   uint64_t request_id = channel->next_request_id++; | ||||
|   channel_incref(channel); | ||||
|   RpcState *rpc = &channel->rpc; | ||||
|   uint64_t request_id = rpc->next_request_id++; | ||||
|   // Send the msgpack-rpc request | ||||
|   send_request(channel, request_id, method_name, args); | ||||
|  | ||||
|   // Push the frame | ||||
|   ChannelCallFrame frame = { request_id, false, false, NIL }; | ||||
|   kv_push(channel->call_stack, &frame); | ||||
|   kv_push(rpc->call_stack, &frame); | ||||
|   LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); | ||||
|   (void)kv_pop(channel->call_stack); | ||||
|   (void)kv_pop(rpc->call_stack); | ||||
|  | ||||
|   if (frame.errored) { | ||||
|     if (frame.result.type == kObjectTypeString) { | ||||
| @@ -260,7 +205,7 @@ Object channel_send_call(uint64_t id, | ||||
|     api_free_object(frame.result); | ||||
|   } | ||||
|  | ||||
|   decref(channel); | ||||
|   channel_decref(channel); | ||||
|  | ||||
|   return frame.errored ? NIL : frame.result; | ||||
| } | ||||
| @@ -269,11 +214,11 @@ Object channel_send_call(uint64_t id, | ||||
| /// | ||||
| /// @param id The channel id | ||||
| /// @param event The event type string | ||||
| void channel_subscribe(uint64_t id, char *event) | ||||
| void rpc_subscribe(uint64_t id, char *event) | ||||
| { | ||||
|   Channel *channel; | ||||
|  | ||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { | ||||
|   if (!(channel = find_rpc_channel(id))) { | ||||
|     abort(); | ||||
|   } | ||||
|  | ||||
| @@ -284,18 +229,18 @@ void channel_subscribe(uint64_t id, char *event) | ||||
|     pmap_put(cstr_t)(event_strings, event_string, event_string); | ||||
|   } | ||||
|  | ||||
|   pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string); | ||||
|   pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); | ||||
| } | ||||
|  | ||||
| /// Unsubscribes to event broadcasts | ||||
| /// | ||||
| /// @param id The channel id | ||||
| /// @param event The event type string | ||||
| void channel_unsubscribe(uint64_t id, char *event) | ||||
| void rpc_unsubscribe(uint64_t id, char *event) | ||||
| { | ||||
|   Channel *channel; | ||||
|  | ||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { | ||||
|   if (!(channel = find_rpc_channel(id))) { | ||||
|     abort(); | ||||
|   } | ||||
|  | ||||
| @@ -310,7 +255,7 @@ bool channel_close(uint64_t id) | ||||
| { | ||||
|   Channel *channel; | ||||
|  | ||||
|   if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) { | ||||
|   if (!(channel = find_rpc_channel(id))) { | ||||
|     return false; | ||||
|   } | ||||
|  | ||||
| @@ -322,24 +267,22 @@ bool channel_close(uint64_t id) | ||||
| /// Neovim | ||||
| void channel_from_stdio(void) | ||||
| { | ||||
|   Channel *channel = register_channel(kChannelTypeStdio, 0, NULL); | ||||
|   incref(channel);  // stdio channels are only closed on exit | ||||
|   Channel *channel = channel_alloc(kChannelStreamStdio); | ||||
|   channel_incref(channel);  // stdio channels are only closed on exit | ||||
|   // read stream | ||||
|   rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE); | ||||
|   rstream_start(&channel->data.std.in, receive_msgpack, channel); | ||||
|   // write stream | ||||
|   wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0); | ||||
|   rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, CHANNEL_BUFFER_SIZE); | ||||
|   wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); | ||||
|  | ||||
|   DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, | ||||
|        &channel->data.std.in, &channel->data.std.out); | ||||
|   rpc_start(channel); | ||||
| } | ||||
|  | ||||
| /// Creates a loopback channel. This is used to avoid deadlock | ||||
| /// when an instance connects to its own named pipe. | ||||
| uint64_t channel_create_internal(void) | ||||
| { | ||||
|   Channel *channel = register_channel(kChannelTypeInternal, 0, NULL); | ||||
|   incref(channel);  // internal channel lives until process exit | ||||
|   Channel *channel = channel_alloc(kChannelStreamInternal); | ||||
|   channel_incref(channel);  // internal channel lives until process exit | ||||
|   rpc_start(channel); | ||||
|   return channel->id; | ||||
| } | ||||
|  | ||||
| @@ -347,8 +290,8 @@ void channel_process_exit(uint64_t id, int status) | ||||
| { | ||||
|   Channel *channel = pmap_get(uint64_t)(channels, id); | ||||
|  | ||||
|   channel->closed = true; | ||||
|   decref(channel); | ||||
|   // channel_decref(channel); remove?? | ||||
|   channel->rpc.closed = true; | ||||
| } | ||||
|  | ||||
| // rstream.c:read_event() invokes this as stream->read_cb(). | ||||
| @@ -356,7 +299,7 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, | ||||
|                             void *data, bool eof) | ||||
| { | ||||
|   Channel *channel = data; | ||||
|   incref(channel); | ||||
|   channel_incref(channel); | ||||
|  | ||||
|   if (eof) { | ||||
|     close_channel(channel); | ||||
| @@ -367,30 +310,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, | ||||
|     goto end; | ||||
|   } | ||||
|  | ||||
|   if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed) | ||||
|       || (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) { | ||||
|     char buf[256]; | ||||
|     snprintf(buf, sizeof(buf), | ||||
|              "ch %" PRIu64 ": stream closed unexpectedly. " | ||||
|              "closing channel", | ||||
|              channel->id); | ||||
|     call_set_error(channel, buf, WARN_LOG_LEVEL); | ||||
|     goto end; | ||||
|   } | ||||
|  | ||||
|   size_t count = rbuffer_size(rbuf); | ||||
|   DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p", | ||||
|   DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", | ||||
|        channel->id, count, stream); | ||||
|  | ||||
|   // Feed the unpacker with data | ||||
|   msgpack_unpacker_reserve_buffer(channel->unpacker, count); | ||||
|   rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count); | ||||
|   msgpack_unpacker_buffer_consumed(channel->unpacker, count); | ||||
|   msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); | ||||
|   rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); | ||||
|   msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); | ||||
|  | ||||
|   parse_msgpack(channel); | ||||
|  | ||||
| end: | ||||
|   decref(channel); | ||||
|   channel_decref(channel); | ||||
| } | ||||
|  | ||||
| static void parse_msgpack(Channel *channel) | ||||
| @@ -400,8 +332,8 @@ static void parse_msgpack(Channel *channel) | ||||
|   msgpack_unpack_return result; | ||||
|  | ||||
|   // Deserialize everything we can. | ||||
|   while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == | ||||
|       MSGPACK_UNPACK_SUCCESS) { | ||||
|   while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == | ||||
|          MSGPACK_UNPACK_SUCCESS) { | ||||
|     bool is_response = is_rpc_response(&unpacked.data); | ||||
|     log_client_msg(channel->id, !is_response, unpacked.data); | ||||
|  | ||||
| @@ -427,7 +359,7 @@ static void parse_msgpack(Channel *channel) | ||||
|   if (result == MSGPACK_UNPACK_NOMEM_ERROR) { | ||||
|     mch_errmsg(e_outofmem); | ||||
|     mch_errmsg("\n"); | ||||
|     decref(channel); | ||||
|     channel_decref(channel); | ||||
|     preserve_exit(); | ||||
|   } | ||||
|  | ||||
| @@ -492,7 +424,7 @@ static void handle_request(Channel *channel, msgpack_object *request) | ||||
|   evdata->handler = handler; | ||||
|   evdata->args = args; | ||||
|   evdata->request_id = request_id; | ||||
|   incref(channel); | ||||
|   channel_incref(channel); | ||||
|   if (handler.async) { | ||||
|     bool is_get_mode = handler.fn == handle_nvim_get_mode; | ||||
|  | ||||
| @@ -530,66 +462,30 @@ static void on_request_event(void **argv) | ||||
|     api_free_object(result); | ||||
|   } | ||||
|   api_free_array(args); | ||||
|   decref(channel); | ||||
|   channel_decref(channel); | ||||
|   xfree(e); | ||||
|   api_clear_error(&error); | ||||
| } | ||||
|  | ||||
| /// Returns the Stream that a Channel writes to. | ||||
| static Stream *chan_wstream(Channel *chan) | ||||
| { | ||||
|   switch (chan->type) { | ||||
|     case kChannelTypeSocket: | ||||
|       return &chan->data.stream; | ||||
|     case kChannelTypeProc: | ||||
|       return chan->data.proc->in; | ||||
|     case kChannelTypeStdio: | ||||
|       return &chan->data.std.out; | ||||
|     case kChannelTypeInternal: | ||||
|       return NULL; | ||||
|   } | ||||
|   abort(); | ||||
| } | ||||
|  | ||||
| /// Returns the Stream that a Channel reads from. | ||||
| static Stream *chan_rstream(Channel *chan) | ||||
| { | ||||
|   switch (chan->type) { | ||||
|     case kChannelTypeSocket: | ||||
|       return &chan->data.stream; | ||||
|     case kChannelTypeProc: | ||||
|       return chan->data.proc->out; | ||||
|     case kChannelTypeStdio: | ||||
|       return &chan->data.std.in; | ||||
|     case kChannelTypeInternal: | ||||
|       return NULL; | ||||
|   } | ||||
|   abort(); | ||||
| } | ||||
|  | ||||
|  | ||||
| static bool channel_write(Channel *channel, WBuffer *buffer) | ||||
| { | ||||
|   bool success = false; | ||||
|   bool success; | ||||
|  | ||||
|   if (channel->closed) { | ||||
|   if (channel->rpc.closed) { | ||||
|     wstream_release_wbuffer(buffer); | ||||
|     return false; | ||||
|   } | ||||
|  | ||||
|   switch (channel->type) { | ||||
|     case kChannelTypeSocket: | ||||
|     case kChannelTypeProc: | ||||
|     case kChannelTypeStdio: | ||||
|       success = wstream_write(chan_wstream(channel), buffer); | ||||
|       break; | ||||
|     case kChannelTypeInternal: | ||||
|       incref(channel); | ||||
|       CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); | ||||
|       success = true; | ||||
|       break; | ||||
|   if (channel->streamtype == kChannelStreamInternal) { | ||||
|     channel_incref(channel); | ||||
|     CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); | ||||
|     success = true; | ||||
|   } else { | ||||
|     Stream *in = channel_instream(channel); | ||||
|     success = wstream_write(in, buffer); | ||||
|   } | ||||
|  | ||||
|  | ||||
|   if (!success) { | ||||
|     // If the write failed for any reason, close the channel | ||||
|     char buf[256]; | ||||
| @@ -609,14 +505,14 @@ static void internal_read_event(void **argv) | ||||
|   Channel *channel = argv[0]; | ||||
|   WBuffer *buffer = argv[1]; | ||||
|  | ||||
|   msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size); | ||||
|   memcpy(msgpack_unpacker_buffer(channel->unpacker), | ||||
|   msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); | ||||
|   memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), | ||||
|          buffer->data, buffer->size); | ||||
|   msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size); | ||||
|   msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); | ||||
|  | ||||
|   parse_msgpack(channel); | ||||
|  | ||||
|   decref(channel); | ||||
|   channel_decref(channel); | ||||
|   wstream_release_wbuffer(buffer); | ||||
| } | ||||
|  | ||||
| @@ -665,7 +561,8 @@ static void broadcast_event(const char *name, Array args) | ||||
|   Channel *channel; | ||||
|  | ||||
|   map_foreach_value(channels, channel, { | ||||
|     if (pmap_has(cstr_t)(channel->subscribed_events, name)) { | ||||
|     if (channel->is_rpc | ||||
|         && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { | ||||
|       kv_push(subscribed, channel); | ||||
|     } | ||||
|   }); | ||||
| @@ -695,10 +592,11 @@ end: | ||||
| static void unsubscribe(Channel *channel, char *event) | ||||
| { | ||||
|   char *event_string = pmap_get(cstr_t)(event_strings, event); | ||||
|   pmap_del(cstr_t)(channel->subscribed_events, event_string); | ||||
|   pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); | ||||
|  | ||||
|   map_foreach_value(channels, channel, { | ||||
|     if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) { | ||||
|     if (channel->is_rpc | ||||
|         && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { | ||||
|       return; | ||||
|     } | ||||
|   }); | ||||
| @@ -709,86 +607,65 @@ static void unsubscribe(Channel *channel, char *event) | ||||
| } | ||||
|  | ||||
| /// Close the channel streams/process and free the channel resources. | ||||
| /// TODO: move to channel.h | ||||
| static void close_channel(Channel *channel) | ||||
| { | ||||
|   if (channel->closed) { | ||||
|   if (channel->rpc.closed) { | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   channel->closed = true; | ||||
|   channel->rpc.closed = true; | ||||
|  | ||||
|   switch (channel->type) { | ||||
|     case kChannelTypeSocket: | ||||
|       stream_close(&channel->data.stream, NULL, NULL); | ||||
|   switch (channel->streamtype) { | ||||
|     case kChannelStreamSocket: | ||||
|       stream_close(&channel->stream.socket, NULL, NULL); | ||||
|       break; | ||||
|     case kChannelTypeProc: | ||||
|     case kChannelStreamProc: | ||||
|       // Only close the rpc channel part, | ||||
|       // there could be an error message on the stderr stream | ||||
|       process_close_in(channel->data.proc); | ||||
|       process_close_out(channel->data.proc); | ||||
|       process_close_in(&channel->stream.proc); | ||||
|       process_close_out(&channel->stream.proc); | ||||
|       break; | ||||
|     case kChannelTypeStdio: | ||||
|       stream_close(&channel->data.std.in, NULL, NULL); | ||||
|       stream_close(&channel->data.std.out, NULL, NULL); | ||||
|     case kChannelStreamStdio: | ||||
|       stream_close(&channel->stream.stdio.in, NULL, NULL); | ||||
|       stream_close(&channel->stream.stdio.out, NULL, NULL); | ||||
|       multiqueue_put(main_loop.fast_events, exit_event, 1, channel); | ||||
|       return; | ||||
|     case kChannelTypeInternal: | ||||
|     case kChannelStreamInternal: | ||||
|       // nothing to free. | ||||
|       break; | ||||
|   } | ||||
|  | ||||
|   decref(channel); | ||||
|   channel_decref(channel); | ||||
| } | ||||
|  | ||||
| static void exit_event(void **argv) | ||||
| { | ||||
|   decref(argv[0]); | ||||
|   channel_decref(argv[0]); | ||||
|  | ||||
|   if (!exiting) { | ||||
|     mch_exit(0); | ||||
|   } | ||||
| } | ||||
|  | ||||
| static void free_channel(Channel *channel) | ||||
| void rpc_free(Channel *channel) | ||||
| { | ||||
|   remote_ui_disconnect(channel->id); | ||||
|   pmap_del(uint64_t)(channels, channel->id); | ||||
|   msgpack_unpacker_free(channel->unpacker); | ||||
|   msgpack_unpacker_free(channel->rpc.unpacker); | ||||
|  | ||||
|   // Unsubscribe from all events | ||||
|   char *event_string; | ||||
|   map_foreach_value(channel->subscribed_events, event_string, { | ||||
|   map_foreach_value(channel->rpc.subscribed_events, event_string, { | ||||
|     unsubscribe(channel, event_string); | ||||
|   }); | ||||
|  | ||||
|   pmap_free(cstr_t)(channel->subscribed_events); | ||||
|   kv_destroy(channel->call_stack); | ||||
|   if (channel->type != kChannelTypeProc) { | ||||
|     multiqueue_free(channel->events); | ||||
|   } | ||||
|   xfree(channel); | ||||
|   pmap_free(cstr_t)(channel->rpc.subscribed_events); | ||||
|   kv_destroy(channel->rpc.call_stack); | ||||
| } | ||||
|  | ||||
| static void close_cb(Stream *stream, void *data) | ||||
| { | ||||
|   decref(data); | ||||
| } | ||||
|  | ||||
| static Channel *register_channel(ChannelType type, uint64_t id, | ||||
|                                  MultiQueue *events) | ||||
| { | ||||
|   Channel *rv = xmalloc(sizeof(Channel)); | ||||
|   rv->events = events ? events : multiqueue_new_child(main_loop.events); | ||||
|   rv->type = type; | ||||
|   rv->refcount = 1; | ||||
|   rv->closed = false; | ||||
|   rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); | ||||
|   rv->id = id > 0 ? id : next_chan_id++; | ||||
|   rv->subscribed_events = pmap_new(cstr_t)(); | ||||
|   rv->next_request_id = 1; | ||||
|   kv_init(rv->call_stack); | ||||
|   pmap_put(uint64_t)(channels, rv->id, rv); | ||||
|   return rv; | ||||
|   channel_decref(data); | ||||
| } | ||||
|  | ||||
| static bool is_rpc_response(msgpack_object *obj) | ||||
| @@ -803,15 +680,18 @@ static bool is_rpc_response(msgpack_object *obj) | ||||
| static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) | ||||
| { | ||||
|   uint64_t response_id = obj->via.array.ptr[1].via.u64; | ||||
|   if (kv_size(channel->rpc.call_stack) == 0) { | ||||
|     return false; | ||||
|   } | ||||
|  | ||||
|   // Must be equal to the frame at the stack's bottom | ||||
|   return kv_size(channel->call_stack) && response_id | ||||
|     == kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id; | ||||
|   ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); | ||||
|   return response_id == frame->request_id; | ||||
| } | ||||
|  | ||||
| static void complete_call(msgpack_object *obj, Channel *channel) | ||||
| { | ||||
|   ChannelCallFrame *frame = kv_A(channel->call_stack, | ||||
|                              kv_size(channel->call_stack) - 1); | ||||
|   ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); | ||||
|   frame->returned = true; | ||||
|   frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; | ||||
|  | ||||
| @@ -825,8 +705,8 @@ static void complete_call(msgpack_object *obj, Channel *channel) | ||||
| static void call_set_error(Channel *channel, char *msg, int loglevel) | ||||
| { | ||||
|   LOG(loglevel, "RPC: %s", msg); | ||||
|   for (size_t i = 0; i < kv_size(channel->call_stack); i++) { | ||||
|     ChannelCallFrame *frame = kv_A(channel->call_stack, i); | ||||
|   for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { | ||||
|     ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); | ||||
|     frame->returned = true; | ||||
|     frame->errored = true; | ||||
|     api_free_object(frame->result); | ||||
| @@ -875,18 +755,6 @@ static WBuffer *serialize_response(uint64_t channel_id, | ||||
|   return rv; | ||||
| } | ||||
|  | ||||
| static void incref(Channel *channel) | ||||
| { | ||||
|   channel->refcount++; | ||||
| } | ||||
|  | ||||
| static void decref(Channel *channel) | ||||
| { | ||||
|   if (!(--channel->refcount)) { | ||||
|     free_channel(channel); | ||||
|   } | ||||
| } | ||||
|  | ||||
| #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL | ||||
| #define REQ "[request]  " | ||||
| #define RES "[response] " | ||||
|   | ||||
| @@ -8,6 +8,7 @@ | ||||
| #include "nvim/event/socket.h" | ||||
| #include "nvim/event/process.h" | ||||
| #include "nvim/vim.h" | ||||
| #include "nvim/channel.h" | ||||
|  | ||||
| #define METHOD_MAXLEN 512 | ||||
|  | ||||
| @@ -16,6 +17,7 @@ | ||||
| ///       of os_inchar(), so they are processed "just-in-time". | ||||
| MultiQueue *ch_before_blocking_events; | ||||
|  | ||||
|  | ||||
| #ifdef INCLUDE_GENERATED_DECLARATIONS | ||||
| # include "msgpack_rpc/channel.h.generated.h" | ||||
| #endif | ||||
|   | ||||
							
								
								
									
										36
									
								
								src/nvim/msgpack_rpc/channel_defs.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								src/nvim/msgpack_rpc/channel_defs.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | ||||
| #ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H | ||||
| #define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H | ||||
|  | ||||
| #include <stdbool.h> | ||||
| #include <uv.h> | ||||
| #include <msgpack.h> | ||||
|  | ||||
| #include "nvim/api/private/defs.h" | ||||
| #include "nvim/event/socket.h" | ||||
| #include "nvim/event/process.h" | ||||
| #include "nvim/vim.h" | ||||
|  | ||||
| typedef struct Channel Channel; | ||||
|  | ||||
| typedef struct { | ||||
|   uint64_t request_id; | ||||
|   bool returned, errored; | ||||
|   Object result; | ||||
| } ChannelCallFrame; | ||||
|  | ||||
| typedef struct { | ||||
|   Channel *channel; | ||||
|   MsgpackRpcRequestHandler handler; | ||||
|   Array args; | ||||
|   uint64_t request_id; | ||||
| } RequestEvent; | ||||
|  | ||||
| typedef struct { | ||||
|   PMap(cstr_t) *subscribed_events; | ||||
|   bool closed; | ||||
|   msgpack_unpacker *unpacker; | ||||
|   uint64_t next_request_id; | ||||
|   kvec_t(ChannelCallFrame *) call_stack; | ||||
| } RpcState; | ||||
|  | ||||
| #endif  // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H | ||||
| @@ -47,7 +47,7 @@ int pty_process_spawn(PtyProcess *ptyproc) | ||||
|  | ||||
|   int status = 0;  // zero or negative error code (libuv convention) | ||||
|   Process *proc = (Process *)ptyproc; | ||||
|   assert(!proc->err); | ||||
|   assert(proc->err.closed); | ||||
|   uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD); | ||||
|   ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 }; | ||||
|   uv_disable_stdio_inheritance(); | ||||
| @@ -83,12 +83,12 @@ int pty_process_spawn(PtyProcess *ptyproc) | ||||
|     goto error; | ||||
|   } | ||||
|  | ||||
|   if (proc->in | ||||
|       && (status = set_duplicating_descriptor(master, &proc->in->uv.pipe))) { | ||||
|   if (!proc->in.closed | ||||
|       && (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) { | ||||
|     goto error; | ||||
|   } | ||||
|   if (proc->out | ||||
|       && (status = set_duplicating_descriptor(master, &proc->out->uv.pipe))) { | ||||
|   if (!proc->out.closed | ||||
|       && (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) { | ||||
|     goto error; | ||||
|   } | ||||
|  | ||||
|   | ||||
| @@ -44,7 +44,7 @@ int pty_process_spawn(PtyProcess *ptyproc) | ||||
|   wchar_t *cwd = NULL; | ||||
|   const char *emsg = NULL; | ||||
|  | ||||
|   assert(!proc->err); | ||||
|   assert(proc->err.closed); | ||||
|  | ||||
|   cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err); | ||||
|   if (cfg == NULL) { | ||||
| @@ -71,20 +71,20 @@ int pty_process_spawn(PtyProcess *ptyproc) | ||||
|     goto cleanup; | ||||
|   } | ||||
|  | ||||
|   if (proc->in != NULL) { | ||||
|   if (!proc->in.closed) { | ||||
|     in_req = xmalloc(sizeof(uv_connect_t)); | ||||
|     uv_pipe_connect( | ||||
|         in_req, | ||||
|         &proc->in->uv.pipe, | ||||
|         &proc->in.uv.pipe, | ||||
|         in_name, | ||||
|         pty_process_connect_cb); | ||||
|   } | ||||
|  | ||||
|   if (proc->out != NULL) { | ||||
|   if (!proc->out.closed) { | ||||
|     out_req = xmalloc(sizeof(uv_connect_t)); | ||||
|     uv_pipe_connect( | ||||
|         out_req, | ||||
|         &proc->out->uv.pipe, | ||||
|         &proc->out.uv.pipe, | ||||
|         out_name, | ||||
|         pty_process_connect_cb); | ||||
|   } | ||||
| @@ -228,7 +228,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer) | ||||
|   PtyProcess *ptyproc = wait_eof_timer->data; | ||||
|   Process *proc = (Process *)ptyproc; | ||||
|  | ||||
|   if (!proc->out || !uv_is_readable(proc->out->uvstream)) { | ||||
|   if (proc->out.closed || !uv_is_readable(proc->out.uvstream)) { | ||||
|     uv_timer_stop(&ptyproc->wait_eof_timer); | ||||
|     pty_process_finish2(ptyproc); | ||||
|   } | ||||
|   | ||||
| @@ -207,16 +207,12 @@ static int do_os_system(char **argv, | ||||
|   char prog[MAXPATHL]; | ||||
|   xstrlcpy(prog, argv[0], MAXPATHL); | ||||
|  | ||||
|   Stream in, out, err; | ||||
|   LibuvProcess uvproc = libuv_process_init(&main_loop, &buf); | ||||
|   Process *proc = &uvproc.process; | ||||
|   MultiQueue *events = multiqueue_new_child(main_loop.events); | ||||
|   proc->events = events; | ||||
|   proc->argv = argv; | ||||
|   proc->in = input != NULL ? &in : NULL; | ||||
|   proc->out = &out; | ||||
|   proc->err = &err; | ||||
|   int status = process_spawn(proc); | ||||
|   int status = process_spawn(proc, input != NULL, true, true); | ||||
|   if (status) { | ||||
|     loop_poll_events(&main_loop, 0); | ||||
|     // Failed, probably 'shell' is not executable. | ||||
| @@ -236,27 +232,27 @@ static int do_os_system(char **argv, | ||||
|   // streams while there's still data in the OS buffer (due to the process | ||||
|   // exiting before all data is read). | ||||
|   if (input != NULL) { | ||||
|     proc->in->events = NULL; | ||||
|     wstream_init(proc->in, 0); | ||||
|     proc->in.events = NULL; | ||||
|     wstream_init(&proc->in, 0); | ||||
|   } | ||||
|   proc->out->events = NULL; | ||||
|   rstream_init(proc->out, 0); | ||||
|   rstream_start(proc->out, data_cb, &buf); | ||||
|   proc->err->events = NULL; | ||||
|   rstream_init(proc->err, 0); | ||||
|   rstream_start(proc->err, data_cb, &buf); | ||||
|   proc->out.events = NULL; | ||||
|   rstream_init(&proc->out, 0); | ||||
|   rstream_start(&proc->out, data_cb, &buf); | ||||
|   proc->err.events = NULL; | ||||
|   rstream_init(&proc->err, 0); | ||||
|   rstream_start(&proc->err, data_cb, &buf); | ||||
|  | ||||
|   // write the input, if any | ||||
|   if (input) { | ||||
|     WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL); | ||||
|  | ||||
|     if (!wstream_write(&in, input_buffer)) { | ||||
|     if (!wstream_write(&proc->in, input_buffer)) { | ||||
|       // couldn't write, stop the process and tell the user about it | ||||
|       process_stop(proc); | ||||
|       return -1; | ||||
|     } | ||||
|     // close the input stream after everything is written | ||||
|     wstream_set_write_cb(&in, shell_write_cb, NULL); | ||||
|     wstream_set_write_cb(&proc->in, shell_write_cb, NULL); | ||||
|   } | ||||
|  | ||||
|   // Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Björn Linse
					Björn Linse