diff --git a/config/config.go b/config/config.go index c62f314e..4bca63fd 100644 --- a/config/config.go +++ b/config/config.go @@ -567,6 +567,17 @@ func (s *SQLite) Validate() error { return nil } +func (s *SQLite) BlobDBFile() (string, error) { + if err := s.Validate(); err != nil { + return "", fmt.Errorf("failed to validate sqlite3 config") + } + + parent := filepath.Dir(s.DBFile) + dbFileName := filepath.Base(s.DBFile) + blobFile := fmt.Sprintf("blob-%s", dbFileName) + return filepath.Join(parent, blobFile), nil +} + func (s *SQLite) ConnectionString() (string, error) { connectionString := fmt.Sprintf("%s?_journal_mode=WAL&_foreign_keys=ON&_txlock=immediate", s.DBFile) if s.BusyTimeoutSeconds > 0 { diff --git a/database/sql/file_store.go b/database/sql/file_store.go index fcdacff1..cade73d0 100644 --- a/database/sql/file_store.go +++ b/database/sql/file_store.go @@ -40,20 +40,20 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF }() // Create the file first, without any space allocated for the blob. - if err := s.conn.Create(&fileObj).Error; err != nil { + if err := s.objectsConn.Create(&fileObj).Error; err != nil { return params.FileObject{}, fmt.Errorf("failed to create file object: %w", err) } // allocate space for the blob using the zeroblob() function. This will allow us to avoid // having to allocate potentially huge byte arrays in memory and writing that huge blob to // disk. - query := fmt.Sprintf(`UPDATE %q SET content = zeroblob(?) WHERE id = ?`, fileObj.TableName()) - if err := s.conn.Exec(query, param.Size, fileObj.ID).Error; err != nil { + query := `UPDATE file_objects SET content = zeroblob(?) WHERE id = ?` + if err := s.objectsConn.Exec(query, param.Size, fileObj.ID).Error; err != nil { return params.FileObject{}, fmt.Errorf("failed to allocate disk space: %w", err) } // Stream file to blob and compute SHA256 - conn, err := s.sqlDB.Conn(ctx) + conn, err := s.objectsSqlDB.Conn(ctx) if err != nil { return params.FileObject{}, fmt.Errorf("failed to get connection from pool: %w", err) } @@ -63,7 +63,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF err = conn.Raw(func(driverConn any) error { sqliteConn := driverConn.(*sqlite3.SQLiteConn) - blob, err := sqliteConn.Blob("main", fileObj.TableName(), "content", int64(fileObj.ID), 1) + blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(fileObj.ID), 1) if err != nil { return fmt.Errorf("failed to open blob: %w", err) } @@ -93,7 +93,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF } // Update document with SHA256 - if err := s.conn.Model(&fileObj).Update("sha256", sha256sum).Error; err != nil { + if err := s.objectsConn.Model(&fileObj).Update("sha256", sha256sum).Error; err != nil { return params.FileObject{}, fmt.Errorf("failed to update sha256sum: %w", err) } @@ -103,13 +103,13 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF FileObjectID: fileObj.ID, Tag: tag, } - if err := s.conn.Create(&fileObjTag).Error; err != nil { + if err := s.objectsConn.Create(&fileObjTag).Error; err != nil { return params.FileObject{}, fmt.Errorf("failed to add tag: %w", err) } } // Reload document with tags - if err := s.conn.Preload("TagsList").Omit("content").First(&fileObj, fileObj.ID).Error; err != nil { + if err := s.objectsConn.Preload("TagsList").Omit("content").First(&fileObj, fileObj.ID).Error; err != nil { return params.FileObject{}, fmt.Errorf("failed to get file object: %w", err) } return s.sqlFileObjectToCommonParams(fileObj), nil @@ -127,7 +127,7 @@ func (s *sqlDatabase) UpdateFileObject(_ context.Context, objID uint, param para }() var fileObj FileObject - err = s.conn.Transaction(func(tx *gorm.DB) error { + err = s.objectsConn.Transaction(func(tx *gorm.DB) error { if err := tx.Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return runnerErrors.NewNotFoundError("could not find file object with ID: %d", objID) @@ -192,7 +192,7 @@ func (s *sqlDatabase) DeleteFileObject(_ context.Context, objID uint) (err error }() var fileObj FileObject - err = s.conn.Transaction(func(tx *gorm.DB) error { + err = s.objectsConn.Transaction(func(tx *gorm.DB) error { if err := tx.Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return runnerErrors.ErrNotFound @@ -219,7 +219,7 @@ func (s *sqlDatabase) DeleteFileObject(_ context.Context, objID uint) (err error func (s *sqlDatabase) GetFileObject(_ context.Context, objID uint) (params.FileObject, error) { var fileObj FileObject - if err := s.conn.Preload("TagsList").Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil { + if err := s.objectsConn.Preload("TagsList").Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return params.FileObject{}, runnerErrors.NewNotFoundError("could not find file object with ID: %d", objID) } @@ -237,7 +237,7 @@ func (s *sqlDatabase) SearchFileObjectByTags(_ context.Context, tags []string, p } var fileObjectRes []FileObject - query := s.conn.Model(&FileObject{}).Preload("TagsList").Omit("content") + query := s.objectsConn.Model(&FileObject{}).Preload("TagsList").Omit("content") for _, t := range tags { query = query.Where("EXISTS (SELECT 1 FROM file_object_tags WHERE file_object_tags.file_object_id = file_objects.id AND file_object_tags.tag = ?)", t) } @@ -302,7 +302,7 @@ func (s *sqlDatabase) SearchFileObjectByTags(_ context.Context, tags []string, p // OpenFileObjectContent opens a blob for reading and returns an io.ReadCloser func (s *sqlDatabase) OpenFileObjectContent(ctx context.Context, objID uint) (io.ReadCloser, error) { - conn, err := s.sqlDB.Conn(ctx) + conn, err := s.objectsSqlDB.Conn(ctx) if err != nil { return nil, fmt.Errorf("failed to get connection: %w", err) } @@ -311,7 +311,7 @@ func (s *sqlDatabase) OpenFileObjectContent(ctx context.Context, objID uint) (io err = conn.Raw(func(driverConn any) error { sqliteConn := driverConn.(*sqlite3.SQLiteConn) - blob, err := sqliteConn.Blob("main", (FileObject{}).TableName(), "content", int64(objID), 0) + blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(objID), 0) if err != nil { return fmt.Errorf("failed to open blob: %w", err) } @@ -359,7 +359,7 @@ func (s *sqlDatabase) ListFileObjects(_ context.Context, page, pageSize uint64) } var total int64 - if err := s.conn.Model(&FileObject{}).Count(&total).Error; err != nil { + if err := s.objectsConn.Model(&FileObject{}).Count(&total).Error; err != nil { return params.FileObjectPaginatedResponse{}, fmt.Errorf("failed to count file objects: %w", err) } @@ -383,7 +383,7 @@ func (s *sqlDatabase) ListFileObjects(_ context.Context, page, pageSize uint64) } var fileObjs []FileObject - if err := s.conn.Preload("TagsList").Omit("content"). + if err := s.objectsConn.Preload("TagsList").Omit("content"). Limit(queryPageSize). Offset(queryOffset). Order("created_at DESC"). diff --git a/database/sql/models.go b/database/sql/models.go index b369b284..6e112c3d 100644 --- a/database/sql/models.go +++ b/database/sql/models.go @@ -477,19 +477,9 @@ type FileObject struct { Content []byte `gorm:"type:blob"` } -// TableName overrides the default table name -func (FileObject) TableName() string { - return "file_objects" -} - // FileObjectTag represents the many-to-many relationship between documents and tags type FileObjectTag struct { ID uint `gorm:"primaryKey"` FileObjectID uint `gorm:"index:idx_fileobject_tags_doc_id,priority:1;index:idx_fileobject_tags_tag,priority:1;not null"` Tag string `gorm:"type:TEXT COLLATE NOCASE;index:idx_fileobject_tags_tag,priority:2;not null"` } - -// TableName overrides the default table name -func (FileObjectTag) TableName() string { - return "file_object_tags" -} diff --git a/database/sql/sql.go b/database/sql/sql.go index 5674ab3d..7a305871 100644 --- a/database/sql/sql.go +++ b/database/sql/sql.go @@ -90,12 +90,46 @@ func NewSQLDatabase(ctx context.Context, cfg config.Database) (common.Store, err if err != nil { return nil, fmt.Errorf("failed to get underlying database connection: %w", err) } + + // Create separate connection for objects database (only for SQLite) + var objectsConn *gorm.DB + var objectsSqlDB *sql.DB + if cfg.DbBackend == config.SQLiteBackend { + // Get the blob database file path + blobFile, err := cfg.SQLite.BlobDBFile() + if err != nil { + return nil, fmt.Errorf("failed to get blob database file: %w", err) + } + + // Create config for objects database + objectsCfg := config.Database{ + DbBackend: config.SQLiteBackend, + Debug: cfg.Debug, + Passphrase: cfg.Passphrase, + SQLite: config.SQLite{ + DBFile: blobFile, + }, + } + + objectsConn, err = newDBConn(objectsCfg) + if err != nil { + return nil, fmt.Errorf("error creating objects DB connection: %w", err) + } + + objectsSqlDB, err = objectsConn.DB() + if err != nil { + return nil, fmt.Errorf("failed to get underlying objects database connection: %w", err) + } + } + db := &sqlDatabase{ - conn: conn, - sqlDB: sqlDB, - ctx: ctx, - cfg: cfg, - producer: producer, + conn: conn, + sqlDB: sqlDB, + objectsConn: objectsConn, + objectsSqlDB: objectsSqlDB, + ctx: ctx, + cfg: cfg, + producer: producer, } if err := db.migrateDB(); err != nil { @@ -108,6 +142,10 @@ type sqlDatabase struct { conn *gorm.DB sqlDB *sql.DB + // objectsConn is a separate GORM connection to the objects database + objectsConn *gorm.DB + objectsSqlDB *sql.DB + ctx context.Context cfg config.Database producer common.Producer @@ -404,6 +442,25 @@ func (s *sqlDatabase) migrateWorkflow() error { return nil } +func (s *sqlDatabase) migrateFileObjects() error { + // Only migrate for SQLite backend + if s.cfg.DbBackend != config.SQLiteBackend { + return nil + } + + // Use the separate objects database connection + if s.objectsConn == nil { + return fmt.Errorf("objects database connection not initialized") + } + + // Use GORM AutoMigrate on the separate connection + if err := s.objectsConn.AutoMigrate(&FileObject{}, &FileObjectTag{}); err != nil { + return fmt.Errorf("failed to migrate file objects: %w", err) + } + + return nil +} + func (s *sqlDatabase) ensureTemplates(migrateTemplates bool) error { if !migrateTemplates { return nil @@ -616,11 +673,15 @@ func (s *sqlDatabase) migrateDB() error { &ControllerInfo{}, &WorkflowJob{}, &ScaleSet{}, - &FileObject{}, - &FileObjectTag{}, ); err != nil { return fmt.Errorf("error running auto migrate: %w", err) } + + // Migrate file object tables in the attached objectsdb schema + if err := s.migrateFileObjects(); err != nil { + return fmt.Errorf("error migrating file objects: %w", err) + } + s.conn.Exec("PRAGMA foreign_keys = ON") if !hasMinAgeField {