From 2ce71629c3d953b43183198611698a2d6f359ebd Mon Sep 17 00:00:00 2001 From: wxiaoguang Date: Sat, 7 Mar 2026 00:28:46 +0800 Subject: [PATCH] Fix dbfs error handling (#36844) Add tests for opening non-existing files. --- models/dbfs/dbfile.go | 29 ++++------ models/dbfs/dbfs.go | 3 + models/dbfs/dbfs_test.go | 84 +++++++++++++++++++++------- modules/actions/log.go | 22 ++++---- routers/api/actions/runner/runner.go | 2 +- 5 files changed, 91 insertions(+), 49 deletions(-) diff --git a/models/dbfs/dbfile.go b/models/dbfs/dbfile.go index ccb13583e1..a6981cb7d6 100644 --- a/models/dbfs/dbfile.go +++ b/models/dbfs/dbfile.go @@ -75,7 +75,7 @@ func (f *file) readAt(fileMeta *dbfsMeta, offset int64, p []byte) (n int, err er } func (f *file) Read(p []byte) (n int, err error) { - if f.metaID == 0 || !f.allowRead { + if !f.allowRead { return 0, os.ErrInvalid } @@ -89,7 +89,7 @@ func (f *file) Read(p []byte) (n int, err error) { } func (f *file) Write(p []byte) (n int, err error) { - if f.metaID == 0 || !f.allowWrite { + if !f.allowWrite { return 0, os.ErrInvalid } @@ -184,10 +184,6 @@ func (f *file) Close() error { } func (f *file) Stat() (os.FileInfo, error) { - if f.metaID == 0 { - return nil, os.ErrInvalid - } - fileMeta, err := findFileMetaByID(f.ctx, f.metaID) if err != nil { return nil, err @@ -232,15 +228,17 @@ func (f *file) open(flag int) (err error) { if f.metaID != 0 { return os.ErrExist } - } else { - // create a new file if none exists. - if f.metaID == 0 { - if err = f.createEmpty(); err != nil { - return err - } + } + // create a new file if not exists. + if f.metaID == 0 { + if err = f.createEmpty(); err != nil { + return err } } } + if f.metaID == 0 { + return os.ErrNotExist + } if flag&os.O_TRUNC != 0 { if err = f.truncate(); err != nil { return err @@ -252,7 +250,7 @@ func (f *file) open(flag int) (err error) { } } return nil - } + } // end if: allowWrite // read only mode if f.metaID == 0 { @@ -322,9 +320,6 @@ func (f *file) delete() error { } func (f *file) size() (int64, error) { - if f.metaID == 0 { - return 0, os.ErrNotExist - } fileMeta, err := findFileMetaByID(f.ctx, f.metaID) if err != nil { return 0, err @@ -339,7 +334,7 @@ func findFileMetaByID(ctx context.Context, metaID int64) (*dbfsMeta, error) { } else if ok { return &fileMeta, nil } - return nil, nil //nolint:nilnil // return nil to indicate that the object does not exist + return nil, os.ErrNotExist } func buildPath(path string) string { diff --git a/models/dbfs/dbfs.go b/models/dbfs/dbfs.go index f68b4a2b70..3f768b5339 100644 --- a/models/dbfs/dbfs.go +++ b/models/dbfs/dbfs.go @@ -40,6 +40,9 @@ The DBFS solution: * In the future, when Gitea action needs to limit the log size (other CI/CD services also do so), it's easier to calculate the log file size. * Even sometimes the UI needs to render the tailing lines, the tailing lines can be found be counting the "\n" from the end of the file by seek. The seeking and finding is not the fastest way, but it's still acceptable and won't affect the performance too much. + +Limitations of the DBFS solution: +* Not fully POSIX-compliant, some behaviors may be different from the real filesystem, especially for concurrent read/write */ type dbfsMeta struct { diff --git a/models/dbfs/dbfs_test.go b/models/dbfs/dbfs_test.go index e1ecd871e4..ca57ebe171 100644 --- a/models/dbfs/dbfs_test.go +++ b/models/dbfs/dbfs_test.go @@ -9,19 +9,14 @@ import ( "os" "testing" + "code.gitea.io/gitea/modules/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func changeDefaultFileBlockSize(n int64) (restore func()) { - old := defaultFileBlockSize - defaultFileBlockSize = n - return func() { - defaultFileBlockSize = old - } -} - func TestDbfsBasic(t *testing.T) { - defer changeDefaultFileBlockSize(4)() + defer test.MockVariableValue(&defaultFileBlockSize, 4)() // test basic write/read f, err := OpenFile(t.Context(), "test.txt", os.O_RDWR|os.O_CREATE) @@ -122,10 +117,55 @@ func TestDbfsBasic(t *testing.T) { stat, err = f.Stat() assert.NoError(t, err) assert.EqualValues(t, 10, stat.Size()) + + t.Run("NonExisting", func(t *testing.T) { + f, err := OpenFile(t.Context(), "non-existing.txt", os.O_RDONLY) + assert.ErrorIs(t, err, os.ErrNotExist) + assert.Nil(t, f) + + f, err = OpenFile(t.Context(), "non-existing.txt", os.O_WRONLY) + assert.ErrorIs(t, err, os.ErrNotExist) + assert.Nil(t, f) + + f, err = OpenFile(t.Context(), "non-existing.txt", os.O_WRONLY|os.O_APPEND|os.O_TRUNC) + assert.ErrorIs(t, err, os.ErrNotExist) + assert.Nil(t, f) + }) + + t.Run("Existing", func(t *testing.T) { + assertFileContent := func(f File, expected string) { + _, err := f.Seek(0, io.SeekStart) + require.NoError(t, err) + buf, err := io.ReadAll(f) + require.NoError(t, err) + assert.Equal(t, expected, string(buf)) + } + + f, err := OpenFile(t.Context(), "existing.txt", os.O_RDWR|os.O_CREATE) + require.NoError(t, err) + _, _ = f.Write([]byte("test")) + assertFileContent(f, "test") + assert.NoError(t, f.Close()) + + f, err = OpenFile(t.Context(), "existing.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND) + require.NoError(t, err) + _, _ = f.Write([]byte("\nnew")) + assertFileContent(f, "test\nnew") + assert.NoError(t, f.Close()) + + f, err = OpenFile(t.Context(), "existing.txt", os.O_RDWR|os.O_TRUNC) + require.NoError(t, err) + assertFileContent(f, "") + assert.NoError(t, f.Close()) + + f, err = OpenFile(t.Context(), "existing.txt", os.O_RDWR|os.O_CREATE|os.O_EXCL) + assert.ErrorIs(t, err, os.ErrExist) + assert.Nil(t, f) + }) } func TestDbfsReadWrite(t *testing.T) { - defer changeDefaultFileBlockSize(4)() + defer test.MockVariableValue(&defaultFileBlockSize, 4)() f1, err := OpenFile(t.Context(), "test.log", os.O_RDWR|os.O_CREATE) assert.NoError(t, err) @@ -157,30 +197,32 @@ func TestDbfsReadWrite(t *testing.T) { } func TestDbfsSeekWrite(t *testing.T) { - defer changeDefaultFileBlockSize(4)() + defer test.MockVariableValue(&defaultFileBlockSize, 4)() - f, err := OpenFile(t.Context(), "test2.log", os.O_RDWR|os.O_CREATE) - assert.NoError(t, err) - defer f.Close() + // write something + fw, err := OpenFile(t.Context(), "test2.log", os.O_RDWR|os.O_CREATE) + require.NoError(t, err) + defer fw.Close() - n, err := f.Write([]byte("111")) + n, err := fw.Write([]byte("111")) assert.NoError(t, err) - _, err = f.Seek(int64(n), io.SeekStart) + _, err = fw.Seek(int64(n), io.SeekStart) assert.NoError(t, err) - _, err = f.Write([]byte("222")) + _, err = fw.Write([]byte("222")) assert.NoError(t, err) - _, err = f.Seek(int64(n), io.SeekStart) + _, err = fw.Seek(int64(n), io.SeekStart) assert.NoError(t, err) - _, err = f.Write([]byte("333")) + _, err = fw.Write([]byte("333")) assert.NoError(t, err) + // then read it fr, err := OpenFile(t.Context(), "test2.log", os.O_RDONLY) - assert.NoError(t, err) - defer f.Close() + require.NoError(t, err) + defer fr.Close() buf, err := io.ReadAll(fr) assert.NoError(t, err) diff --git a/modules/actions/log.go b/modules/actions/log.go index 5a1425e031..3fb56b402a 100644 --- a/modules/actions/log.go +++ b/modules/actions/log.go @@ -33,21 +33,22 @@ const ( // It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content. // Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage. func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { - flag := os.O_WRONLY + flag, openFileFor := os.O_WRONLY, "write-only" if offset == 0 { - // Create file only if offset is 0, or it could result in content holes if the file doesn't exist. - flag |= os.O_CREATE + // Only allow to create file if offset is 0 (the first write), see #25560. + // Otherwise, it might result in content holes if the file has been deleted after transferred (actions.TransferLogs). + flag, openFileFor = os.O_WRONLY|os.O_CREATE, "write-create" } name := DBFSPrefix + filename f, err := dbfs.OpenFile(ctx, name, flag) if err != nil { - return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) + return nil, fmt.Errorf("dbfs.OpenFile %q for %s: %w", name, openFileFor, err) } defer f.Close() stat, err := f.Stat() if err != nil { - return nil, fmt.Errorf("dbfs Stat %q: %w", name, err) + return nil, fmt.Errorf("dbfs.Stat %q: %w", name, err) } if stat.Size() < offset { // If the size is less than offset, refuse to write, or it could result in content holes. @@ -56,7 +57,7 @@ func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runne } if _, err := f.Seek(offset, io.SeekStart); err != nil { - return nil, fmt.Errorf("dbfs Seek %q: %w", name, err) + return nil, fmt.Errorf("dbfs.Seek %q: %w", name, err) } writer := bufio.NewWriterSize(f, defaultBufSize) @@ -121,16 +122,17 @@ const ( // TransferLogs transfers logs from DBFS to object storage. // It happens when the file is complete and no more logs will be appended. // It respects the file format in the filename like ".zst", and compresses the content if needed. +// The task log file must be marked as "log_in_storage=true" after the transfer. func TransferLogs(ctx context.Context, filename string) (func(), error) { name := DBFSPrefix + filename remove := func() { if err := dbfs.Remove(ctx, name); err != nil { - log.Warn("dbfs remove %q: %v", name, err) + log.Warn("dbfs.Remove %q: %v", name, err) } } f, err := dbfs.Open(ctx, name) if err != nil { - return nil, fmt.Errorf("dbfs open %q: %w", name, err) + return nil, fmt.Errorf("dbfs.Open %q: %w", name, err) } defer f.Close() @@ -164,7 +166,7 @@ func RemoveLogs(ctx context.Context, inStorage bool, filename string) error { name := DBFSPrefix + filename err := dbfs.Remove(ctx, name) if err != nil { - return fmt.Errorf("dbfs remove %q: %w", name, err) + return fmt.Errorf("dbfs.Remove %q: %w", name, err) } return nil } @@ -180,7 +182,7 @@ func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeek name := DBFSPrefix + filename f, err := dbfs.Open(ctx, name) if err != nil { - return nil, fmt.Errorf("dbfs open %q: %w", name, err) + return nil, fmt.Errorf("dbfs.Open %q: %w", name, err) } return f, nil } diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 86bab4b340..49d1b13262 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -270,7 +270,7 @@ func (s *Service) UpdateLog( rows := req.Msg.Rows[ack-req.Msg.Index:] ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows) if err != nil { - return nil, status.Errorf(codes.Internal, "write logs: %v", err) + return nil, status.Errorf(codes.Internal, "unable to append logs to dbfs file: %v", err) } task.LogLength += int64(len(rows)) for _, n := range ns {