From 73b330467bb6bc74a51444ae975d87da66d8c2ff Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Wed, 8 Oct 2025 19:03:18 +0000 Subject: [PATCH] Use tmp storage while uploading, use tx where needed Use temporary storage while the client is streaming the file to GARM. This ensures that while uploading, we don't lock the blob database. On slow connections this would mean that no readers would be able to access the db while data was being written to it via the upload process. By saving the file to a temporary location and only after we receive the entire thing, add it to the DB, we significantly reduce the time we need to keep the DB locked. Signed-off-by: Gabriel Adrian Samfira --- apiserver/controllers/file_object.go | 13 ++-- config/config.go | 34 ++++++++-- database/sql/file_store.go | 93 ++++++++++++++++++++-------- database/sql/models.go | 16 +++++ database/sql/sql.go | 44 +++++-------- util/util.go | 14 +++++ util/util_nix.go | 21 +++++++ util/util_windows.go | 22 +++++++ 8 files changed, 193 insertions(+), 64 deletions(-) create mode 100644 util/util_nix.go create mode 100644 util/util_windows.go diff --git a/apiserver/controllers/file_object.go b/apiserver/controllers/file_object.go index 562966ec..47a949c5 100644 --- a/apiserver/controllers/file_object.go +++ b/apiserver/controllers/file_object.go @@ -275,12 +275,6 @@ func (a *APIController) DownloadFileObject(w http.ResponseWriter, r *http.Reques return } - objectHandle, err := a.r.GetFileObjectReader(ctx, objectID) - if err != nil { - handleError(ctx, w, err) - return - } - defer objectHandle.Close() w.Header().Set("Content-Type", objectDetails.FileType) w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", objectDetails.Name)) w.Header().Set("Content-Length", strconv.FormatInt(objectDetails.Size, 10)) @@ -289,6 +283,13 @@ func (a *APIController) DownloadFileObject(w http.ResponseWriter, r *http.Reques return } + objectHandle, err := a.r.GetFileObjectReader(ctx, objectID) + 
if err != nil { + slog.ErrorContext(ctx, "failed to get file object reader", "error", err) + handleError(ctx, w, err) + return + } + defer objectHandle.Close() copied, err := io.Copy(w, objectHandle) if err != nil { slog.ErrorContext(ctx, "failed to stream data", "error", err) diff --git a/config/config.go b/config/config.go index 4bca63fd..4b104b99 100644 --- a/config/config.go +++ b/config/config.go @@ -515,6 +515,27 @@ func (d *Database) GormParams() (dbType DBBackendType, uri string, err error) { return } +func (d *Database) SQLiteBlobDatabaseConfig() (Database, error) { + if d.DbBackend != SQLiteBackend { + return Database{}, fmt.Errorf("sqlite3 is not the configured database") + } + + blobDBFile, err := d.SQLite.BlobDBFile() + if err != nil { + return Database{}, fmt.Errorf("failed to get blob DB file: %w", err) + } + db := Database{ + Debug: d.Debug, + DbBackend: SQLiteBackend, + Passphrase: d.Passphrase, + SQLite: SQLite{ + DBFile: blobDBFile, + BusyTimeoutSeconds: d.SQLite.BusyTimeoutSeconds, + }, + } + return db, nil +} + // Validate validates the database config entry func (d *Database) Validate() error { if d.DbBackend == "" { @@ -575,16 +596,21 @@ func (s *SQLite) BlobDBFile() (string, error) { parent := filepath.Dir(s.DBFile) dbFileName := filepath.Base(s.DBFile) blobFile := fmt.Sprintf("blob-%s", dbFileName) - return filepath.Join(parent, blobFile), nil + blobPath := filepath.Join(parent, blobFile) + return blobPath, nil } -func (s *SQLite) ConnectionString() (string, error) { - connectionString := fmt.Sprintf("%s?_journal_mode=WAL&_foreign_keys=ON&_txlock=immediate", s.DBFile) +func (s *SQLite) connectionStringForDBFile(dbFile string) string { + connectionString := fmt.Sprintf("%s?_journal_mode=WAL&_foreign_keys=ON&_txlock=immediate", dbFile) if s.BusyTimeoutSeconds > 0 { timeout := s.BusyTimeoutSeconds * 1000 connectionString = fmt.Sprintf("%s&_busy_timeout=%d", connectionString, timeout) } - return connectionString, nil + return connectionString 
+} + +func (s *SQLite) ConnectionString() (string, error) { + return s.connectionStringForDBFile(s.DBFile), nil } // MySQL is the config entry for the mysql section diff --git a/database/sql/file_store.go b/database/sql/file_store.go index cade73d0..45e761ba 100644 --- a/database/sql/file_store.go +++ b/database/sql/file_store.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "math" + "os" "github.com/mattn/go-sqlite3" "gorm.io/gorm" @@ -21,9 +22,31 @@ import ( // func (s *sqlDatabase) CreateFileObject(ctx context.Context, name string, size int64, tags []string, reader io.Reader) (fileObjParam params.FileObject, err error) { func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateFileObjectParams, reader io.Reader) (fileObjParam params.FileObject, err error) { + // Save the file to temporary storage first. This allows us to accept the entire file, even over + // a slow connection, without locking the database as we stream the file to the DB. + // SQLite will lock the entire database (including for readers) when the data is being committed. + tmpFile, err := util.GetTmpFileHandle("") + if err != nil { + return params.FileObject{}, fmt.Errorf("failed to create tmp file: %w", err) + } + defer func() { + tmpFile.Close() + os.Remove(tmpFile.Name()) + }() + if _, err := io.Copy(tmpFile, reader); err != nil { + return params.FileObject{}, fmt.Errorf("failed to copy data: %w", err) + } + if err := tmpFile.Sync(); err != nil { + return params.FileObject{}, fmt.Errorf("failed to flush data to disk: %w", err) + } + // File has been transferred. We need to seek to the beginning of the file. This same handle will be used + // to stream the data to the database. 
+ if _, err := tmpFile.Seek(0, 0); err != nil { + return params.FileObject{}, fmt.Errorf("failed to seek to beginning: %w", err) + } // Read first 8KB for type detection buffer := make([]byte, 8192) - n, _ := io.ReadFull(reader, buffer) + n, _ := io.ReadFull(tmpFile, buffer) fileType := util.DetectFileType(buffer[:n]) fileObj := FileObject{ @@ -39,19 +62,44 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF } }() - // Create the file first, without any space allocated for the blob. - if err := s.objectsConn.Create(&fileObj).Error; err != nil { - return params.FileObject{}, fmt.Errorf("failed to create file object: %w", err) - } + var fileBlob FileBlob + err = s.objectsConn.Transaction(func(tx *gorm.DB) error { + // Create the file first + if err := tx.Create(&fileObj).Error; err != nil { + return fmt.Errorf("failed to create file object: %w", err) + } - // allocate space for the blob using the zeroblob() function. This will allow us to avoid - // having to allocate potentially huge byte arrays in memory and writing that huge blob to - // disk. - query := `UPDATE file_objects SET content = zeroblob(?) WHERE id = ?` - if err := s.objectsConn.Exec(query, param.Size, fileObj.ID).Error; err != nil { - return params.FileObject{}, fmt.Errorf("failed to allocate disk space: %w", err) - } + // create the file blob, without any space allocated for the blob. + fileBlob = FileBlob{ + FileObjectID: fileObj.ID, + } + if err := tx.Create(&fileBlob).Error; err != nil { + return fmt.Errorf("failed to create file blob object: %w", err) + } + // allocate space for the blob using the zeroblob() function. This will allow us to avoid + // having to allocate potentially huge byte arrays in memory and writing that huge blob to + // disk. + query := `UPDATE file_blobs SET content = zeroblob(?) 
WHERE id = ?` + if err := tx.Exec(query, param.Size, fileBlob.ID).Error; err != nil { + return fmt.Errorf("failed to allocate disk space: %w", err) + } + // Create tag entries + for _, tag := range param.Tags { + fileObjTag := FileObjectTag{ + FileObjectID: fileObj.ID, + Tag: tag, + } + if err := tx.Create(&fileObjTag).Error; err != nil { + return fmt.Errorf("failed to add tag: %w", err) + } + } + return nil + }) + + if err != nil { + return params.FileObject{}, fmt.Errorf("failed to create database entry for blob: %w", err) + } // Stream file to blob and compute SHA256 conn, err := s.objectsSqlDB.Conn(ctx) if err != nil { @@ -63,7 +111,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF err = conn.Raw(func(driverConn any) error { sqliteConn := driverConn.(*sqlite3.SQLiteConn) - blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(fileObj.ID), 1) + blob, err := sqliteConn.Blob("main", "file_blobs", "content", int64(fileBlob.ID), 1) if err != nil { return fmt.Errorf("failed to open blob: %w", err) } @@ -79,7 +127,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF hasher.Write(buffer[:n]) // Stream the rest with hash computation - _, err = io.Copy(io.MultiWriter(blob, hasher), reader) + _, err = io.Copy(io.MultiWriter(blob, hasher), tmpFile) if err != nil { return fmt.Errorf("failed to write blob: %w", err) } @@ -97,17 +145,6 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF return params.FileObject{}, fmt.Errorf("failed to update sha256sum: %w", err) } - // Create tag entries - for _, tag := range param.Tags { - fileObjTag := FileObjectTag{ - FileObjectID: fileObj.ID, - Tag: tag, - } - if err := s.objectsConn.Create(&fileObjTag).Error; err != nil { - return params.FileObject{}, fmt.Errorf("failed to add tag: %w", err) - } - } - // Reload document with tags if err := s.objectsConn.Preload("TagsList").Omit("content").First(&fileObj, 
fileObj.ID).Error; err != nil { return params.FileObject{}, fmt.Errorf("failed to get file object: %w", err) @@ -307,11 +344,15 @@ func (s *sqlDatabase) OpenFileObjectContent(ctx context.Context, objID uint) (io return nil, fmt.Errorf("failed to get connection: %w", err) } + var fileBlob FileBlob + if err := s.objectsConn.Where("file_object_id = ?", objID).Omit("content").First(&fileBlob).Error; err != nil { + return nil, fmt.Errorf("failed to get file blob: %w", err) + } var blobReader io.ReadCloser err = conn.Raw(func(driverConn any) error { sqliteConn := driverConn.(*sqlite3.SQLiteConn) - blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(objID), 0) + blob, err := sqliteConn.Blob("main", "file_blobs", "content", int64(fileBlob.ID), 0) if err != nil { return fmt.Errorf("failed to open blob: %w", err) } diff --git a/database/sql/models.go b/database/sql/models.go index 6e112c3d..c1f8b71c 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -473,6 +473,22 @@ type FileObject struct { SHA256 string `gorm:"type:text"` // Tags is a JSON array of tags TagsList []FileObjectTag `gorm:"foreignKey:FileObjectID;constraint:OnDelete:CASCADE"` + // Content is a foreign key to a different table where the blob is actually stored. + // Updating a field in an sqlite 3 DB will read the entire field, update the column + // and write it back to a different location. SQLite3 will then mark the old row as "deleted" + // and allow it to be vacuumed. But if we have a blob column with a huge blob, any + // update operation will consume a lot of resources and take a long time. + // Using a dedicated table for the blob (which doesn't change), speeds up updates of + // metadata fields like name, description, tags, etc. + Content FileBlob `gorm:"foreignKey:FileObjectID;constraint:OnDelete:CASCADE"` +} + +// FileBlob is the immutable blob of bytes that once written will not be changed. 
+// We leave the SHA256, file type and size in the parent table, because we need to +// be able to get that info easily, without preloading the blob table. +type FileBlob struct { + gorm.Model + FileObjectID uint `gorm:"index:idx_fileobject_blob_id"` // Content is a BLOB column for storing binary data Content []byte `gorm:"type:blob"` } diff --git a/database/sql/sql.go b/database/sql/sql.go index 7a305871..310f8de6 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -91,45 +91,33 @@ func NewSQLDatabase(ctx context.Context, cfg config.Database) (common.Store, err return nil, fmt.Errorf("failed to get underlying database connection: %w", err) } + db := &sqlDatabase{ + conn: conn, + sqlDB: sqlDB, + ctx: ctx, + cfg: cfg, + producer: producer, + } + // Create separate connection for objects database (only for SQLite) - var objectsConn *gorm.DB - var objectsSqlDB *sql.DB if cfg.DbBackend == config.SQLiteBackend { - // Get the blob database file path - blobFile, err := cfg.SQLite.BlobDBFile() + // Get config for objects database + objectsCfg, err := cfg.SQLiteBlobDatabaseConfig() if err != nil { - return nil, fmt.Errorf("failed to get blob database file: %w", err) + return nil, fmt.Errorf("failed to get blob DB config: %w", err) } - // Create config for objects database - objectsCfg := config.Database{ - DbBackend: config.SQLiteBackend, - Debug: cfg.Debug, - Passphrase: cfg.Passphrase, - SQLite: config.SQLite{ - DBFile: blobFile, - }, - } - - objectsConn, err = newDBConn(objectsCfg) + objectsConn, err := newDBConn(objectsCfg) if err != nil { return nil, fmt.Errorf("error creating objects DB connection: %w", err) } - objectsSqlDB, err = objectsConn.DB() + objectsSqlDB, err := objectsConn.DB() if err != nil { return nil, fmt.Errorf("failed to get underlying objects database connection: %w", err) } - } - - db := &sqlDatabase{ - conn: conn, - sqlDB: sqlDB, - objectsConn: objectsConn, - objectsSqlDB: objectsSqlDB, - ctx: ctx, - cfg: cfg, - producer: producer, + 
db.objectsConn = objectsConn + db.objectsSqlDB = objectsSqlDB } if err := db.migrateDB(); err != nil { @@ -454,7 +442,7 @@ func (s *sqlDatabase) migrateFileObjects() error { } // Use GORM AutoMigrate on the separate connection - if err := s.objectsConn.AutoMigrate(&FileObject{}, &FileObjectTag{}); err != nil { + if err := s.objectsConn.AutoMigrate(&FileObject{}, &FileBlob{}, &FileObjectTag{}); err != nil { return fmt.Errorf("failed to migrate file objects: %w", err) } diff --git a/util/util.go b/util/util.go index 6234aa84..b30a17b7 100644 --- a/util/util.go +++ b/util/util.go @@ -20,6 +20,7 @@ import ( "fmt" "log/slog" "net/http" + "os" "unicode/utf8" "github.com/h2non/filetype" @@ -179,3 +180,16 @@ func DetectFileType(data []byte) string { // Default to application/octet-stream for unknown types return "application/octet-stream" } + +func GetTmpFileHandle(baseDir string) (*os.File, error) { + tmpDir, err := getTempDir(baseDir) + if err != nil { + return nil, fmt.Errorf("failed to get tmpDir: %w", err) + } + tmpFile, err := os.CreateTemp(tmpDir, "garm-upload") + if err != nil { + return nil, fmt.Errorf("failed to create temp file: %w", err) + } + tmpFile.Name() + return tmpFile, nil +} diff --git a/util/util_nix.go b/util/util_nix.go new file mode 100644 index 00000000..f7d06f09 --- /dev/null +++ b/util/util_nix.go @@ -0,0 +1,21 @@ +//go:build !windows +// +build !windows + +package util + +import ( + "os" +) + +func getTempDir(baseDir string) (string, error) { + dir := baseDir + if baseDir == "" { + envTmp := os.Getenv("TMPDIR") + if envTmp == "" { + envTmp = "/tmp" + } + dir = envTmp + } + + return dir, nil +} diff --git a/util/util_windows.go b/util/util_windows.go new file mode 100644 index 00000000..c7baf4c3 --- /dev/null +++ b/util/util_windows.go @@ -0,0 +1,22 @@ +package util + +import ( + "fmt" + "os" +) + +func getTempDir(baseDir string) (string, error) { + dir := baseDir + if baseDir == "" { + envTmp := os.Getenv("TEMP") + if envTmp == "" { + envTmp = 
os.Getenv("TMP") + } + dir = envTmp + } + + if dir == "" { + return "", fmt.Errorf("failed to determine destination dir") + } + return dir, nil +}