Files
coder/coderd/files/cache.go
2025-07-01 13:50:37 -06:00

313 lines
8.8 KiB
Go

package files
import (
"bytes"
"context"
"io/fs"
"sync"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/xerrors"
archivefs "github.com/coder/coder/v2/archive/fs"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/util/lazy"
)
type FileAcquirer interface {
Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error)
}
// New returns a file cache that will fetch files from a database
func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache {
return &Cache{
lock: sync.Mutex{},
data: make(map[uuid.UUID]*cacheEntry),
authz: authz,
cacheMetrics: newCacheMetrics(registerer),
}
}
func newCacheMetrics(registerer prometheus.Registerer) cacheMetrics {
subsystem := "file_cache"
f := promauto.With(registerer)
return cacheMetrics{
currentCacheSize: f.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: subsystem,
Name: "open_files_size_bytes_current",
Help: "The current amount of memory of all files currently open in the file cache.",
}),
totalCacheSize: f.NewCounter(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: subsystem,
Name: "open_files_size_bytes_total",
Help: "The total amount of memory ever opened in the file cache. This number never decrements.",
}),
currentOpenFiles: f.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: subsystem,
Name: "open_files_current",
Help: "The count of unique files currently open in the file cache.",
}),
totalOpenedFiles: f.NewCounter(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: subsystem,
Name: "open_files_total",
Help: "The total count of unique files ever opened in the file cache.",
}),
currentOpenFileReferences: f.NewGauge(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: subsystem,
Name: "open_file_refs_current",
Help: "The count of file references currently open in the file cache. Multiple references can be held for the same file.",
}),
totalOpenFileReferences: f.NewCounterVec(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: subsystem,
Name: "open_file_refs_total",
Help: "The total number of file references ever opened in the file cache. The 'hit' label indicates if the file was loaded from the cache.",
}, []string{"hit"}),
}
}
// Cache persists the files for template versions, and is used by dynamic
// parameters to deduplicate the files in memory. When any number of users opens
// the workspace creation form for a given template version, it's files are
// loaded into memory exactly once. We hold those files until there are no
// longer any open connections, and then we remove the value from the map.
type Cache struct {
lock sync.Mutex
data map[uuid.UUID]*cacheEntry
authz rbac.Authorizer
// metrics
cacheMetrics
}
type cacheMetrics struct {
currentOpenFileReferences prometheus.Gauge
totalOpenFileReferences *prometheus.CounterVec
currentOpenFiles prometheus.Gauge
totalOpenedFiles prometheus.Counter
currentCacheSize prometheus.Gauge
totalCacheSize prometheus.Counter
}
type cacheEntry struct {
// Safety: refCount must only be accessed while the Cache lock is held.
refCount int
value *lazy.ValueWithError[CacheEntryValue]
// Safety: close must only be called while the Cache lock is held
close func()
// Safety: purge must only be called while the Cache lock is held
purge func()
}
type CacheEntryValue struct {
fs.FS
Object rbac.Object
Size int64
}
var _ fs.FS = (*CloseFS)(nil)
// CloseFS is a wrapper around fs.FS that implements io.Closer. The Close()
// method tells the cache to release the fileID. Once all open references are
// closed, the file is removed from the cache.
type CloseFS struct {
fs.FS
close func()
}
func (f *CloseFS) Close() {
f.close()
}
// Acquire will load the fs.FS for the given file. It guarantees that parallel
// calls for the same fileID will only result in one fetch, and that parallel
// calls for distinct fileIDs will fetch in parallel.
//
// Safety: Every call to Acquire that does not return an error must call close
// on the returned value when it is done being used.
func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) {
// It's important that this `Load` call occurs outside `prepare`, after the
// mutex has been released, or we would continue to hold the lock until the
// entire file has been fetched, which may be slow, and would prevent other
// files from being fetched in parallel.
e := c.prepare(db, fileID)
ev, err := e.value.Load()
if err != nil {
c.lock.Lock()
defer c.lock.Unlock()
e.close()
e.purge()
return nil, err
}
cleanup := func() {
c.lock.Lock()
defer c.lock.Unlock()
e.close()
}
// We always run the fetch under a system context and actor, so we need to
// check the caller's context (including the actor) manually before returning.
// Check if the caller's context was canceled. Even though `Authorize` takes
// a context, we still check it manually first because none of our mock
// database implementations check for context cancellation.
if err := ctx.Err(); err != nil {
cleanup()
return nil, err
}
// Check that the caller is authorized to access the file
subject, ok := dbauthz.ActorFromContext(ctx)
if !ok {
cleanup()
return nil, dbauthz.ErrNoActor
}
if err := c.authz.Authorize(ctx, subject, policy.ActionRead, ev.Object); err != nil {
cleanup()
return nil, err
}
var closeOnce sync.Once
return &CloseFS{
FS: ev.FS,
close: func() {
// sync.Once makes the Close() idempotent, so we can call it
// multiple times without worrying about double-releasing.
closeOnce.Do(func() {
c.lock.Lock()
defer c.lock.Unlock()
e.close()
})
},
}, nil
}
func (c *Cache) prepare(db database.Store, fileID uuid.UUID) *cacheEntry {
c.lock.Lock()
defer c.lock.Unlock()
hitLabel := "true"
entry, ok := c.data[fileID]
if !ok {
hitLabel = "false"
var purgeOnce sync.Once
entry = &cacheEntry{
value: lazy.NewWithError(func() (CacheEntryValue, error) {
val, err := fetch(db, fileID)
if err != nil {
return val, err
}
// Add the size of the file to the cache size metrics.
c.currentCacheSize.Add(float64(val.Size))
c.totalCacheSize.Add(float64(val.Size))
return val, err
}),
close: func() {
entry.refCount--
c.currentOpenFileReferences.Dec()
if entry.refCount > 0 {
return
}
entry.purge()
},
purge: func() {
purgeOnce.Do(func() {
c.purge(fileID)
})
},
}
c.data[fileID] = entry
c.currentOpenFiles.Inc()
c.totalOpenedFiles.Inc()
}
c.currentOpenFileReferences.Inc()
c.totalOpenFileReferences.WithLabelValues(hitLabel).Inc()
entry.refCount++
return entry
}
// purge immediately removes an entry from the cache, even if it has open
// references.
// Safety: Must only be called while the Cache lock is held
func (c *Cache) purge(fileID uuid.UUID) {
entry, ok := c.data[fileID]
if !ok {
// If we land here, it's probably because of a fetch attempt that
// resulted in an error, and got purged already. It may also be an
// erroneous extra close, but we can't really distinguish between those
// two cases currently.
return
}
// Purge the file from the cache.
c.currentOpenFiles.Dec()
ev, err := entry.value.Load()
if err == nil {
c.currentCacheSize.Add(-1 * float64(ev.Size))
}
delete(c.data, fileID)
}
// Count returns the number of files currently in the cache.
// Mainly used for unit testing assertions.
func (c *Cache) Count() int {
c.lock.Lock()
defer c.lock.Unlock()
return len(c.data)
}
func fetch(store database.Store, fileID uuid.UUID) (CacheEntryValue, error) {
// Because many callers can be waiting on the same file fetch concurrently, we
// want to prevent any failures that would cause them all to receive errors
// because the caller who initiated the fetch would fail.
// - We always run the fetch with an uncancelable context, and then check
// context cancellation for each acquirer afterwards.
// - We always run the fetch as a system user, and then check authorization
// for each acquirer afterwards.
// This prevents a canceled context or an unauthorized user from "holding up
// the queue".
//nolint:gocritic
file, err := store.GetFileByID(dbauthz.AsFileReader(context.Background()), fileID)
if err != nil {
return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
}
content := bytes.NewBuffer(file.Data)
return CacheEntryValue{
Object: file.RBACObject(),
FS: archivefs.FromTarReader(content),
Size: int64(len(file.Data)),
}, nil
}