fix(lsp): handle multiple clients with incremental sync (#19658)

The change tracking used a single lines/lines_tmp table to track
changes to a buffer.

If multiple clients using incremental sync are connected to a buffer,
they both made changes to the same lines table. That resulted in an
inconsistent state.

This commit changes the didChange handling to group clients by
synchronization scheme and offset encoding.
This avoids computing the diff multiple times for clients using the
same scheme and resolves the lines/lines_tmp conflicts.

Fixes https://github.com/neovim/neovim/issues/19325
This commit is contained in:
Mathias Fußenegger
2022-08-09 22:20:40 +02:00
committed by GitHub
parent 3030b4d653
commit bebfbfab3b

View File

@@ -338,54 +338,165 @@ end
local changetracking = {} local changetracking = {}
do do
--@private ---@private
--- client_id → state
--- ---
--- state --- LSP has 3 different sync modes:
--- use_incremental_sync: bool --- - None (Servers will read the files themselves when needed)
--- buffers: bufnr -> buffer_state --- - Full (Client sends the full buffer content on updates)
--- - Incremental (Client sends only the changed parts)
--- ---
--- buffer_state --- Changes are tracked per buffer.
--- pending_change?: function that the timer starts to trigger didChange --- A buffer can have multiple clients attached and each client needs to send the changes
--- pending_changes: table (uri -> list of pending changeset tables)); --- To minimize the amount of changesets to compute, computation is grouped:
--- Only set if incremental_sync is used
--- ---
--- timer?: uv_timer --- None: One group for all clients
--- lines: table --- Full: One group for all clients
local state_by_client = {} --- Incremental: One group per `offset_encoding`
---
--- Sending changes can be debounced per buffer. To simplify the implementation the
--- smallest debounce interval is used and we don't group clients by different intervals.
---
--- @class CTGroup
--- @field sync_kind number TextDocumentSyncKind, considers config.flags.allow_incremental_sync
--- @field offset_encoding "utf-8"|"utf-16"|"utf-32"
---
--- @class CTBufferState
--- @field name string name of the buffer
--- @field lines string[] snapshot of buffer lines from last didChange
--- @field lines_tmp string[]
--- @field pending_changes table[] List of debounced changes in incremental sync mode
--- @field timer nil|userdata uv_timer
--- @field last_flush nil|number uv.hrtime of the last flush/didChange-notification
--- @field needs_flush boolean true if buffer updates haven't been sent to clients/servers yet
--- @field refs number how many clients are using this group
---
--- @class CTGroupState
--- @field buffers table<number, CTBufferState>
--- @field debounce number debounce duration in ms
--- @field clients table<number, table> clients using this state. {client_id, client}
---@private
---@param group CTGroup
---@return string
local function group_key(group)
if group.sync_kind == protocol.TextDocumentSyncKind.Incremental then
return tostring(group.sync_kind) .. '\0' .. group.offset_encoding
end
return tostring(group.sync_kind)
end
---@private
---@type table<CTGroup, CTGroupState>
local state_by_group = setmetatable({}, {
__index = function(tbl, k)
return rawget(tbl, group_key(k))
end,
__newindex = function(tbl, k, v)
rawset(tbl, group_key(k), v)
end,
})
---@private
---@return CTGroup
local function get_group(client)
local allow_inc_sync = if_nil(client.config.flags.allow_incremental_sync, true)
local change_capability = vim.tbl_get(client.server_capabilities, 'textDocumentSync', 'change')
local sync_kind = change_capability or protocol.TextDocumentSyncKind.None
if not allow_inc_sync and change_capability == protocol.TextDocumentSyncKind.Incremental then
sync_kind = protocol.TextDocumentSyncKind.Full
end
return {
sync_kind = sync_kind,
offset_encoding = client.offset_encoding,
}
end
---@private
---@param state CTBufferState
local function incremental_changes(state, encoding, bufnr, firstline, lastline, new_lastline)
local prev_lines = state.lines
local curr_lines = state.lines_tmp
local changed_lines = nvim_buf_get_lines(bufnr, firstline, new_lastline, true)
for i = 1, firstline do
curr_lines[i] = prev_lines[i]
end
for i = firstline + 1, new_lastline do
curr_lines[i] = changed_lines[i - firstline]
end
for i = lastline + 1, #prev_lines do
curr_lines[i - lastline + new_lastline] = prev_lines[i]
end
if tbl_isempty(curr_lines) then
-- Can happen when deleting the entire contents of a buffer, see https://github.com/neovim/neovim/issues/16259.
curr_lines[1] = ''
end
local line_ending = buf_get_line_ending(bufnr)
local incremental_change = sync.compute_diff(
state.lines,
curr_lines,
firstline,
lastline,
new_lastline,
encoding,
line_ending
)
-- Double-buffering of lines tables is used to reduce the load on the garbage collector.
-- At this point the prev_lines table is useless, but its internal storage has already been allocated,
-- so let's keep it around for the next didChange event, in which it will become the next
-- curr_lines table. Note that setting elements to nil doesn't actually deallocate slots in the
-- internal storage - it merely marks them as free, for the GC to deallocate them.
for i in ipairs(prev_lines) do
prev_lines[i] = nil
end
state.lines = curr_lines
state.lines_tmp = prev_lines
return incremental_change
end
---@private ---@private
function changetracking.init(client, bufnr) function changetracking.init(client, bufnr)
local use_incremental_sync = ( assert(client.offset_encoding, 'lsp client must have an offset_encoding')
if_nil(client.config.flags.allow_incremental_sync, true) local group = get_group(client)
and vim.tbl_get(client.server_capabilities, 'textDocumentSync', 'change') local state = state_by_group[group]
== protocol.TextDocumentSyncKind.Incremental if state then
) state.debounce = math.min(state.debounce, client.config.flags.debounce_text_changes or 150)
local state = state_by_client[client.id] state.clients[client.id] = client
if not state then else
state = { state = {
buffers = {}, buffers = {},
debounce = client.config.flags.debounce_text_changes or 150, debounce = client.config.flags.debounce_text_changes or 150,
use_incremental_sync = use_incremental_sync, clients = {
[client.id] = client,
},
} }
state_by_client[client.id] = state state_by_group[group] = state
end end
if not state.buffers[bufnr] then local buf_state = state.buffers[bufnr]
local buf_state = { if buf_state then
buf_state.refs = buf_state.refs + 1
else
buf_state = {
name = api.nvim_buf_get_name(bufnr), name = api.nvim_buf_get_name(bufnr),
lines = {},
lines_tmp = {},
pending_changes = {},
needs_flush = false,
refs = 1,
} }
state.buffers[bufnr] = buf_state state.buffers[bufnr] = buf_state
if use_incremental_sync then if group.sync_kind == protocol.TextDocumentSyncKind.Incremental then
buf_state.lines = nvim_buf_get_lines(bufnr, 0, -1, true) buf_state.lines = nvim_buf_get_lines(bufnr, 0, -1, true)
buf_state.lines_tmp = {}
buf_state.pending_changes = {}
end end
end end
end end
---@private ---@private
function changetracking._get_and_set_name(client, bufnr, name) function changetracking._get_and_set_name(client, bufnr, name)
local state = state_by_client[client.id] or {} local state = state_by_group[get_group(client)] or {}
local buf_state = (state.buffers or {})[bufnr] local buf_state = (state.buffers or {})[bufnr]
local old_name = buf_state.name local old_name = buf_state.name
buf_state.name = name buf_state.name = name
@@ -395,32 +506,33 @@ do
---@private ---@private
function changetracking.reset_buf(client, bufnr) function changetracking.reset_buf(client, bufnr)
changetracking.flush(client, bufnr) changetracking.flush(client, bufnr)
local state = state_by_client[client.id] local state = state_by_group[get_group(client)]
if state and state.buffers then if not state then
local buf_state = state.buffers[bufnr] return
end
assert(state.buffers, 'CTGroupState must have buffers')
local buf_state = state.buffers[bufnr]
buf_state.refs = buf_state.refs - 1
assert(buf_state.refs >= 0, 'refcount on buffer state must not get negative')
if buf_state.refs == 0 then
state.buffers[bufnr] = nil state.buffers[bufnr] = nil
if buf_state and buf_state.timer then changetracking._reset_timer(buf_state)
buf_state.timer:stop()
buf_state.timer:close()
buf_state.timer = nil
end
end end
end end
---@private ---@private
function changetracking.reset(client_id) function changetracking.reset(client)
local state = state_by_client[client_id] local state = state_by_group[get_group(client)]
if not state then if not state then
return return
end end
for _, buf_state in pairs(state.buffers) do state.clients[client.id] = nil
if buf_state.timer then if vim.tbl_count(state.clients) == 0 then
buf_state.timer:stop() for _, buf_state in pairs(state.buffers) do
buf_state.timer:close() changetracking._reset_timer(buf_state)
buf_state.timer = nil
end end
state.buffers = {}
end end
state.buffers = {}
end end
---@private ---@private
@@ -430,6 +542,10 @@ do
-- debounce can be skipped and otherwise maybe reduced. -- debounce can be skipped and otherwise maybe reduced.
-- --
-- This turns the debounce into a kind of client rate limiting -- This turns the debounce into a kind of client rate limiting
--
---@param debounce number
---@param buf_state CTBufferState
---@return number
local function next_debounce(debounce, buf_state) local function next_debounce(debounce, buf_state)
if debounce == 0 then if debounce == 0 then
return 0 return 0
@@ -444,83 +560,36 @@ do
end end
---@private ---@private
function changetracking.prepare(bufnr, firstline, lastline, new_lastline) ---@param bufnr number
local incremental_changes = function(client, buf_state) ---@param sync_kind number protocol.TextDocumentSyncKind
local prev_lines = buf_state.lines ---@param state CTGroupState
local curr_lines = buf_state.lines_tmp ---@param buf_state CTBufferState
local function send_changes(bufnr, sync_kind, state, buf_state)
local changed_lines = nvim_buf_get_lines(bufnr, firstline, new_lastline, true) if not buf_state.needs_flush then
for i = 1, firstline do return
curr_lines[i] = prev_lines[i]
end
for i = firstline + 1, new_lastline do
curr_lines[i] = changed_lines[i - firstline]
end
for i = lastline + 1, #prev_lines do
curr_lines[i - lastline + new_lastline] = prev_lines[i]
end
if tbl_isempty(curr_lines) then
-- Can happen when deleting the entire contents of a buffer, see https://github.com/neovim/neovim/issues/16259.
curr_lines[1] = ''
end
local line_ending = buf_get_line_ending(bufnr)
local incremental_change = sync.compute_diff(
buf_state.lines,
curr_lines,
firstline,
lastline,
new_lastline,
client.offset_encoding or 'utf-16',
line_ending
)
-- Double-buffering of lines tables is used to reduce the load on the garbage collector.
-- At this point the prev_lines table is useless, but its internal storage has already been allocated,
-- so let's keep it around for the next didChange event, in which it will become the next
-- curr_lines table. Note that setting elements to nil doesn't actually deallocate slots in the
-- internal storage - it merely marks them as free, for the GC to deallocate them.
for i in ipairs(prev_lines) do
prev_lines[i] = nil
end
buf_state.lines = curr_lines
buf_state.lines_tmp = prev_lines
return incremental_change
end end
local full_changes = once(function() buf_state.last_flush = uv.hrtime()
return { buf_state.needs_flush = false
text = buf_get_full_text(bufnr),
if not api.nvim_buf_is_valid(bufnr) then
buf_state.pending_changes = {}
return
end
local changes
if sync_kind == protocol.TextDocumentSyncKind.None then
return
elseif sync_kind == protocol.TextDocumentSyncKind.Incremental then
changes = buf_state.pending_changes
buf_state.pending_changes = {}
else
changes = {
{ text = buf_get_full_text(bufnr) },
} }
end) end
local uri = vim.uri_from_bufnr(bufnr) local uri = vim.uri_from_bufnr(bufnr)
return function(client) for _, client in pairs(state.clients) do
if if not client.is_stopped() and lsp.buf_is_attached(bufnr, client.id) then
vim.tbl_get(client.server_capabilities, 'textDocumentSync', 'change')
== protocol.TextDocumentSyncKind.None
then
return
end
local state = state_by_client[client.id]
local buf_state = state.buffers[bufnr]
changetracking._reset_timer(buf_state)
local debounce = next_debounce(state.debounce, buf_state)
if state.use_incremental_sync then
-- This must be done immediately and cannot be delayed
-- The contents would further change and startline/endline may no longer fit
table.insert(buf_state.pending_changes, incremental_changes(client, buf_state))
end
buf_state.pending_change = function()
if buf_state.pending_change == nil then
return
end
buf_state.pending_change = nil
buf_state.last_flush = uv.hrtime()
if client.is_stopped() or not api.nvim_buf_is_valid(bufnr) then
return
end
local changes = state.use_incremental_sync and buf_state.pending_changes
or { full_changes() }
client.notify('textDocument/didChange', { client.notify('textDocument/didChange', {
textDocument = { textDocument = {
uri = uri, uri = uri,
@@ -528,46 +597,90 @@ do
}, },
contentChanges = changes, contentChanges = changes,
}) })
buf_state.pending_changes = {}
end
if debounce == 0 then
buf_state.pending_change()
else
local timer = uv.new_timer()
buf_state.timer = timer
-- Must use schedule_wrap because `full_changes()` calls nvim_buf_get_lines
timer:start(debounce, 0, vim.schedule_wrap(buf_state.pending_change))
end end
end end
end end
---@private
function changetracking.send_changes(bufnr, firstline, lastline, new_lastline)
local groups = {}
for _, client in pairs(lsp.get_active_clients({ bufnr = bufnr })) do
local group = get_group(client)
groups[group_key(group)] = group
end
for _, group in pairs(groups) do
local state = state_by_group[group]
if not state then
error(
string.format(
'changetracking.init must have been called for all LSP clients. group=%s states=%s',
vim.inspect(group),
vim.inspect(vim.tbl_keys(state_by_group))
)
)
end
local buf_state = state.buffers[bufnr]
buf_state.needs_flush = true
changetracking._reset_timer(buf_state)
local debounce = next_debounce(state.debounce, buf_state)
if group.sync_kind == protocol.TextDocumentSyncKind.Incremental then
-- This must be done immediately and cannot be delayed
-- The contents would further change and startline/endline may no longer fit
local changes = incremental_changes(
buf_state,
group.offset_encoding,
bufnr,
firstline,
lastline,
new_lastline
)
table.insert(buf_state.pending_changes, changes)
end
if debounce == 0 then
send_changes(bufnr, group.sync_kind, state, buf_state)
else
local timer = uv.new_timer()
buf_state.timer = timer
timer:start(
debounce,
0,
vim.schedule_wrap(function()
changetracking._reset_timer(buf_state)
send_changes(bufnr, group.sync_kind, state, buf_state)
end)
)
end
end
end
---@private
function changetracking._reset_timer(buf_state) function changetracking._reset_timer(buf_state)
if buf_state.timer then local timer = buf_state.timer
buf_state.timer:stop() if timer then
buf_state.timer:close()
buf_state.timer = nil buf_state.timer = nil
if not timer:is_closing() then
timer:stop()
timer:close()
end
end end
end end
--- Flushes any outstanding change notification. --- Flushes any outstanding change notification.
---@private ---@private
function changetracking.flush(client, bufnr) function changetracking.flush(client, bufnr)
local state = state_by_client[client.id] local group = get_group(client)
local state = state_by_group[group]
if not state then if not state then
return return
end end
if bufnr then if bufnr then
local buf_state = state.buffers[bufnr] or {} local buf_state = state.buffers[bufnr] or {}
changetracking._reset_timer(buf_state) changetracking._reset_timer(buf_state)
if buf_state.pending_change then send_changes(bufnr, group.sync_kind, state, buf_state)
buf_state.pending_change()
end
else else
for _, buf_state in pairs(state.buffers) do for buf, buf_state in pairs(state.buffers) do
changetracking._reset_timer(buf_state) changetracking._reset_timer(buf_state)
if buf_state.pending_change then send_changes(buf, group.sync_kind, state, buf_state)
buf_state.pending_change()
end
end end
end end
end end
@@ -1030,11 +1143,12 @@ function lsp.start_client(config)
end) end)
end end
end end
local client = active_clients[client_id] and active_clients[client_id]
or uninitialized_clients[client_id]
active_clients[client_id] = nil active_clients[client_id] = nil
uninitialized_clients[client_id] = nil uninitialized_clients[client_id] = nil
changetracking.reset(client_id) changetracking.reset(client)
if code ~= 0 or (signal ~= 0 and signal ~= 15) then if code ~= 0 or (signal ~= 0 and signal ~= 15) then
local msg = local msg =
string.format('Client %s quit with exit code %s and signal %s', client_id, code, signal) string.format('Client %s quit with exit code %s and signal %s', client_id, code, signal)
@@ -1414,9 +1528,7 @@ do
return true return true
end end
util.buf_versions[bufnr] = changedtick util.buf_versions[bufnr] = changedtick
local compute_change_and_notify = changetracking.send_changes(bufnr, firstline, lastline, new_lastline)
changetracking.prepare(bufnr, firstline, lastline, new_lastline)
for_each_buffer_client(bufnr, compute_change_and_notify)
end end
end end