tempo/pkg/tracesizes/traceSizes.go
Joe Elliott dc97da1252 Ingester/Generator Live trace cleanup (#4365)
* moved trace sizes somewhere shareable

Signed-off-by: Joe Elliott <number101010@gmail.com>

* use tracesizes in ingester

Signed-off-by: Joe Elliott <number101010@gmail.com>

* make tests work

Signed-off-by: Joe Elliott <number101010@gmail.com>

* trace bytes in generator

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove traceCount

Signed-off-by: Joe Elliott <number101010@gmail.com>

* live trace shenanigans

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Update modules/generator/processor/localblocks/livetraces.go

Co-authored-by: Mario <mariorvinas@gmail.com>

* Update modules/ingester/instance.go

Co-authored-by: Mario <mariorvinas@gmail.com>

* Test cleanup. Add sz test, restore commented out and fix e2e

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove todo comment

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
Co-authored-by: Mario <mariorvinas@gmail.com>
2024-11-22 12:26:33 -05:00

66 lines
1.2 KiB
Go

package tracesizes
import (
"hash"
"hash/fnv"
"sync"
"time"
)
// Tracker accumulates a running size total per trace, keyed by a 64-bit hash
// of the trace ID. Allow and ClearIdle both take mtx before touching the
// other fields.
type Tracker struct {
	// mtx guards hash and sizes.
	mtx sync.Mutex
	// hash is a single hasher that token resets and reuses on every call.
	hash hash.Hash64
	// sizes maps a trace ID token to that trace's accumulated size and the
	// time it was last updated.
	sizes map[uint64]*traceSize
}
// traceSize is the per-trace record stored in Tracker.sizes.
type traceSize struct {
	// size is the sum of every sz value passed to Allow for this trace.
	size int
	// timestamp is when Allow last updated this record; ClearIdle compares
	// against it to decide whether the trace has gone idle.
	timestamp time.Time
}
// New returns an empty Tracker that is ready for use, with a 64-bit FNV-1
// hasher for deriving trace tokens and an initialized size map.
func New() *Tracker {
	t := new(Tracker)
	t.hash = fnv.New64()
	t.sizes = map[uint64]*traceSize{}
	return t
}
// token hashes traceID into the 64-bit key used to index s.sizes. It resets
// and writes to the hasher shared by the whole Tracker, so the caller must
// already hold s.mtx.
func (s *Tracker) token(traceID []byte) uint64 {
	h := s.hash
	h.Reset()
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write(traceID)
	return h.Sum64()
}
// Allow adds sz to the running total recorded for traceID and reports whether
// that total is still less than or equal to max.
//
// The total keeps growing, and the trace's last-seen time is refreshed, even
// when the call returns false, so a long-running trace that has exceeded max
// stays cut off for as long as it keeps receiving data.
func (s *Tracker) Allow(traceID []byte, sz, max int) bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	key := s.token(traceID)

	entry := s.sizes[key]
	if entry == nil {
		// First sighting of this trace: start from zero and let the common
		// path below add sz.
		entry = new(traceSize)
		s.sizes[key] = entry
	}

	entry.size += sz
	entry.timestamp = time.Now()

	return entry.size <= max
}
// ClearIdle forgets every trace whose most recent Allow call happened strictly
// before idleSince. Traces updated at or after idleSince are kept.
func (s *Tracker) ClearIdle(idleSince time.Time) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for key, entry := range s.sizes {
		if !entry.timestamp.Before(idleSince) {
			continue // still active
		}
		delete(s.sizes, key)
	}
}