mirror of
https://github.com/grafana/tempo.git
synced 2025-03-14 03:06:42 +00:00
* first attempt - abandoned Signed-off-by: Joe Elliott <number101010@gmail.com> * Revert "first attempt - abandoned" This reverts commit d29f5c05df903651601968504fb79c5bfe053c9f. * context cancel approach Signed-off-by: Joe Elliott <number101010@gmail.com> * bail out of doForAtLeast if contxt is cancelled Signed-off-by: Joe Elliott <number101010@gmail.com> * old man shakes his fist at the linter Signed-off-by: Joe Elliott <number101010@gmail.com> * changelog Signed-off-by: Joe Elliott <number101010@gmail.com> * review Signed-off-by: Joe Elliott <number101010@gmail.com> * add ret Signed-off-by: Joe Elliott <number101010@gmail.com> --------- Signed-off-by: Joe Elliott <number101010@gmail.com>
114 lines
3.4 KiB
Go
114 lines
3.4 KiB
Go
package tempodb
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/go-kit/log/level"
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/grafana/tempo/pkg/boundedwaitgroup"
|
|
"github.com/grafana/tempo/tempodb/backend"
|
|
)
|
|
|
|
// retentionLoop watches a timer to clean up blocks that are past retention.
|
|
// todo: correctly pass context all the way to the backend so a cancelled context can stop the retention loop.
|
|
// see implementation of compactionLoop()
|
|
func (rw *readerWriter) retentionLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(rw.cfg.BlocklistPoll)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
rw.doRetention(ctx)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rw *readerWriter) doRetention(ctx context.Context) {
|
|
tenants := rw.blocklist.Tenants()
|
|
|
|
bg := boundedwaitgroup.New(rw.compactorCfg.RetentionConcurrency)
|
|
|
|
for _, tenantID := range tenants {
|
|
bg.Add(1)
|
|
go func(t string) {
|
|
defer bg.Done()
|
|
rw.retainTenant(ctx, t)
|
|
}(tenantID)
|
|
}
|
|
|
|
bg.Wait()
|
|
}
|
|
|
|
func (rw *readerWriter) retainTenant(ctx context.Context, tenantID string) {
|
|
start := time.Now()
|
|
defer func() { metricRetentionDuration.Observe(time.Since(start).Seconds()) }()
|
|
|
|
// Check for overrides
|
|
retention := rw.compactorCfg.BlockRetention // Default
|
|
if r := rw.compactorOverrides.BlockRetentionForTenant(tenantID); r != 0 {
|
|
retention = r
|
|
}
|
|
level.Debug(rw.logger).Log("msg", "Performing block retention", "tenantID", tenantID, "retention", retention)
|
|
|
|
// iterate through block list. make compacted anything that is past retention.
|
|
cutoff := time.Now().Add(-retention)
|
|
blocklist := rw.blocklist.Metas(tenantID)
|
|
for _, b := range blocklist {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
if b.EndTime.Before(cutoff) && rw.compactorSharder.Owns(b.BlockID.String()) {
|
|
level.Info(rw.logger).Log("msg", "marking block for deletion", "blockID", b.BlockID, "tenantID", tenantID)
|
|
err := rw.c.MarkBlockCompacted((uuid.UUID)(b.BlockID), tenantID)
|
|
if err != nil {
|
|
level.Error(rw.logger).Log("msg", "failed to mark block compacted during retention", "blockID", b.BlockID, "tenantID", tenantID, "err", err)
|
|
metricRetentionErrors.Inc()
|
|
} else {
|
|
metricMarkedForDeletion.Inc()
|
|
|
|
rw.blocklist.Update(tenantID, nil, []*backend.BlockMeta{b}, []*backend.CompactedBlockMeta{
|
|
{
|
|
BlockMeta: *b,
|
|
CompactedTime: time.Now(),
|
|
},
|
|
}, nil)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// iterate through compacted list looking for blocks ready to be cleared
|
|
cutoff = time.Now().Add(-rw.compactorCfg.CompactedBlockRetention)
|
|
compactedBlocklist := rw.blocklist.CompactedMetas(tenantID)
|
|
for _, b := range compactedBlocklist {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
level.Debug(rw.logger).Log("owns", rw.compactorSharder.Owns(b.BlockID.String()), "blockID", b.BlockID, "tenantID", tenantID)
|
|
if b.CompactedTime.Before(cutoff) && rw.compactorSharder.Owns(b.BlockID.String()) {
|
|
level.Info(rw.logger).Log("msg", "deleting block", "blockID", b.BlockID, "tenantID", tenantID)
|
|
err := rw.c.ClearBlock((uuid.UUID)(b.BlockID), tenantID)
|
|
if err != nil {
|
|
level.Error(rw.logger).Log("msg", "failed to clear compacted block during retention", "blockID", b.BlockID, "tenantID", tenantID, "err", err)
|
|
metricRetentionErrors.Inc()
|
|
} else {
|
|
metricDeleted.Inc()
|
|
|
|
rw.blocklist.Update(tenantID, nil, nil, nil, []*backend.CompactedBlockMeta{b})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|