diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 3c61f31c00e..e98aef95153 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -310,6 +310,15 @@ func (s *Service) UpdateLog( rows = req.Msg.Rows[ack-req.Msg.Index:] } + // Ack a re-sent finalize idempotently. Appending new rows past the seal errors. + if task.LogInStorage { + if len(rows) > 0 { + return nil, status.Errorf(codes.AlreadyExists, "log file has been archived") + } + res.Msg.AckIndex = ack + return res, nil + } + // Bail unless we have new rows or a NoMore to finalize. Even with // NoMore, bail when the runner has outrun the server — archiving a // log with a gap is worse than asking it to retry. @@ -318,10 +327,6 @@ func (s *Service) UpdateLog( return res, nil } - if task.LogInStorage { - return nil, status.Errorf(codes.AlreadyExists, "log file has been archived") - } - // WriteLogs is called even with no rows: with offset==0 it bootstraps // an empty DBFS file so TransferLogs below has something to read when // the runner finalizes a task that produced no log output. diff --git a/tests/integration/actions_log_finalize_test.go b/tests/integration/actions_log_finalize_test.go index 95488b4bbb6..134f96bda7b 100644 --- a/tests/integration/actions_log_finalize_test.go +++ b/tests/integration/actions_log_finalize_test.go @@ -73,5 +73,19 @@ jobs: _, err = dbfs.Open(t.Context(), actions_module.DBFSPrefix+freshTask.LogFilename) assert.ErrorIs(t, err, os.ErrNotExist, "DBFS row must be cleaned up after TransferLogs") + + // The runner re-sends its final UpdateLog when the response was lost. + // A sealed log must ack the re-send and still reject new appended rows. + t.Run("re-sent finalize is idempotent", func(t *testing.T) { + finalize := &runnerv1.UpdateLogRequest{TaskId: task.Id, Index: 0, Rows: nil, NoMore: true} + resp, err := runner.client.runnerServiceClient.UpdateLog(t.Context(), connect.NewRequest(finalize)) + require.NoError(t, err) + assert.EqualValues(t, 0, resp.Msg.AckIndex) + + _, err = runner.client.runnerServiceClient.UpdateLog(t.Context(), connect.NewRequest(&runnerv1.UpdateLogRequest{ + TaskId: task.Id, Index: 0, Rows: []*runnerv1.LogRow{{Content: "late"}}, NoMore: true, + })) + require.Error(t, err, "appending rows past the seal must be rejected") + }) }) }