Use a separate GORM connection for blobs

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
This commit is contained in:
Gabriel Adrian Samfira 2025-10-08 11:55:14 +00:00 committed by Gabriel
parent bab85171ee
commit db2b908605
4 changed files with 95 additions and 33 deletions

View file

@ -40,20 +40,20 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
}()
// Create the file first, without any space allocated for the blob.
if err := s.conn.Create(&fileObj).Error; err != nil {
if err := s.objectsConn.Create(&fileObj).Error; err != nil {
return params.FileObject{}, fmt.Errorf("failed to create file object: %w", err)
}
// allocate space for the blob using the zeroblob() function. This will allow us to avoid
// having to allocate potentially huge byte arrays in memory and writing that huge blob to
// disk.
query := fmt.Sprintf(`UPDATE %q SET content = zeroblob(?) WHERE id = ?`, fileObj.TableName())
if err := s.conn.Exec(query, param.Size, fileObj.ID).Error; err != nil {
query := `UPDATE file_objects SET content = zeroblob(?) WHERE id = ?`
if err := s.objectsConn.Exec(query, param.Size, fileObj.ID).Error; err != nil {
return params.FileObject{}, fmt.Errorf("failed to allocate disk space: %w", err)
}
// Stream file to blob and compute SHA256
conn, err := s.sqlDB.Conn(ctx)
conn, err := s.objectsSqlDB.Conn(ctx)
if err != nil {
return params.FileObject{}, fmt.Errorf("failed to get connection from pool: %w", err)
}
@ -63,7 +63,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
err = conn.Raw(func(driverConn any) error {
sqliteConn := driverConn.(*sqlite3.SQLiteConn)
blob, err := sqliteConn.Blob("main", fileObj.TableName(), "content", int64(fileObj.ID), 1)
blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(fileObj.ID), 1)
if err != nil {
return fmt.Errorf("failed to open blob: %w", err)
}
@ -93,7 +93,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
}
// Update document with SHA256
if err := s.conn.Model(&fileObj).Update("sha256", sha256sum).Error; err != nil {
if err := s.objectsConn.Model(&fileObj).Update("sha256", sha256sum).Error; err != nil {
return params.FileObject{}, fmt.Errorf("failed to update sha256sum: %w", err)
}
@ -103,13 +103,13 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
FileObjectID: fileObj.ID,
Tag: tag,
}
if err := s.conn.Create(&fileObjTag).Error; err != nil {
if err := s.objectsConn.Create(&fileObjTag).Error; err != nil {
return params.FileObject{}, fmt.Errorf("failed to add tag: %w", err)
}
}
// Reload document with tags
if err := s.conn.Preload("TagsList").Omit("content").First(&fileObj, fileObj.ID).Error; err != nil {
if err := s.objectsConn.Preload("TagsList").Omit("content").First(&fileObj, fileObj.ID).Error; err != nil {
return params.FileObject{}, fmt.Errorf("failed to get file object: %w", err)
}
return s.sqlFileObjectToCommonParams(fileObj), nil
@ -127,7 +127,7 @@ func (s *sqlDatabase) UpdateFileObject(_ context.Context, objID uint, param para
}()
var fileObj FileObject
err = s.conn.Transaction(func(tx *gorm.DB) error {
err = s.objectsConn.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return runnerErrors.NewNotFoundError("could not find file object with ID: %d", objID)
@ -192,7 +192,7 @@ func (s *sqlDatabase) DeleteFileObject(_ context.Context, objID uint) (err error
}()
var fileObj FileObject
err = s.conn.Transaction(func(tx *gorm.DB) error {
err = s.objectsConn.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return runnerErrors.ErrNotFound
@ -219,7 +219,7 @@ func (s *sqlDatabase) DeleteFileObject(_ context.Context, objID uint) (err error
func (s *sqlDatabase) GetFileObject(_ context.Context, objID uint) (params.FileObject, error) {
var fileObj FileObject
if err := s.conn.Preload("TagsList").Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil {
if err := s.objectsConn.Preload("TagsList").Where("id = ?", objID).Omit("content").First(&fileObj).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return params.FileObject{}, runnerErrors.NewNotFoundError("could not find file object with ID: %d", objID)
}
@ -237,7 +237,7 @@ func (s *sqlDatabase) SearchFileObjectByTags(_ context.Context, tags []string, p
}
var fileObjectRes []FileObject
query := s.conn.Model(&FileObject{}).Preload("TagsList").Omit("content")
query := s.objectsConn.Model(&FileObject{}).Preload("TagsList").Omit("content")
for _, t := range tags {
query = query.Where("EXISTS (SELECT 1 FROM file_object_tags WHERE file_object_tags.file_object_id = file_objects.id AND file_object_tags.tag = ?)", t)
}
@ -302,7 +302,7 @@ func (s *sqlDatabase) SearchFileObjectByTags(_ context.Context, tags []string, p
// OpenFileObjectContent opens a blob for reading and returns an io.ReadCloser
func (s *sqlDatabase) OpenFileObjectContent(ctx context.Context, objID uint) (io.ReadCloser, error) {
conn, err := s.sqlDB.Conn(ctx)
conn, err := s.objectsSqlDB.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get connection: %w", err)
}
@ -311,7 +311,7 @@ func (s *sqlDatabase) OpenFileObjectContent(ctx context.Context, objID uint) (io
err = conn.Raw(func(driverConn any) error {
sqliteConn := driverConn.(*sqlite3.SQLiteConn)
blob, err := sqliteConn.Blob("main", (FileObject{}).TableName(), "content", int64(objID), 0)
blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(objID), 0)
if err != nil {
return fmt.Errorf("failed to open blob: %w", err)
}
@ -359,7 +359,7 @@ func (s *sqlDatabase) ListFileObjects(_ context.Context, page, pageSize uint64)
}
var total int64
if err := s.conn.Model(&FileObject{}).Count(&total).Error; err != nil {
if err := s.objectsConn.Model(&FileObject{}).Count(&total).Error; err != nil {
return params.FileObjectPaginatedResponse{}, fmt.Errorf("failed to count file objects: %w", err)
}
@ -383,7 +383,7 @@ func (s *sqlDatabase) ListFileObjects(_ context.Context, page, pageSize uint64)
}
var fileObjs []FileObject
if err := s.conn.Preload("TagsList").Omit("content").
if err := s.objectsConn.Preload("TagsList").Omit("content").
Limit(queryPageSize).
Offset(queryOffset).
Order("created_at DESC").

View file

@ -477,19 +477,9 @@ type FileObject struct {
Content []byte `gorm:"type:blob"`
}
// TableName reports the table used for FileObject rows, overriding the
// default table name.
func (FileObject) TableName() string {
	const table = "file_objects"
	return table
}
// FileObjectTag represents the many-to-many relationship between documents and tags.
// Each row pairs the ID of a file object with a single tag string.
type FileObjectTag struct {
// ID is the primary key of the tag row.
ID uint `gorm:"primaryKey"`
// FileObjectID is the ID of the tagged file object. It is required (not null)
// and is the leading column of both idx_fileobject_tags_doc_id and
// idx_fileobject_tags_tag.
FileObjectID uint `gorm:"index:idx_fileobject_tags_doc_id,priority:1;index:idx_fileobject_tags_tag,priority:1;not null"`
// Tag is the tag text. It is required (not null), declared with the NOCASE
// collation, and is the second column of idx_fileobject_tags_tag.
Tag string `gorm:"type:TEXT COLLATE NOCASE;index:idx_fileobject_tags_tag,priority:2;not null"`
}
// TableName reports the table used for FileObjectTag rows, overriding the
// default table name.
func (FileObjectTag) TableName() string {
	const table = "file_object_tags"
	return table
}

View file

@ -90,12 +90,46 @@ func NewSQLDatabase(ctx context.Context, cfg config.Database) (common.Store, err
if err != nil {
return nil, fmt.Errorf("failed to get underlying database connection: %w", err)
}
// Create separate connection for objects database (only for SQLite)
var objectsConn *gorm.DB
var objectsSqlDB *sql.DB
if cfg.DbBackend == config.SQLiteBackend {
// Get the blob database file path
blobFile, err := cfg.SQLite.BlobDBFile()
if err != nil {
return nil, fmt.Errorf("failed to get blob database file: %w", err)
}
// Create config for objects database
objectsCfg := config.Database{
DbBackend: config.SQLiteBackend,
Debug: cfg.Debug,
Passphrase: cfg.Passphrase,
SQLite: config.SQLite{
DBFile: blobFile,
},
}
objectsConn, err = newDBConn(objectsCfg)
if err != nil {
return nil, fmt.Errorf("error creating objects DB connection: %w", err)
}
objectsSqlDB, err = objectsConn.DB()
if err != nil {
return nil, fmt.Errorf("failed to get underlying objects database connection: %w", err)
}
}
db := &sqlDatabase{
conn: conn,
sqlDB: sqlDB,
ctx: ctx,
cfg: cfg,
producer: producer,
conn: conn,
sqlDB: sqlDB,
objectsConn: objectsConn,
objectsSqlDB: objectsSqlDB,
ctx: ctx,
cfg: cfg,
producer: producer,
}
if err := db.migrateDB(); err != nil {
@ -108,6 +142,10 @@ type sqlDatabase struct {
conn *gorm.DB
sqlDB *sql.DB
// objectsConn is a separate GORM connection to the objects database
objectsConn *gorm.DB
objectsSqlDB *sql.DB
ctx context.Context
cfg config.Database
producer common.Producer
@ -404,6 +442,25 @@ func (s *sqlDatabase) migrateWorkflow() error {
return nil
}
// migrateFileObjects runs GORM auto-migration for the FileObject and
// FileObjectTag models on the dedicated objects connection.
//
// It does nothing and returns nil when the configured backend is not SQLite.
// For SQLite it returns an error if the objects connection has not been
// initialized, or if the auto-migration itself fails.
func (s *sqlDatabase) migrateFileObjects() error {
	switch {
	case s.cfg.DbBackend != config.SQLiteBackend:
		// Nothing to migrate unless the backend is SQLite.
		return nil
	case s.objectsConn == nil:
		// The migration must run on the separate objects connection.
		return fmt.Errorf("objects database connection not initialized")
	}

	migrateErr := s.objectsConn.AutoMigrate(&FileObject{}, &FileObjectTag{})
	if migrateErr != nil {
		return fmt.Errorf("failed to migrate file objects: %w", migrateErr)
	}
	return nil
}
func (s *sqlDatabase) ensureTemplates(migrateTemplates bool) error {
if !migrateTemplates {
return nil
@ -616,11 +673,15 @@ func (s *sqlDatabase) migrateDB() error {
&ControllerInfo{},
&WorkflowJob{},
&ScaleSet{},
&FileObject{},
&FileObjectTag{},
); err != nil {
return fmt.Errorf("error running auto migrate: %w", err)
}
// Migrate file object tables on the separate objects database connection
if err := s.migrateFileObjects(); err != nil {
return fmt.Errorf("error migrating file objects: %w", err)
}
s.conn.Exec("PRAGMA foreign_keys = ON")
if !hasMinAgeField {