mirror of
https://github.com/coder/coder.git
synced 2025-07-15 22:20:27 +00:00
chore: use database in current context for file cache (#18490)
Using the db.Store when in a TX causes a deadlock for dbmem. In production, this can cause a deadlock if at the current conn pool limit.
This commit is contained in:
@ -20,38 +20,15 @@ import (
|
||||
)
|
||||
|
||||
type FileAcquirer interface {
|
||||
Acquire(ctx context.Context, fileID uuid.UUID) (*CloseFS, error)
|
||||
Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error)
|
||||
}
|
||||
|
||||
// NewFromStore returns a file cache that will fetch files from the provided
|
||||
// database.
|
||||
func NewFromStore(store database.Store, registerer prometheus.Registerer, authz rbac.Authorizer) *Cache {
|
||||
fetch := func(ctx context.Context, fileID uuid.UUID) (CacheEntryValue, error) {
|
||||
// Make sure the read does not fail due to authorization issues.
|
||||
// Authz is checked on the Acquire call, so this is safe.
|
||||
//nolint:gocritic
|
||||
file, err := store.GetFileByID(dbauthz.AsFileReader(ctx), fileID)
|
||||
if err != nil {
|
||||
return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
|
||||
}
|
||||
|
||||
content := bytes.NewBuffer(file.Data)
|
||||
return CacheEntryValue{
|
||||
Object: file.RBACObject(),
|
||||
FS: archivefs.FromTarReader(content),
|
||||
Size: int64(len(file.Data)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return New(fetch, registerer, authz)
|
||||
}
|
||||
|
||||
func New(fetch fetcher, registerer prometheus.Registerer, authz rbac.Authorizer) *Cache {
|
||||
// New returns a file cache that will fetch files from a database
|
||||
func New(registerer prometheus.Registerer, authz rbac.Authorizer) *Cache {
|
||||
return (&Cache{
|
||||
lock: sync.Mutex{},
|
||||
data: make(map[uuid.UUID]*cacheEntry),
|
||||
fetcher: fetch,
|
||||
authz: authz,
|
||||
lock: sync.Mutex{},
|
||||
data: make(map[uuid.UUID]*cacheEntry),
|
||||
authz: authz,
|
||||
}).registerMetrics(registerer)
|
||||
}
|
||||
|
||||
@ -110,9 +87,8 @@ func (c *Cache) registerMetrics(registerer prometheus.Registerer) *Cache {
|
||||
// loaded into memory exactly once. We hold those files until there are no
|
||||
// longer any open connections, and then we remove the value from the map.
|
||||
type Cache struct {
|
||||
lock sync.Mutex
|
||||
data map[uuid.UUID]*cacheEntry
|
||||
fetcher
|
||||
lock sync.Mutex
|
||||
data map[uuid.UUID]*cacheEntry
|
||||
authz rbac.Authorizer
|
||||
|
||||
// metrics
|
||||
@ -142,8 +118,6 @@ type cacheEntry struct {
|
||||
value *lazy.ValueWithError[CacheEntryValue]
|
||||
}
|
||||
|
||||
type fetcher func(context.Context, uuid.UUID) (CacheEntryValue, error)
|
||||
|
||||
var _ fs.FS = (*CloseFS)(nil)
|
||||
|
||||
// CloseFS is a wrapper around fs.FS that implements io.Closer. The Close()
|
||||
@ -163,12 +137,12 @@ func (f *CloseFS) Close() { f.close() }
|
||||
//
|
||||
// Safety: Every call to Acquire that does not return an error must have a
|
||||
// matching call to Release.
|
||||
func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (*CloseFS, error) {
|
||||
func (c *Cache) Acquire(ctx context.Context, db database.Store, fileID uuid.UUID) (*CloseFS, error) {
|
||||
// It's important that this `Load` call occurs outside `prepare`, after the
|
||||
// mutex has been released, or we would continue to hold the lock until the
|
||||
// entire file has been fetched, which may be slow, and would prevent other
|
||||
// files from being fetched in parallel.
|
||||
it, err := c.prepare(ctx, fileID).Load()
|
||||
it, err := c.prepare(ctx, db, fileID).Load()
|
||||
if err != nil {
|
||||
c.release(fileID)
|
||||
return nil, err
|
||||
@ -195,14 +169,14 @@ func (c *Cache) Acquire(ctx context.Context, fileID uuid.UUID) (*CloseFS, error)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Cache) prepare(ctx context.Context, fileID uuid.UUID) *lazy.ValueWithError[CacheEntryValue] {
|
||||
func (c *Cache) prepare(ctx context.Context, db database.Store, fileID uuid.UUID) *lazy.ValueWithError[CacheEntryValue] {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
entry, ok := c.data[fileID]
|
||||
if !ok {
|
||||
value := lazy.NewWithError(func() (CacheEntryValue, error) {
|
||||
val, err := c.fetcher(ctx, fileID)
|
||||
val, err := fetch(ctx, db, fileID)
|
||||
|
||||
// Always add to the cache size the bytes of the file loaded.
|
||||
if err == nil {
|
||||
@ -269,3 +243,20 @@ func (c *Cache) Count() int {
|
||||
|
||||
return len(c.data)
|
||||
}
|
||||
|
||||
func fetch(ctx context.Context, store database.Store, fileID uuid.UUID) (CacheEntryValue, error) {
|
||||
// Make sure the read does not fail due to authorization issues.
|
||||
// Authz is checked on the Acquire call, so this is safe.
|
||||
//nolint:gocritic
|
||||
file, err := store.GetFileByID(dbauthz.AsFileReader(ctx), fileID)
|
||||
if err != nil {
|
||||
return CacheEntryValue{}, xerrors.Errorf("failed to read file from database: %w", err)
|
||||
}
|
||||
|
||||
content := bytes.NewBuffer(file.Data)
|
||||
return CacheEntryValue{
|
||||
Object: file.RBACObject(),
|
||||
FS: archivefs.FromTarReader(content),
|
||||
Size: int64(len(file.Data)),
|
||||
}, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user