chore: cache: split caches implementation out of handler
- create the caches interface and matching cachesImpl
- move the cache logic out of handler
- openDB
- readCache
- useCache
- gcCache
- access to the storage struct
- serve
- commit
- exist
- write
- add getCaches / setCaches to the handler interface so it can be
used by tests. The caches test should be implemented independently
in the future but this is a different kind of cleanup.
- no functional change, minimal refactor
This commit is contained in:
parent
37f634fd31
commit
6c4e705f97
5 changed files with 349 additions and 282 deletions
305
act/artifactcache/caches.go
Normal file
305
act/artifactcache/caches.go
Normal file
|
|
@ -0,0 +1,305 @@
|
|||
package artifactcache
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/timshannon/bolthold"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type caches interface {
|
||||
openDB() (*bolthold.Store, error)
|
||||
validateMac(rundata RunData) (string, error)
|
||||
readCache(id uint64) (*Cache, error)
|
||||
useCache(id uint64) error
|
||||
setgcAt(at time.Time)
|
||||
gcCache()
|
||||
|
||||
serve(w http.ResponseWriter, r *http.Request, id uint64)
|
||||
commit(id uint64, size int64) (int64, error)
|
||||
exist(id uint64) (bool, error)
|
||||
write(id, offset uint64, reader io.Reader) error
|
||||
}
|
||||
|
||||
type cachesImpl struct {
|
||||
dir string
|
||||
storage *Storage
|
||||
logger logrus.FieldLogger
|
||||
secret string
|
||||
|
||||
gcing atomic.Bool
|
||||
gcAt time.Time
|
||||
}
|
||||
|
||||
func newCaches(dir, secret string, logger logrus.FieldLogger) (caches, error) {
|
||||
c := &cachesImpl{
|
||||
secret: secret,
|
||||
}
|
||||
|
||||
c.logger = logger
|
||||
|
||||
if dir == "" {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dir = filepath.Join(home, ".cache", "actcache")
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.dir = dir
|
||||
|
||||
storage, err := NewStorage(filepath.Join(dir, "cache"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.storage = storage
|
||||
|
||||
c.gcCache()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *cachesImpl) openDB() (*bolthold.Store, error) {
|
||||
file := filepath.Join(c.dir, "bolt.db")
|
||||
db, err := bolthold.Open(file, 0o644, &bolthold.Options{
|
||||
Encoder: json.Marshal,
|
||||
Decoder: json.Unmarshal,
|
||||
Options: &bbolt.Options{
|
||||
Timeout: 5 * time.Second,
|
||||
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
|
||||
FreelistType: bbolt.DefaultOptions.FreelistType,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Open(%s): %w", file, err)
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// if not found, return (nil, nil) instead of an error.
|
||||
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
|
||||
cache := &Cache{}
|
||||
for _, prefix := range keys {
|
||||
// if a key in the list matches exactly, don't return partial matches
|
||||
if err := db.FindOne(cache,
|
||||
bolthold.Where("Repo").Eq(repo).Index("Repo").
|
||||
And("Key").Eq(prefix).
|
||||
And("Version").Eq(version).
|
||||
And("WriteIsolationKey").Eq(writeIsolationKey).
|
||||
And("Complete").Eq(true).
|
||||
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
|
||||
re, err := regexp.Compile(prefixPattern)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := db.FindOne(cache,
|
||||
bolthold.Where("Repo").Eq(repo).Index("Repo").
|
||||
And("Key").RegExp(re).
|
||||
And("Version").Eq(version).
|
||||
And("WriteIsolationKey").Eq(writeIsolationKey).
|
||||
And("Complete").Eq(true).
|
||||
SortBy("CreatedAt").Reverse()); err != nil {
|
||||
if errors.Is(err, bolthold.ErrNotFound) {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func insertCache(db *bolthold.Store, cache *Cache) error {
|
||||
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
|
||||
return fmt.Errorf("insert cache: %w", err)
|
||||
}
|
||||
// write back id to db
|
||||
if err := db.Update(cache.ID, cache); err != nil {
|
||||
return fmt.Errorf("write back id to db: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cachesImpl) readCache(id uint64) (*Cache, error) {
|
||||
db, err := c.openDB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer db.Close()
|
||||
cache := &Cache{}
|
||||
if err := db.Get(id, cache); err != nil {
|
||||
return nil, fmt.Errorf("readCache: Get(%v): %w", id, err)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
func (c *cachesImpl) useCache(id uint64) error {
|
||||
db, err := c.openDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
cache := &Cache{}
|
||||
if err := db.Get(id, cache); err != nil {
|
||||
return fmt.Errorf("useCache: Get(%v): %w", id, err)
|
||||
}
|
||||
cache.UsedAt = time.Now().Unix()
|
||||
if err := db.Update(cache.ID, cache); err != nil {
|
||||
return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cachesImpl) serve(w http.ResponseWriter, r *http.Request, id uint64) {
|
||||
c.storage.Serve(w, r, id)
|
||||
}
|
||||
|
||||
func (c *cachesImpl) commit(id uint64, size int64) (int64, error) {
|
||||
return c.storage.Commit(id, size)
|
||||
}
|
||||
|
||||
func (c *cachesImpl) exist(id uint64) (bool, error) {
|
||||
return c.storage.Exist(id)
|
||||
}
|
||||
|
||||
func (c *cachesImpl) write(id, offset uint64, reader io.Reader) error {
|
||||
return c.storage.Write(id, offset, reader)
|
||||
}
|
||||
|
||||
const (
|
||||
keepUsed = 30 * 24 * time.Hour
|
||||
keepUnused = 7 * 24 * time.Hour
|
||||
keepTemp = 5 * time.Minute
|
||||
keepOld = 5 * time.Minute
|
||||
)
|
||||
|
||||
func (c *cachesImpl) setgcAt(at time.Time) {
|
||||
c.gcAt = at
|
||||
}
|
||||
|
||||
func (c *cachesImpl) gcCache() {
|
||||
if c.gcing.Load() {
|
||||
return
|
||||
}
|
||||
if !c.gcing.CompareAndSwap(false, true) {
|
||||
return
|
||||
}
|
||||
defer c.gcing.Store(false)
|
||||
|
||||
if time.Since(c.gcAt) < time.Hour {
|
||||
c.logger.Debugf("skip gc: %v", c.gcAt.String())
|
||||
return
|
||||
}
|
||||
c.gcAt = time.Now()
|
||||
c.logger.Debugf("gc: %v", c.gcAt.String())
|
||||
|
||||
db, err := c.openDB()
|
||||
if err != nil {
|
||||
fatal(c.logger, err)
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Remove the caches which are not completed for a while, they are most likely to be broken.
|
||||
var caches []*Cache
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
|
||||
And("Complete").Eq(false),
|
||||
); err != nil {
|
||||
fatal(c.logger, fmt.Errorf("gc caches not completed: %v", err))
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
c.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
c.logger.Errorf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
c.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches which have not been used recently.
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
|
||||
); err != nil {
|
||||
fatal(c.logger, fmt.Errorf("gc caches old not used: %v", err))
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
c.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
c.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
c.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches which are too old.
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
|
||||
); err != nil {
|
||||
fatal(c.logger, fmt.Errorf("gc caches too old: %v", err))
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
c.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
c.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
c.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches with the same key and version, keep the latest one.
|
||||
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
|
||||
if results, err := db.FindAggregate(
|
||||
&Cache{},
|
||||
bolthold.Where("Complete").Eq(true),
|
||||
"Key", "Version",
|
||||
); err != nil {
|
||||
fatal(c.logger, fmt.Errorf("gc aggregate caches: %v", err))
|
||||
} else {
|
||||
for _, result := range results {
|
||||
if result.Count() <= 1 {
|
||||
continue
|
||||
}
|
||||
result.Sort("CreatedAt")
|
||||
caches = caches[:0]
|
||||
result.Reduction(&caches)
|
||||
for _, cache := range caches[:len(caches)-1] {
|
||||
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
|
||||
// Keep it since it has been used recently, even if it's old.
|
||||
// Or it could break downloading in process.
|
||||
continue
|
||||
}
|
||||
c.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
c.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
c.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -7,19 +7,14 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/julienschmidt/httprouter"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/timshannon/bolthold"
|
||||
"go.etcd.io/bbolt"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v11/act/common"
|
||||
)
|
||||
|
|
@ -39,7 +34,8 @@ type Handler interface {
|
|||
ExternalURL() string
|
||||
Close() error
|
||||
isClosed() bool
|
||||
openDB() (*bolthold.Store, error)
|
||||
getCaches() caches
|
||||
setCaches(caches caches)
|
||||
find(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
||||
reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
||||
upload(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
||||
|
|
@ -47,32 +43,21 @@ type Handler interface {
|
|||
get(w http.ResponseWriter, r *http.Request, params httprouter.Params)
|
||||
clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params)
|
||||
middleware(handler httprouter.Handle) httprouter.Handle
|
||||
readCache(id uint64) (*Cache, error)
|
||||
useCache(id uint64) error
|
||||
setgcAt(at time.Time)
|
||||
gcCache()
|
||||
responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any)
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
dir string
|
||||
storage *Storage
|
||||
caches caches
|
||||
router *httprouter.Router
|
||||
listener net.Listener
|
||||
server *http.Server
|
||||
logger logrus.FieldLogger
|
||||
secret string
|
||||
|
||||
gcing atomic.Bool
|
||||
gcAt time.Time
|
||||
|
||||
outboundIP string
|
||||
}
|
||||
|
||||
func StartHandler(dir, outboundIP string, port uint16, secret string, logger logrus.FieldLogger) (Handler, error) {
|
||||
h := &handler{
|
||||
secret: secret,
|
||||
}
|
||||
h := &handler{}
|
||||
|
||||
if logger == nil {
|
||||
discard := logrus.New()
|
||||
|
|
@ -82,24 +67,11 @@ func StartHandler(dir, outboundIP string, port uint16, secret string, logger log
|
|||
logger = logger.WithField("module", "artifactcache")
|
||||
h.logger = logger
|
||||
|
||||
if dir == "" {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dir = filepath.Join(home, ".cache", "actcache")
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h.dir = dir
|
||||
|
||||
storage, err := NewStorage(filepath.Join(dir, "cache"))
|
||||
caches, err := newCaches(dir, secret, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h.storage = storage
|
||||
h.caches = caches
|
||||
|
||||
if outboundIP != "" {
|
||||
h.outboundIP = outboundIP
|
||||
|
|
@ -119,8 +91,6 @@ func StartHandler(dir, outboundIP string, port uint16, secret string, logger log
|
|||
|
||||
h.router = router
|
||||
|
||||
h.gcCache()
|
||||
|
||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) // listen on all interfaces
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -176,27 +146,18 @@ func (h *handler) isClosed() bool {
|
|||
return h.listener == nil && h.server == nil
|
||||
}
|
||||
|
||||
func (h *handler) openDB() (*bolthold.Store, error) {
|
||||
file := filepath.Join(h.dir, "bolt.db")
|
||||
db, err := bolthold.Open(file, 0o644, &bolthold.Options{
|
||||
Encoder: json.Marshal,
|
||||
Decoder: json.Unmarshal,
|
||||
Options: &bbolt.Options{
|
||||
Timeout: 5 * time.Second,
|
||||
NoGrowSync: bbolt.DefaultOptions.NoGrowSync,
|
||||
FreelistType: bbolt.DefaultOptions.FreelistType,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Open(%s): %v", file, err)
|
||||
}
|
||||
return db, nil
|
||||
func (h *handler) getCaches() caches {
|
||||
return h.caches
|
||||
}
|
||||
|
||||
func (h *handler) setCaches(caches caches) {
|
||||
h.caches = caches
|
||||
}
|
||||
|
||||
// GET /_apis/artifactcache/cache
|
||||
func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
rundata := runDataFromHeaders(r)
|
||||
repo, err := h.validateMac(rundata)
|
||||
repo, err := h.caches.validateMac(rundata)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 403, err)
|
||||
return
|
||||
|
|
@ -209,7 +170,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter
|
|||
}
|
||||
version := r.URL.Query().Get("version")
|
||||
|
||||
db, err := h.openDB()
|
||||
db, err := h.caches.openDB()
|
||||
if err != nil {
|
||||
h.responseFatalJSON(w, r, err)
|
||||
return
|
||||
|
|
@ -234,7 +195,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter
|
|||
return
|
||||
}
|
||||
|
||||
if ok, err := h.storage.Exist(cache.ID); err != nil {
|
||||
if ok, err := h.caches.exist(cache.ID); err != nil {
|
||||
h.responseJSON(w, r, 500, err)
|
||||
return
|
||||
} else if !ok {
|
||||
|
|
@ -253,7 +214,7 @@ func (h *handler) find(w http.ResponseWriter, r *http.Request, params httprouter
|
|||
// POST /_apis/artifactcache/caches
|
||||
func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
rundata := runDataFromHeaders(r)
|
||||
repo, err := h.validateMac(rundata)
|
||||
repo, err := h.caches.validateMac(rundata)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 403, err)
|
||||
return
|
||||
|
|
@ -268,7 +229,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou
|
|||
api.Key = strings.ToLower(api.Key)
|
||||
|
||||
cache := api.ToCache()
|
||||
db, err := h.openDB()
|
||||
db, err := h.caches.openDB()
|
||||
if err != nil {
|
||||
h.responseFatalJSON(w, r, err)
|
||||
return
|
||||
|
|
@ -292,7 +253,7 @@ func (h *handler) reserve(w http.ResponseWriter, r *http.Request, params httprou
|
|||
// PATCH /_apis/artifactcache/caches/:id
|
||||
func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
rundata := runDataFromHeaders(r)
|
||||
repo, err := h.validateMac(rundata)
|
||||
repo, err := h.caches.validateMac(rundata)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 403, err)
|
||||
return
|
||||
|
|
@ -304,7 +265,7 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
|
|||
return
|
||||
}
|
||||
|
||||
cache, err := h.readCache(id)
|
||||
cache, err := h.caches.readCache(id)
|
||||
if err != nil {
|
||||
if errors.Is(err, bolthold.ErrNotFound) {
|
||||
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
||||
|
|
@ -333,11 +294,11 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
|
|||
h.responseJSON(w, r, 400, fmt.Errorf("cache parseContentRange(%s): %w", r.Header.Get("Content-Range"), err))
|
||||
return
|
||||
}
|
||||
if err := h.storage.Write(cache.ID, start, r.Body); err != nil {
|
||||
if err := h.caches.write(cache.ID, start, r.Body); err != nil {
|
||||
h.responseJSON(w, r, 500, fmt.Errorf("cache storage.Write: %w", err))
|
||||
return
|
||||
}
|
||||
if err := h.useCache(id); err != nil {
|
||||
if err := h.caches.useCache(id); err != nil {
|
||||
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
|
||||
return
|
||||
}
|
||||
|
|
@ -347,7 +308,7 @@ func (h *handler) upload(w http.ResponseWriter, r *http.Request, params httprout
|
|||
// POST /_apis/artifactcache/caches/:id
|
||||
func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
rundata := runDataFromHeaders(r)
|
||||
repo, err := h.validateMac(rundata)
|
||||
repo, err := h.caches.validateMac(rundata)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 403, err)
|
||||
return
|
||||
|
|
@ -359,7 +320,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
|||
return
|
||||
}
|
||||
|
||||
cache, err := h.readCache(id)
|
||||
cache, err := h.caches.readCache(id)
|
||||
if err != nil {
|
||||
if errors.Is(err, bolthold.ErrNotFound) {
|
||||
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
||||
|
|
@ -384,7 +345,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
|||
return
|
||||
}
|
||||
|
||||
size, err := h.storage.Commit(cache.ID, cache.Size)
|
||||
size, err := h.caches.commit(cache.ID, cache.Size)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 500, fmt.Errorf("commit(%v): %w", cache.ID, err))
|
||||
return
|
||||
|
|
@ -392,7 +353,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
|||
// write real size back to cache, it may be different from the current value when the request doesn't specify it.
|
||||
cache.Size = size
|
||||
|
||||
db, err := h.openDB()
|
||||
db, err := h.caches.openDB()
|
||||
if err != nil {
|
||||
h.responseFatalJSON(w, r, err)
|
||||
return
|
||||
|
|
@ -411,7 +372,7 @@ func (h *handler) commit(w http.ResponseWriter, r *http.Request, params httprout
|
|||
// GET /_apis/artifactcache/artifacts/:id
|
||||
func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
rundata := runDataFromHeaders(r)
|
||||
repo, err := h.validateMac(rundata)
|
||||
repo, err := h.caches.validateMac(rundata)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 403, err)
|
||||
return
|
||||
|
|
@ -423,7 +384,7 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.
|
|||
return
|
||||
}
|
||||
|
||||
cache, err := h.readCache(id)
|
||||
cache, err := h.caches.readCache(id)
|
||||
if err != nil {
|
||||
if errors.Is(err, bolthold.ErrNotFound) {
|
||||
h.responseJSON(w, r, 404, fmt.Errorf("cache %d: not reserved", id))
|
||||
|
|
@ -444,17 +405,17 @@ func (h *handler) get(w http.ResponseWriter, r *http.Request, params httprouter.
|
|||
return
|
||||
}
|
||||
|
||||
if err := h.useCache(id); err != nil {
|
||||
if err := h.caches.useCache(id); err != nil {
|
||||
h.responseJSON(w, r, 500, fmt.Errorf("cache useCache: %w", err))
|
||||
return
|
||||
}
|
||||
h.storage.Serve(w, r, id)
|
||||
h.caches.serve(w, r, id)
|
||||
}
|
||||
|
||||
// POST /_apis/artifactcache/clean
|
||||
func (h *handler) clean(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
||||
rundata := runDataFromHeaders(r)
|
||||
_, err := h.validateMac(rundata)
|
||||
_, err := h.caches.validateMac(rundata)
|
||||
if err != nil {
|
||||
h.responseJSON(w, r, 403, err)
|
||||
return
|
||||
|
|
@ -469,206 +430,7 @@ func (h *handler) middleware(handler httprouter.Handle) httprouter.Handle {
|
|||
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
|
||||
h.logger.Debugf("%s %s", r.Method, r.RequestURI)
|
||||
handler(w, r, params)
|
||||
go h.gcCache()
|
||||
}
|
||||
}
|
||||
|
||||
// if not found, return (nil, nil) instead of an error.
|
||||
func findCache(db *bolthold.Store, repo string, keys []string, version, writeIsolationKey string) (*Cache, error) {
|
||||
cache := &Cache{}
|
||||
for _, prefix := range keys {
|
||||
// if a key in the list matches exactly, don't return partial matches
|
||||
if err := db.FindOne(cache,
|
||||
bolthold.Where("Repo").Eq(repo).Index("Repo").
|
||||
And("Key").Eq(prefix).
|
||||
And("Version").Eq(version).
|
||||
And("WriteIsolationKey").Eq(writeIsolationKey).
|
||||
And("Complete").Eq(true).
|
||||
SortBy("CreatedAt").Reverse()); err == nil || !errors.Is(err, bolthold.ErrNotFound) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("find cache entry equal to %s: %w", prefix, err)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
|
||||
re, err := regexp.Compile(prefixPattern)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err := db.FindOne(cache,
|
||||
bolthold.Where("Repo").Eq(repo).Index("Repo").
|
||||
And("Key").RegExp(re).
|
||||
And("Version").Eq(version).
|
||||
And("WriteIsolationKey").Eq(writeIsolationKey).
|
||||
And("Complete").Eq(true).
|
||||
SortBy("CreatedAt").Reverse()); err != nil {
|
||||
if errors.Is(err, bolthold.ErrNotFound) {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("find cache entry starting with %s: %w", prefix, err)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func insertCache(db *bolthold.Store, cache *Cache) error {
|
||||
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
|
||||
return fmt.Errorf("insert cache: %w", err)
|
||||
}
|
||||
// write back id to db
|
||||
if err := db.Update(cache.ID, cache); err != nil {
|
||||
return fmt.Errorf("write back id to db: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *handler) readCache(id uint64) (*Cache, error) {
|
||||
db, err := h.openDB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer db.Close()
|
||||
cache := &Cache{}
|
||||
if err := db.Get(id, cache); err != nil {
|
||||
return nil, fmt.Errorf("readCache: Get(%v): %w", id, err)
|
||||
}
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
func (h *handler) useCache(id uint64) error {
|
||||
db, err := h.openDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
cache := &Cache{}
|
||||
if err := db.Get(id, cache); err != nil {
|
||||
return fmt.Errorf("useCache: Get(%v): %w", id, err)
|
||||
}
|
||||
cache.UsedAt = time.Now().Unix()
|
||||
if err := db.Update(cache.ID, cache); err != nil {
|
||||
return fmt.Errorf("useCache: Update(%v): %v", cache.ID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
keepUsed = 30 * 24 * time.Hour
|
||||
keepUnused = 7 * 24 * time.Hour
|
||||
keepTemp = 5 * time.Minute
|
||||
keepOld = 5 * time.Minute
|
||||
)
|
||||
|
||||
func (h *handler) setgcAt(at time.Time) {
|
||||
h.gcAt = at
|
||||
}
|
||||
|
||||
func (h *handler) gcCache() {
|
||||
if h.gcing.Load() {
|
||||
return
|
||||
}
|
||||
if !h.gcing.CompareAndSwap(false, true) {
|
||||
return
|
||||
}
|
||||
defer h.gcing.Store(false)
|
||||
|
||||
if time.Since(h.gcAt) < time.Hour {
|
||||
h.logger.Debugf("skip gc: %v", h.gcAt.String())
|
||||
return
|
||||
}
|
||||
h.gcAt = time.Now()
|
||||
h.logger.Debugf("gc: %v", h.gcAt.String())
|
||||
|
||||
db, err := h.openDB()
|
||||
if err != nil {
|
||||
fatal(h.logger, err)
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Remove the caches which are not completed for a while, they are most likely to be broken.
|
||||
var caches []*Cache
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
|
||||
And("Complete").Eq(false),
|
||||
); err != nil {
|
||||
fatal(h.logger, fmt.Errorf("gc caches not completed: %v", err))
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Errorf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches which have not been used recently.
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
|
||||
); err != nil {
|
||||
fatal(h.logger, fmt.Errorf("gc caches old not used: %v", err))
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches which are too old.
|
||||
caches = caches[:0]
|
||||
if err := db.Find(&caches, bolthold.
|
||||
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
|
||||
); err != nil {
|
||||
fatal(h.logger, fmt.Errorf("gc caches too old: %v", err))
|
||||
} else {
|
||||
for _, cache := range caches {
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the old caches with the same key and version, keep the latest one.
|
||||
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
|
||||
if results, err := db.FindAggregate(
|
||||
&Cache{},
|
||||
bolthold.Where("Complete").Eq(true),
|
||||
"Key", "Version",
|
||||
); err != nil {
|
||||
fatal(h.logger, fmt.Errorf("gc aggregate caches: %v", err))
|
||||
} else {
|
||||
for _, result := range results {
|
||||
if result.Count() <= 1 {
|
||||
continue
|
||||
}
|
||||
result.Sort("CreatedAt")
|
||||
caches = caches[:0]
|
||||
result.Reduction(&caches)
|
||||
for _, cache := range caches[:len(caches)-1] {
|
||||
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
|
||||
// Keep it since it has been used recently, even if it's old.
|
||||
// Or it could break downloading in process.
|
||||
continue
|
||||
}
|
||||
h.storage.Remove(cache.ID)
|
||||
if err := db.Delete(cache.ID, cache); err != nil {
|
||||
h.logger.Warnf("delete cache: %v", err)
|
||||
continue
|
||||
}
|
||||
h.logger.Infof("deleted cache: %+v", cache)
|
||||
}
|
||||
}
|
||||
go h.caches.gcCache()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ func TestHandler(t *testing.T) {
|
|||
|
||||
defer func() {
|
||||
t.Run("inspect db", func(t *testing.T) {
|
||||
db, err := handler.openDB()
|
||||
db, err := handler.getCaches().openDB()
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
require.NoError(t, db.Bolt().View(func(tx *bbolt.Tx) error {
|
||||
|
|
@ -986,17 +986,17 @@ func TestHandler_gcCache(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
db, err := handler.openDB()
|
||||
db, err := handler.getCaches().openDB()
|
||||
require.NoError(t, err)
|
||||
for _, c := range cases {
|
||||
require.NoError(t, insertCache(db, c.Cache))
|
||||
}
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
handler.setgcAt(time.Time{}) // ensure gcCache will not skip
|
||||
handler.gcCache()
|
||||
handler.getCaches().setgcAt(time.Time{}) // ensure gcCache will not skip
|
||||
handler.getCaches().gcCache()
|
||||
|
||||
db, err = handler.openDB()
|
||||
db, err = handler.getCaches().openDB()
|
||||
require.NoError(t, err)
|
||||
for i, v := range cases {
|
||||
t.Run(fmt.Sprintf("%d_%s", i, v.Cache.Key), func(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -14,13 +14,13 @@ import (
|
|||
|
||||
var ErrValidation = errors.New("validation error")
|
||||
|
||||
func (h *handler) validateMac(rundata RunData) (string, error) {
|
||||
func (c *cachesImpl) validateMac(rundata RunData) (string, error) {
|
||||
// TODO: allow configurable max age
|
||||
if !validateAge(rundata.Timestamp) {
|
||||
return "", ErrValidation
|
||||
}
|
||||
|
||||
expectedMAC := ComputeMac(h.secret, rundata.RepositoryFullName, rundata.RunNumber, rundata.Timestamp, rundata.WriteIsolationKey)
|
||||
expectedMAC := ComputeMac(c.secret, rundata.RepositoryFullName, rundata.RunNumber, rundata.Timestamp, rundata.WriteIsolationKey)
|
||||
if hmac.Equal([]byte(expectedMAC), []byte(rundata.RepositoryMAC)) {
|
||||
return rundata.RepositoryFullName, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import (
|
|||
)
|
||||
|
||||
func TestMac(t *testing.T) {
|
||||
handler := &handler{
|
||||
cache := &cachesImpl{
|
||||
secret: "secret for testing",
|
||||
}
|
||||
|
||||
|
|
@ -18,7 +18,7 @@ func TestMac(t *testing.T) {
|
|||
run := "1"
|
||||
ts := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
|
||||
mac := ComputeMac(handler.secret, name, run, ts, "")
|
||||
mac := ComputeMac(cache.secret, name, run, ts, "")
|
||||
rundata := RunData{
|
||||
RepositoryFullName: name,
|
||||
RunNumber: run,
|
||||
|
|
@ -26,7 +26,7 @@ func TestMac(t *testing.T) {
|
|||
RepositoryMAC: mac,
|
||||
}
|
||||
|
||||
repoName, err := handler.validateMac(rundata)
|
||||
repoName, err := cache.validateMac(rundata)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, name, repoName)
|
||||
})
|
||||
|
|
@ -36,7 +36,7 @@ func TestMac(t *testing.T) {
|
|||
run := "1"
|
||||
ts := "9223372036854775807" // This should last us for a while...
|
||||
|
||||
mac := ComputeMac(handler.secret, name, run, ts, "")
|
||||
mac := ComputeMac(cache.secret, name, run, ts, "")
|
||||
rundata := RunData{
|
||||
RepositoryFullName: name,
|
||||
RunNumber: run,
|
||||
|
|
@ -44,7 +44,7 @@ func TestMac(t *testing.T) {
|
|||
RepositoryMAC: mac,
|
||||
}
|
||||
|
||||
_, err := handler.validateMac(rundata)
|
||||
_, err := cache.validateMac(rundata)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ func TestMac(t *testing.T) {
|
|||
RepositoryMAC: "this is not the right mac :D",
|
||||
}
|
||||
|
||||
repoName, err := handler.validateMac(rundata)
|
||||
repoName, err := cache.validateMac(rundata)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, "", repoName)
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue