feat: add filecache prometheus metrics (#18089)

Dynamic parameters uses an in-memory file cache. This adds Prometheus
metrics to monitor that cache.
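
For reference, the six new series all live under the coderd_file_cache_*
prefix. An illustrative scrape might look like this (sample values only,
not taken from a real deployment):

    # TYPE coderd_file_cache_open_file_refs_current gauge
    coderd_file_cache_open_file_refs_current 5
    # TYPE coderd_file_cache_open_file_refs_total counter
    coderd_file_cache_open_file_refs_total 40
    # TYPE coderd_file_cache_open_files_current gauge
    coderd_file_cache_open_files_current 2
    # TYPE coderd_file_cache_open_files_total counter
    coderd_file_cache_open_files_total 15
    # TYPE coderd_file_cache_open_files_size_bytes_current gauge
    coderd_file_cache_open_files_size_bytes_current 4096
    # TYPE coderd_file_cache_open_files_size_bytes_total counter
    coderd_file_cache_open_files_size_bytes_total 32768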
Author:    Steven Masley
Date:      2025-05-30 11:54:54 -05:00
Committer: GitHub
Parent:    562c4696de
Commit:    9db114d17c

3 changed files with 179 additions and 33 deletions

File 1 of 3

@@ -572,7 +572,7 @@ func New(options *Options) *API {
 		TemplateScheduleStore:       options.TemplateScheduleStore,
 		UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
 		AccessControlStore:          options.AccessControlStore,
-		FileCache:                   files.NewFromStore(options.Database),
+		FileCache:                   files.NewFromStore(options.Database, options.PrometheusRegistry),
 		Experiments:                 experiments,
 		WebpushDispatcher:           options.WebPushDispatcher,
 		healthCheckGroup:            &singleflight.Group[string, *healthsdk.HealthcheckReport]{},
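
The only caller change is threading the existing PrometheusRegistry through
to the cache constructor. A minimal standalone sketch of the new wiring
(newFileCache and db are hypothetical names, not part of this commit):

    package main

    import (
    	"github.com/prometheus/client_golang/prometheus"

    	"github.com/coder/coder/v2/coderd/database"
    	"github.com/coder/coder/v2/coderd/files"
    )

    // newFileCache builds a cache whose coderd_file_cache_* collectors are
    // registered on a fresh registry at construction time.
    func newFileCache(db database.Store) (*files.Cache, *prometheus.Registry) {
    	reg := prometheus.NewRegistry()
    	return files.NewFromStore(db, reg), reg
    }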

File 2 of 3

@@ -7,6 +7,8 @@ import (
 	"sync"

 	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"golang.org/x/xerrors"

 	archivefs "github.com/coder/coder/v2/archive/fs"
@@ -16,22 +18,78 @@ import (
 // NewFromStore returns a file cache that will fetch files from the provided
 // database.
-func NewFromStore(store database.Store) *Cache {
-	fetcher := func(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
+func NewFromStore(store database.Store, registerer prometheus.Registerer) *Cache {
+	fetch := func(ctx context.Context, fileID uuid.UUID) (cacheEntryValue, error) {
 		file, err := store.GetFileByID(ctx, fileID)
 		if err != nil {
-			return nil, xerrors.Errorf("failed to read file from database: %w", err)
+			return cacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
 		}

 		content := bytes.NewBuffer(file.Data)
-		return archivefs.FromTarReader(content), nil
+		return cacheEntryValue{
+			FS:   archivefs.FromTarReader(content),
+			size: int64(content.Len()),
+		}, nil
 	}

-	return &Cache{
+	return New(fetch, registerer)
+}
+
+func New(fetch fetcher, registerer prometheus.Registerer) *Cache {
+	return (&Cache{
 		lock:    sync.Mutex{},
 		data:    make(map[uuid.UUID]*cacheEntry),
-		fetcher: fetcher,
-	}
+		fetcher: fetch,
+	}).registerMetrics(registerer)
+}
+
+func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache {
+	subsystem := "file_cache"
+	f := promauto.With(registerer)
+
+	c.currentCacheSize = f.NewGauge(prometheus.GaugeOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_size_bytes_current",
+		Help:      "The current amount of memory of all files currently open in the file cache.",
+	})
+
+	c.totalCacheSize = f.NewCounter(prometheus.CounterOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_size_bytes_total",
+		Help:      "The total amount of memory ever opened in the file cache. This number never decrements.",
+	})
+
+	c.currentOpenFiles = f.NewGauge(prometheus.GaugeOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_current",
+		Help:      "The count of unique files currently open in the file cache.",
+	})
+
+	c.totalOpenedFiles = f.NewCounter(prometheus.CounterOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_files_total",
+		Help:      "The total count of unique files ever opened in the file cache.",
+	})
+
+	c.currentOpenFileReferences = f.NewGauge(prometheus.GaugeOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_file_refs_current",
+		Help:      "The count of file references currently open in the file cache. Multiple references can be held for the same file.",
+	})
+
+	c.totalOpenFileReferences = f.NewCounter(prometheus.CounterOpts{
+		Namespace: "coderd",
+		Subsystem: subsystem,
+		Name:      "open_file_refs_total",
+		Help:      "The total number of file references ever opened in the file cache.",
+	})
+
+	return c
 }

 // Cache persists the files for template versions, and is used by dynamic
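
A note on the pattern above: promauto.With(registerer) returns a factory
whose constructors register each collector as a side effect of creating it.
The equivalent manual form, sketched here for readers unfamiliar with
promauto (register is a hypothetical helper, not from this commit):

    package main

    import "github.com/prometheus/client_golang/prometheus"

    // register creates a gauge and registers it in two explicit steps;
    // promauto.With(registerer).NewGauge(opts) collapses them into one call
    // and, like MustRegister, panics on a duplicate registration.
    func register(registerer prometheus.Registerer) prometheus.Gauge {
    	g := prometheus.NewGauge(prometheus.GaugeOpts{
    		Namespace: "coderd",
    		Subsystem: "file_cache",
    		Name:      "open_files_current",
    		Help:      "The count of unique files currently open in the file cache.",
    	})
    	registerer.MustRegister(g)
    	return g
    }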
@@ -43,15 +101,34 @@ type Cache struct {
 	lock sync.Mutex
 	data map[uuid.UUID]*cacheEntry
 	fetcher
+
+	// metrics
+	cacheMetrics
+}
+
+type cacheMetrics struct {
+	currentOpenFileReferences prometheus.Gauge
+	totalOpenFileReferences   prometheus.Counter
+
+	currentOpenFiles prometheus.Gauge
+	totalOpenedFiles prometheus.Counter
+
+	currentCacheSize prometheus.Gauge
+	totalCacheSize   prometheus.Counter
+}
+
+type cacheEntryValue struct {
+	fs.FS
+	size int64
 }

 type cacheEntry struct {
 	// refCount must only be accessed while the Cache lock is held.
 	refCount int
-	value *lazy.ValueWithError[fs.FS]
+	value    *lazy.ValueWithError[cacheEntryValue]
 }

-type fetcher func(context.Context, uuid.UUID) (fs.FS, error)
+type fetcher func(context.Context, uuid.UUID) (cacheEntryValue, error)

 // Acquire will load the fs.FS for the given file. It guarantees that parallel
 // calls for the same fileID will only result in one fetch, and that parallel
@@ -66,18 +143,27 @@ func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (fs.FS, error) {
 	it, err := c.prepare(ctx, fileID).Load()
 	if err != nil {
 		c.Release(fileID)
+		return nil, err
 	}
-	return it, err
+	return it.FS, err
 }

-func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[fs.FS] {
+func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[cacheEntryValue] {
 	c.lock.Lock()
 	defer c.lock.Unlock()

 	entry, ok := c.data[fileID]
 	if !ok {
-		value := lazy.NewWithError(func() (fs.FS, error) {
-			return c.fetcher(ctx, fileID)
+		value := lazy.NewWithError(func() (cacheEntryValue, error) {
+			val, err := c.fetcher(ctx, fileID)
+
+			// Always add to the cache size the bytes of the file loaded.
+			if err == nil {
+				c.currentCacheSize.Add(float64(val.size))
+				c.totalCacheSize.Add(float64(val.size))
+			}
+
+			return val, err
 		})

 		entry = &cacheEntry{
@@ -85,8 +171,12 @@ func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithEr
 			refCount: 0,
 		}
 		c.data[fileID] = entry
+		c.currentOpenFiles.Inc()
+		c.totalOpenedFiles.Inc()
 	}

+	c.currentOpenFileReferences.Inc()
+	c.totalOpenFileReferences.Inc()
 	entry.refCount++
 	return entry.value
 }
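
Note that the size gauges are incremented inside the lazy closure rather
than in prepare itself: per Acquire's documented guarantee, the closure runs
at most once per cache entry, so a file's bytes are counted exactly once no
matter how many goroutines are blocked on Load(). The behavior relied on
here is the same as the standard library's sync.OnceValues; a self-contained
sketch of that at-most-once property (illustrative, not from this commit):

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	calls := 0
    	load := sync.OnceValues(func() (int, error) {
    		calls++ // the function body executes at most once
    		return 42, nil
    	})
    	a, _ := load() // runs the closure
    	b, _ := load() // reuses the memoized result
    	fmt.Println(calls, a, b) // 1 42 42
    }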
@@ -105,11 +195,19 @@ func (c *Cache) Release(fileID uuid.UUID) {
 		return
 	}

+	c.currentOpenFileReferences.Dec()
 	entry.refCount--
 	if entry.refCount > 0 {
 		return
 	}

+	c.currentOpenFiles.Dec()
+	ev, err := entry.value.Load()
+	if err == nil {
+		c.currentCacheSize.Add(-1 * float64(ev.size))
+	}
+
 	delete(c.data, fileID)
 }
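
Release is the other half of the contract that keeps these gauges accurate.
A sketch of the intended usage (useFile and "main.tf" are illustrative, not
from this commit):

    package main

    import (
    	"context"
    	"io/fs"

    	"github.com/google/uuid"

    	"github.com/coder/coder/v2/coderd/files"
    )

    // useFile pairs every successful Acquire with exactly one Release;
    // skipping the Release would leak the cache entry and leave
    // open_file_refs_current permanently inflated.
    func useFile(ctx context.Context, cache *files.Cache, fileID uuid.UUID) error {
    	fsys, err := cache.Acquire(ctx, fileID)
    	if err != nil {
    		return err // on error, Acquire has already dropped its own reference
    	}
    	defer cache.Release(fileID)

    	_, err = fs.ReadFile(fsys, "main.tf") // an arbitrary path inside the archive
    	return err
    }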

File 3 of 3

@@ -2,32 +2,38 @@ package files

 import (
 	"context"
-	"io/fs"
-	"sync"
 	"sync/atomic"
 	"testing"
 	"time"

 	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"

+	"github.com/coder/coder/v2/coderd/coderdtest/promhelp"
 	"github.com/coder/coder/v2/testutil"
 )

+func cachePromMetricName(metric string) string {
+	return "coderd_file_cache_" + metric
+}
+
 func TestConcurrency(t *testing.T) {
 	t.Parallel()

+	const fileSize = 10
 	emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
 	var fetches atomic.Int64
-	c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
+	reg := prometheus.NewRegistry()
+	c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
 		fetches.Add(1)
 		// Wait long enough before returning to make sure that all of the goroutines
 		// will be waiting in line, ensuring that no one duplicated a fetch.
 		time.Sleep(testutil.IntervalMedium)
-		return emptyFS, nil
-	})
+		return cacheEntryValue{FS: emptyFS, size: fileSize}, nil
+	}, reg)

 	batches := 1000
 	groups := make([]*errgroup.Group, 0, batches)
@@ -55,15 +61,29 @@ func TestConcurrency(t *testing.T) {
 		require.NoError(t, g.Wait())
 	}
 	require.Equal(t, int64(batches), fetches.Load())
+
+	// Verify all the counts & metrics are correct.
+	require.Equal(t, batches, c.Count())
+	require.Equal(t, batches*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+	require.Equal(t, batches*fileSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_size_bytes_total"), nil))
+	require.Equal(t, batches, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+	require.Equal(t, batches, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_total"), nil))
+	require.Equal(t, batches*batchSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+	require.Equal(t, batches*batchSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_file_refs_total"), nil))
 }

 func TestRelease(t *testing.T) {
 	t.Parallel()

+	const fileSize = 10
 	emptyFS := afero.NewIOFS(afero.NewReadOnlyFs(afero.NewMemMapFs()))
-	c := newTestCache(func(_ context.Context, _ uuid.UUID) (fs.FS, error) {
-		return emptyFS, nil
-	})
+	reg := prometheus.NewRegistry()
+	c := New(func(_ context.Context, _ uuid.UUID) (cacheEntryValue, error) {
+		return cacheEntryValue{
+			FS:   emptyFS,
+			size: fileSize,
+		}, nil
+	}, reg)

 	batches := 100
 	ids := make([]uuid.UUID, 0, batches)
@@ -73,11 +93,21 @@ func TestRelease(t *testing.T) {
 	// Acquire a bunch of references
 	batchSize := 10
-	for _, id := range ids {
-		for range batchSize {
+	for openedIdx, id := range ids {
+		for batchIdx := range batchSize {
 			it, err := c.Acquire(t.Context(), id)
 			require.NoError(t, err)
 			require.Equal(t, emptyFS, it)
+
+			// Each time a new file is opened, the metrics should be updated as so:
+			opened := openedIdx + 1
+			// Number of unique files opened is equal to the idx of the ids.
+			require.Equal(t, opened, c.Count())
+			require.Equal(t, opened, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+			// Current file size is unique files * file size.
+			require.Equal(t, opened*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+			// The number of refs is the current iteration of both loops.
+			require.Equal(t, ((opened-1)*batchSize)+(batchIdx+1), promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
 		}
 	}
@@ -85,20 +115,38 @@ func TestRelease(t *testing.T) {
 	require.Equal(t, len(c.data), batches)

 	// Now release all of the references
-	for _, id := range ids {
-		for range batchSize {
+	for closedIdx, id := range ids {
+		stillOpen := len(ids) - closedIdx
+		for closingIdx := range batchSize {
 			c.Release(id)
+			// Each time a file is released, the metrics should decrement the file refs
+			require.Equal(t, (stillOpen*batchSize)-(closingIdx+1), promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+
+			closed := closingIdx+1 == batchSize
+			if closed {
+				continue
+			}
+
+			// File ref still exists, so the counts should not change yet.
+			require.Equal(t, stillOpen, c.Count())
+			require.Equal(t, stillOpen, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+			require.Equal(t, stillOpen*fileSize, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
 		}
 	}

 	// ...and make sure that the cache has emptied itself.
 	require.Equal(t, len(c.data), 0)
-}

-func newTestCache(fetcher func(context.Context, uuid.UUID) (fs.FS, error)) Cache {
-	return Cache{
-		lock:    sync.Mutex{},
-		data:    make(map[uuid.UUID]*cacheEntry),
-		fetcher: fetcher,
-	}
+	// Verify all the counts & metrics are correct.
+	// All existing files are closed
+	require.Equal(t, 0, c.Count())
+	require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_size_bytes_current"), nil))
+	require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_files_current"), nil))
+	require.Equal(t, 0, promhelp.GaugeValue(t, reg, cachePromMetricName("open_file_refs_current"), nil))
+
+	// Total counts remain
+	require.Equal(t, batches*fileSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_size_bytes_total"), nil))
+	require.Equal(t, batches, promhelp.CounterValue(t, reg, cachePromMetricName("open_files_total"), nil))
+	require.Equal(t, batches*batchSize, promhelp.CounterValue(t, reg, cachePromMetricName("open_file_refs_total"), nil))
 }