Use tmp storage while uploading, use tx where needed
Use temporary storage while the client is streaming the file to GARM. This ensures we don't lock the blob database for the duration of the upload. On a slow connection, streaming straight into the database would block all readers for as long as the write was in progress. By saving the file to a temporary location and only adding it to the DB once we have received the entire thing, we significantly reduce the time the DB needs to stay locked.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
parent db2b908605
commit 73b330467b
8 changed files with 193 additions and 64 deletions
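The core pattern, as a minimal standalone sketch (spoolThenStore and insertBlob are hypothetical names for illustration, not GARM's API): spool the upload to a temporary file while holding no database lock, then hand the fully received file to a short, local write.

    import (
        "fmt"
        "io"
        "os"
    )

    // spoolThenStore accepts the whole upload before touching the database.
    // insertBlob stands in for the short, lock-holding DB write; in GARM that
    // is the transaction plus incremental blob copy shown in the diff below.
    func spoolThenStore(upload io.Reader, insertBlob func(io.Reader) error) error {
        tmp, err := os.CreateTemp("", "upload-*")
        if err != nil {
            return fmt.Errorf("failed to create tmp file: %w", err)
        }
        defer func() {
            tmp.Close()
            os.Remove(tmp.Name())
        }()

        // Slow part: reading from a potentially slow client. No DB lock is held here.
        if _, err := io.Copy(tmp, upload); err != nil {
            return fmt.Errorf("failed to copy data: %w", err)
        }
        if err := tmp.Sync(); err != nil {
            return fmt.Errorf("failed to flush data to disk: %w", err)
        }
        if _, err := tmp.Seek(0, io.SeekStart); err != nil {
            return fmt.Errorf("failed to seek to beginning: %w", err)
        }

        // Fast part: local disk to local DB; the lock window shrinks from
        // "duration of the upload" to "duration of a local copy".
        return insertBlob(tmp)
    }

The cost is transient disk usage equal to the upload size, reclaimed by the deferred os.Remove.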
@@ -9,6 +9,7 @@ import (
     "fmt"
     "io"
     "math"
+    "os"

     "github.com/mattn/go-sqlite3"
     "gorm.io/gorm"
@@ -21,9 +22,31 @@ import (
 // func (s *sqlDatabase) CreateFileObject(ctx context.Context, name string, size int64, tags []string, reader io.Reader) (fileObjParam params.FileObject, err error) {
 func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateFileObjectParams, reader io.Reader) (fileObjParam params.FileObject, err error) {
+    // Save the file to temporary storage first. This allows us to accept the entire file, even over
+    // a slow connection, without locking the database as we stream the file to the DB.
+    // SQLite will lock the entire database (including for readers) while the data is being committed.
+    tmpFile, err := util.GetTmpFileHandle("")
+    if err != nil {
+        return params.FileObject{}, fmt.Errorf("failed to create tmp file: %w", err)
+    }
+    defer func() {
+        tmpFile.Close()
+        os.Remove(tmpFile.Name())
+    }()
+    if _, err := io.Copy(tmpFile, reader); err != nil {
+        return params.FileObject{}, fmt.Errorf("failed to copy data: %w", err)
+    }
+    if err := tmpFile.Sync(); err != nil {
+        return params.FileObject{}, fmt.Errorf("failed to flush data to disk: %w", err)
+    }
+    // The file has been transferred. We need to seek back to the beginning of the file. This same
+    // handle will be used to stream the data to the database.
+    if _, err := tmpFile.Seek(0, 0); err != nil {
+        return params.FileObject{}, fmt.Errorf("failed to seek to beginning: %w", err)
+    }
     // Read first 8KB for type detection
     buffer := make([]byte, 8192)
-    n, _ := io.ReadFull(reader, buffer)
+    n, _ := io.ReadFull(tmpFile, buffer)
     fileType := util.DetectFileType(buffer[:n])

     fileObj := FileObject{
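A subtlety in the hunk above: the error from io.ReadFull is deliberately dropped. For a file shorter than 8KB, io.ReadFull returns io.ErrUnexpectedEOF, but n still reports how many bytes were read, so buffer[:n] is exactly the file's prefix and DetectFileType never sees garbage. In isolation:

    import "io"

    // sniffPrefix reads up to 8KB for content-type detection. Short inputs
    // yield io.ErrUnexpectedEOF, which is safe to ignore because n remains
    // an accurate count of the bytes actually read.
    func sniffPrefix(r io.Reader) []byte {
        buffer := make([]byte, 8192)
        n, _ := io.ReadFull(r, buffer)
        return buffer[:n]
    }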
@@ -39,19 +62,44 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
         }
     }()

-    // Create the file first, without any space allocated for the blob.
-    if err := s.objectsConn.Create(&fileObj).Error; err != nil {
-        return params.FileObject{}, fmt.Errorf("failed to create file object: %w", err)
-    }
+    var fileBlob FileBlob
+    err = s.objectsConn.Transaction(func(tx *gorm.DB) error {
+        // Create the file first
+        if err := tx.Create(&fileObj).Error; err != nil {
+            return fmt.Errorf("failed to create file object: %w", err)
+        }

-    // allocate space for the blob using the zeroblob() function. This will allow us to avoid
-    // having to allocate potentially huge byte arrays in memory and writing that huge blob to
-    // disk.
-    query := `UPDATE file_objects SET content = zeroblob(?) WHERE id = ?`
-    if err := s.objectsConn.Exec(query, param.Size, fileObj.ID).Error; err != nil {
-        return params.FileObject{}, fmt.Errorf("failed to allocate disk space: %w", err)
-    }
+        // create the file blob, without any space allocated for the blob.
+        fileBlob = FileBlob{
+            FileObjectID: fileObj.ID,
+        }
+        if err := tx.Create(&fileBlob).Error; err != nil {
+            return fmt.Errorf("failed to create file blob object: %w", err)
+        }
+
+        // allocate space for the blob using the zeroblob() function. This will allow us to avoid
+        // having to allocate potentially huge byte arrays in memory and writing that huge blob to
+        // disk.
+        query := `UPDATE file_blobs SET content = zeroblob(?) WHERE id = ?`
+        if err := tx.Exec(query, param.Size, fileBlob.ID).Error; err != nil {
+            return fmt.Errorf("failed to allocate disk space: %w", err)
+        }
+        // Create tag entries
+        for _, tag := range param.Tags {
+            fileObjTag := FileObjectTag{
+                FileObjectID: fileObj.ID,
+                Tag:          tag,
+            }
+            if err := tx.Create(&fileObjTag).Error; err != nil {
+                return fmt.Errorf("failed to add tag: %w", err)
+            }
+        }
+        return nil
+    })
+
+    if err != nil {
+        return params.FileObject{}, fmt.Errorf("failed to create database entry for blob: %w", err)
+    }
     // Stream file to blob and compute SHA256
     conn, err := s.objectsSqlDB.Conn(ctx)
     if err != nil {
@@ -63,7 +111,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
     err = conn.Raw(func(driverConn any) error {
         sqliteConn := driverConn.(*sqlite3.SQLiteConn)

-        blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(fileObj.ID), 1)
+        blob, err := sqliteConn.Blob("main", "file_blobs", "content", int64(fileBlob.ID), 1)
         if err != nil {
             return fmt.Errorf("failed to open blob: %w", err)
         }
@@ -79,7 +127,7 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
         hasher.Write(buffer[:n])

         // Stream the rest with hash computation
-        _, err = io.Copy(io.MultiWriter(blob, hasher), reader)
+        _, err = io.Copy(io.MultiWriter(blob, hasher), tmpFile)
         if err != nil {
             return fmt.Errorf("failed to write blob: %w", err)
         }
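Condensed, the write path above is: reserve space in the row with zeroblob(), then stream into it through SQLite's incremental blob I/O. The sketch below assumes, as the diff indicates, that GARM's go-sqlite3 build exposes SQLiteConn.Blob returning a handle usable with io.Copy (upstream mattn/go-sqlite3 releases do not export this API):

    import (
        "context"
        "database/sql"
        "fmt"
        "io"

        "github.com/mattn/go-sqlite3"
        "gorm.io/gorm"
    )

    // writeBlob preallocates `size` bytes via zeroblob(), then streams src
    // into the reserved region. Only io.Copy's internal 32KB scratch buffer
    // ever lives in memory, never the whole blob.
    func writeBlob(ctx context.Context, sqlDB *sql.DB, gormDB *gorm.DB, id uint, size int64, src io.Reader) error {
        // Reserve `size` bytes inside SQLite without materializing them in Go.
        if err := gormDB.Exec(`UPDATE file_blobs SET content = zeroblob(?) WHERE id = ?`, size, id).Error; err != nil {
            return fmt.Errorf("failed to allocate disk space: %w", err)
        }

        conn, err := sqlDB.Conn(ctx)
        if err != nil {
            return fmt.Errorf("failed to get connection: %w", err)
        }
        defer conn.Close()

        return conn.Raw(func(driverConn any) error {
            sqliteConn := driverConn.(*sqlite3.SQLiteConn)
            // Flag 1 opens the handle read-write, as in the diff above.
            blob, err := sqliteConn.Blob("main", "file_blobs", "content", int64(id), 1)
            if err != nil {
                return fmt.Errorf("failed to open blob: %w", err)
            }
            defer blob.Close()

            // Chunked copy into the preallocated region.
            if _, err := io.Copy(blob, src); err != nil {
                return fmt.Errorf("failed to write blob: %w", err)
            }
            return nil
        })
    }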
@@ -97,17 +145,6 @@ func (s *sqlDatabase) CreateFileObject(ctx context.Context, param params.CreateF
         return params.FileObject{}, fmt.Errorf("failed to update sha256sum: %w", err)
     }

-    // Create tag entries
-    for _, tag := range param.Tags {
-        fileObjTag := FileObjectTag{
-            FileObjectID: fileObj.ID,
-            Tag:          tag,
-        }
-        if err := s.objectsConn.Create(&fileObjTag).Error; err != nil {
-            return params.FileObject{}, fmt.Errorf("failed to add tag: %w", err)
-        }
-    }
-
     // Reload document with tags
     if err := s.objectsConn.Preload("TagsList").Omit("content").First(&fileObj, fileObj.ID).Error; err != nil {
         return params.FileObject{}, fmt.Errorf("failed to get file object: %w", err)
@@ -307,11 +344,15 @@ func (s *sqlDatabase) OpenFileObjectContent(ctx context.Context, objID uint) (io
         return nil, fmt.Errorf("failed to get connection: %w", err)
     }

+    var fileBlob FileBlob
+    if err := s.objectsConn.Where("file_object_id = ?", objID).Omit("content").First(&fileBlob).Error; err != nil {
+        return nil, fmt.Errorf("failed to get file blob: %w", err)
+    }
     var blobReader io.ReadCloser
     err = conn.Raw(func(driverConn any) error {
         sqliteConn := driverConn.(*sqlite3.SQLiteConn)

-        blob, err := sqliteConn.Blob("main", "file_objects", "content", int64(objID), 0)
+        blob, err := sqliteConn.Blob("main", "file_blobs", "content", int64(fileBlob.ID), 0)
         if err != nil {
             return fmt.Errorf("failed to open blob: %w", err)
         }
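On the read side the handle is opened with flag 0 (read-only) and wrapped in an io.ReadCloser, so callers can stream content out without buffering it. A hypothetical consumer, not part of this commit and assuming OpenFileObjectContent is exposed on the store interface:

    import (
        "context"
        "io"
        "net/http"
    )

    // contentOpener is the slice of the store interface this handler needs;
    // the method signature matches OpenFileObjectContent in the diff above.
    type contentOpener interface {
        OpenFileObjectContent(ctx context.Context, objID uint) (io.ReadCloser, error)
    }

    // serveObject streams a stored file to an HTTP client. Memory use stays
    // constant regardless of blob size, because the reader is backed by the
    // incremental blob handle opened above.
    func serveObject(store contentOpener, objID uint) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            reader, err := store.OpenFileObjectContent(r.Context(), objID)
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            defer reader.Close()
            // Chunked copy; the blob never fully materializes in memory.
            if _, err := io.Copy(w, reader); err != nil {
                return // client disconnected mid-stream; nothing to recover
            }
        }
    }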
@@ -473,6 +473,22 @@ type FileObject struct {
     SHA256 string `gorm:"type:text"`
     // Tags is a JSON array of tags
     TagsList []FileObjectTag `gorm:"foreignKey:FileObjectID;constraint:OnDelete:CASCADE"`
+    // Content is a foreign key to a different table where the blob is actually stored.
+    // Updating a field in an SQLite3 DB will read the entire row, update the column
+    // and write it back to a different location. SQLite3 will then mark the old row as "deleted"
+    // and allow it to be vacuumed. But if we have a blob column with a huge blob, any
+    // update operation will consume a lot of resources and take a long time.
+    // Using a dedicated table for the blob (which doesn't change) speeds up updates of
+    // metadata fields like name, description, tags, etc.
+    Content FileBlob `gorm:"foreignKey:FileObjectID;constraint:OnDelete:CASCADE"`
 }

+// FileBlob is the immutable blob of bytes that, once written, will not be changed.
+// We leave the SHA256, file type and size in the parent table, because we need to
+// be able to get that info easily, without preloading the blob table.
+type FileBlob struct {
+    gorm.Model
+    FileObjectID uint `gorm:"index:idx_fileobject_blob_id"`
+    // Content is a BLOB column for storing binary data
+    Content []byte `gorm:"type:blob"`
+}
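To make the comment's argument concrete: with the blob in its own table, a metadata update touches only the small file_objects row. A hypothetical helper, not part of this commit, and one that assumes FileObject also has a Name column, which this hunk does not show:

    import "gorm.io/gorm"

    // renameFileObject rewrites only the file_objects row. The potentially
    // huge file_blobs row is never read, copied, or marked for vacuum.
    func renameFileObject(db *gorm.DB, id uint, newName string) error {
        return db.Model(&FileObject{}).Where("id = ?", id).Update("name", newName).Error
    }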
@@ -91,45 +91,33 @@ func NewSQLDatabase(ctx context.Context, cfg config.Database) (common.Store, err
         return nil, fmt.Errorf("failed to get underlying database connection: %w", err)
     }

+    db := &sqlDatabase{
+        conn:     conn,
+        sqlDB:    sqlDB,
+        ctx:      ctx,
+        cfg:      cfg,
+        producer: producer,
+    }
+
     // Create separate connection for objects database (only for SQLite)
-    var objectsConn *gorm.DB
-    var objectsSqlDB *sql.DB
     if cfg.DbBackend == config.SQLiteBackend {
-        // Get the blob database file path
-        blobFile, err := cfg.SQLite.BlobDBFile()
+        // Get config for objects database
+        objectsCfg, err := cfg.SQLiteBlobDatabaseConfig()
         if err != nil {
-            return nil, fmt.Errorf("failed to get blob database file: %w", err)
+            return nil, fmt.Errorf("failed to get blob DB config: %w", err)
         }

-        // Create config for objects database
-        objectsCfg := config.Database{
-            DbBackend:  config.SQLiteBackend,
-            Debug:      cfg.Debug,
-            Passphrase: cfg.Passphrase,
-            SQLite: config.SQLite{
-                DBFile: blobFile,
-            },
-        }
-
-        objectsConn, err = newDBConn(objectsCfg)
+        objectsConn, err := newDBConn(objectsCfg)
         if err != nil {
             return nil, fmt.Errorf("error creating objects DB connection: %w", err)
         }

-        objectsSqlDB, err = objectsConn.DB()
+        objectsSqlDB, err := objectsConn.DB()
         if err != nil {
             return nil, fmt.Errorf("failed to get underlying objects database connection: %w", err)
         }
-    }
-
-    db := &sqlDatabase{
-        conn:         conn,
-        sqlDB:        sqlDB,
-        objectsConn:  objectsConn,
-        objectsSqlDB: objectsSqlDB,
-        ctx:          ctx,
-        cfg:          cfg,
-        producer:     producer,
+        db.objectsConn = objectsConn
+        db.objectsSqlDB = objectsSqlDB
     }

     if err := db.migrateDB(); err != nil {
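The new cfg.SQLiteBlobDatabaseConfig() helper is defined outside this diff. Judging from the inline code it replaces, a plausible reconstruction (illustrative only, not the actual implementation; it would live in the config package) is:

    import "fmt"

    // SQLiteBlobDatabaseConfig derives a config for the blob store, reusing
    // the main DB's Debug and Passphrase settings, mirroring the inline block
    // this commit removes from NewSQLDatabase.
    func (d Database) SQLiteBlobDatabaseConfig() (Database, error) {
        blobFile, err := d.SQLite.BlobDBFile()
        if err != nil {
            return Database{}, fmt.Errorf("failed to get blob database file: %w", err)
        }
        return Database{
            DbBackend:  SQLiteBackend,
            Debug:      d.Debug,
            Passphrase: d.Passphrase,
            SQLite: SQLite{
                DBFile: blobFile,
            },
        }, nil
    }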
@@ -454,7 +442,7 @@ func (s *sqlDatabase) migrateFileObjects() error {
     }

     // Use GORM AutoMigrate on the separate connection
-    if err := s.objectsConn.AutoMigrate(&FileObject{}, &FileObjectTag{}); err != nil {
+    if err := s.objectsConn.AutoMigrate(&FileObject{}, &FileBlob{}, &FileObjectTag{}); err != nil {
         return fmt.Errorf("failed to migrate file objects: %w", err)
     }