From 86cd4897012758a59d8068b796b449ff7ff37f16 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 8 Dec 2025 07:09:11 -0800 Subject: [PATCH] terminal/tmux: introduce command queue for viewer --- src/terminal/tmux/viewer.zig | 196 ++++++++++++++++++++++++++-------- src/termio/stream_handler.zig | 3 +- 2 files changed, 156 insertions(+), 43 deletions(-) diff --git a/src/terminal/tmux/viewer.zig b/src/terminal/tmux/viewer.zig index 275f93d5e..384ad609b 100644 --- a/src/terminal/tmux/viewer.zig +++ b/src/terminal/tmux/viewer.zig @@ -3,6 +3,7 @@ const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; const testing = std.testing; const assert = @import("../../quirks.zig").inlineAssert; +const CircBuf = @import("../../datastruct/main.zig").CircBuf; const control = @import("control.zig"); const output = @import("output.zig"); @@ -19,6 +20,12 @@ const log = std.log.scoped(.terminal_tmux_viewer); // in case something breaks in the future we can consider it. We should // be able to easily unit test all variations seen in the real world. +/// The initial capacity of the command queue. We dynamically resize +/// as necessary so the initial value isn't that important, but if we +/// want to feel good about it we should make it large enough to support +/// our most realistic use cases without resizing. +const COMMAND_QUEUE_INITIAL = 8; + /// A viewer is a tmux control mode client that attempts to create /// a remote view of a tmux session, including providing the ability to send /// new input to the session. @@ -40,6 +47,11 @@ pub const Viewer = struct { /// The current session ID we're attached to. session_id: usize, + /// The list of commands we've sent that we want to send and wait + /// for a response for. We only send one command at a time just + /// to avoid any possible confusion around ordering. + command_queue: CommandQueue, + /// The windows in the current session. windows: std.ArrayList(Window), @@ -52,6 +64,8 @@ pub const Viewer = struct { /// errors on single-action returns, especially those such as `.exit`. action_single: [1]Action, + pub const CommandQueue = CircBuf(Command, undefined); + pub const Action = union(enum) { /// Tmux has closed the control mode connection, we should end /// our viewer session in some way. @@ -111,7 +125,11 @@ pub const Viewer = struct { /// /// The given allocator is used for all internal state. You must /// call deinit when you're done with the viewer to free it. - pub fn init(alloc: Allocator) Viewer { + pub fn init(alloc: Allocator) Allocator.Error!Viewer { + // Create our initial command queue + var command_queue: CommandQueue = try .init(alloc, COMMAND_QUEUE_INITIAL); + errdefer command_queue.deinit(alloc); + return .{ .alloc = alloc, .state = .startup_block, @@ -119,6 +137,7 @@ pub const Viewer = struct { // until we receive a session-changed notification which will // set this to a real value. .session_id = 0, + .command_queue = command_queue, .windows = .empty, .action_arena = .{}, .action_single = undefined, @@ -127,6 +146,11 @@ pub const Viewer = struct { pub fn deinit(self: *Viewer) void { self.windows.deinit(self.alloc); + { + var it = self.command_queue.iterator(.forward); + while (it.next()) |command| command.deinit(self.alloc); + self.command_queue.deinit(self.alloc); + } self.action_arena.promote(self.alloc).deinit(); } @@ -155,11 +179,7 @@ pub const Viewer = struct { .startup_block => self.nextStartupBlock(n), .startup_session => self.nextStartupSession(n), - .idle => self.nextIdle(n), - - // Once we're in the main states, there's a bunch of shared - // logic so we centralize it. - .list_windows => self.nextCommand(n), + .command_queue => self.nextCommand(n), }; } @@ -209,11 +229,11 @@ pub const Viewer = struct { .session_changed => |info| { self.session_id = info.id; - self.state = .list_windows; - return self.singleAction(.{ .command = std.fmt.comptimePrint( - "list-windows -F '{s}'\n", - .{comptime Format.list_windows.comptimeFormat()}, - ) }); + self.state = .command_queue; + return self.singleAction(self.queueCommand(.list_windows) catch { + log.warn("failed to queue command, becoming defunct", .{}); + return self.defunct(); + }); }, else => return &.{}, @@ -237,39 +257,85 @@ pub const Viewer = struct { self: *Viewer, n: control.Notification, ) []const Action { - switch (n) { - .enter => unreachable, + // We have to be in a command queue, but the command queue MAY + // be empty. If it is empty, then receivedCommandOutput will + // handle it by ignoring any command output. That's okay! + assert(self.state == .command_queue); - .exit => return self.defunct(), + return switch (n) { + .enter => unreachable, + .exit => self.defunct(), inline .block_end, .block_err, - => |content, tag| switch (self.state) { - .startup_block, - .startup_session, - .idle, - .defunct, - => unreachable, - - .list_windows => { - // Move to defunct on error blocks. - if (comptime tag == .block_err) return self.defunct(); - return self.receivedListWindows(content) catch return self.defunct(); - }, + => |content, tag| self.receivedCommandOutput( + content, + tag == .block_err, + ) catch err: { + log.warn("failed to process command output, becoming defunct", .{}); + break :err self.defunct(); }, // TODO: Use exhaustive matching here, determine if we need // to handle the other cases. - else => return &.{}, + else => &.{}, + }; + } + + fn receivedCommandOutput( + self: *Viewer, + content: []const u8, + is_err: bool, + ) ![]const Action { + // If we have no pending commands, this is unexpected. + const command = self.command_queue.first() orelse { + log.info("unexpected block output err={}", .{is_err}); + return &.{}; + }; + self.command_queue.deleteOldest(1); + + // We always free any memory associated with the command + defer command.deinit(self.alloc); + + // We'll use our arena for the return value here so we can + // easily accumulate actions. + var arena = self.action_arena.promote(self.alloc); + defer self.action_arena = arena.state; + _ = arena.reset(.free_all); + const arena_alloc = arena.allocator(); + + // Build up our actions to start with the next command if + // we have one. + var actions: std.ArrayList(Action) = .empty; + if (self.command_queue.first()) |next_command| { + try actions.append( + arena_alloc, + .{ .command = next_command.string() }, + ); } + + // Process our command + switch (command.*) { + .user => {}, + .list_windows => try self.receivedListWindows( + arena_alloc, + &actions, + content, + ), + } + + // Our command processing should not change our state + assert(self.state == .command_queue); + + return actions.items; } fn receivedListWindows( self: *Viewer, + arena_alloc: Allocator, + actions: *std.ArrayList(Action), content: []const u8, - ) ![]const Action { - assert(self.state == .list_windows); - + ) !void { // This stores our new window state from this list-windows output. var windows: std.ArrayList(Window) = .empty; errdefer windows.deinit(self.alloc); @@ -299,14 +365,27 @@ pub const Viewer = struct { self.windows.deinit(self.alloc); self.windows = windows; - // Go into the idle state - self.state = .idle; - // TODO: Diff with prior window state, dispatch capture-pane // requests to collect all of the screen contents, other terminal // state, etc. - return self.singleAction(.{ .windows = self.windows.items }); + try actions.append(arena_alloc, .{ .windows = self.windows.items }); + } + + /// This queues the command at the end of the command queue + /// and returns an action representing the next command that + /// should be run (the head). + /// + /// The next command is not removed, because the expectation is + /// that the head of our command list is always sent to tmux. + fn queueCommand(self: *Viewer, command: Command) Allocator.Error!Action { + // Add our command + try self.command_queue.ensureUnusedCapacity(self.alloc, 1); + self.command_queue.appendAssumeCapacity(command); + + // Get our first command to send, guaranteed to exist since we + // just appended one. + return .{ .command = self.command_queue.first().?.string() }; } /// Helper to return a single action. The input action may use the arena @@ -323,7 +402,7 @@ pub const Viewer = struct { } }; -const State = union(enum) { +const State = enum { /// We start in this state just after receiving the initial /// DCS 1000p opening sequence. We wait for an initial /// begin/end block that is guaranteed to be sent by tmux for @@ -338,13 +417,46 @@ const State = union(enum) { /// Tmux has closed the control mode connection defunct, - /// We're waiting on a list-windows response from tmux. This will - /// be used to resynchronize our entire window state. + /// We're sitting on the command queue waiting for command output + /// in the order provided in the `command_queue` field. This field + /// isn't part of the state because it can be queued at any state. + /// + /// Precondition: if self.command_queue.len > 0, then the first + /// command in the queue has already been sent to tmux (via a + /// `command` Action). The next output is assumed to be the result + /// of this command. + /// + /// To satisfy the above, any transitions INTO this state should + /// send a command Action for the first command in the queue. + command_queue, +}; + +const Command = union(enum) { + /// List all windows so we can sync our window state. list_windows, - /// Idle state, we're not actually doing anything right now except - /// waiting for more events from tmux that may change our behavior. - idle, + /// User command. This is a command provided by the user. Since + /// this is user provided, we can't be sure what it is. + user: []const u8, + + pub fn deinit(self: Command, alloc: Allocator) void { + return switch (self) { + .list_windows => {}, + .user => |v| alloc.free(v), + }; + } + + /// Returns the command to execute. The memory of the return + /// value is always safe as long as this command value is alive. + pub fn string(self: Command) []const u8 { + return switch (self) { + .list_windows => std.fmt.comptimePrint( + "list-windows -F '{s}'\n", + .{comptime Format.list_windows.comptimeFormat()}, + ), + .user => |v| v, + }; + } }; /// Format strings used for commands in our viewer. @@ -379,7 +491,7 @@ const Format = struct { }; test "immediate exit" { - var viewer = Viewer.init(testing.allocator); + var viewer = try Viewer.init(testing.allocator); defer viewer.deinit(); const actions = viewer.next(.{ .tmux = .exit }); try testing.expectEqual(1, actions.len); @@ -389,7 +501,7 @@ test "immediate exit" { } test "initial flow" { - var viewer = Viewer.init(testing.allocator); + var viewer = try Viewer.init(testing.allocator); defer viewer.deinit(); // First we receive the initial block end diff --git a/src/termio/stream_handler.zig b/src/termio/stream_handler.zig index ba207ce7b..eabfd6a4b 100644 --- a/src/termio/stream_handler.zig +++ b/src/termio/stream_handler.zig @@ -392,7 +392,8 @@ pub const StreamHandler = struct { assert(self.tmux_viewer == null); const viewer = try self.alloc.create(terminal.tmux.Viewer); errdefer self.alloc.destroy(viewer); - viewer.* = .init(self.alloc); + viewer.* = try .init(self.alloc); + errdefer viewer.deinit(); self.tmux_viewer = viewer; break :tmux; },