tempo/tempodb/tempodb_metrics_test.go
Javi 4ac07153bd rhythm: set ingestion slack for partition consumption (#4611)
* rhythm: set ingestion slack for partition consumption

* remove test

* Update WalBlock interface to explicitly compute the ingestionSlack

* add cycle duration to slack ingestion range calculation

* distinguish between ingesters and blockbuilder for dataquality warnings
2025-01-29 15:18:49 +01:00

197 lines
5.7 KiB
Go

package tempodb
import (
"context"
"path"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
resource_v1 "github.com/grafana/tempo/pkg/tempopb/resource/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)
var queryRangeTestCases = []struct {
req *tempopb.QueryRangeRequest
expected []*tempopb.TimeSeries
}{
{
req: &tempopb.QueryRangeRequest{
Start: 1,
End: 50 * uint64(time.Second),
Step: 15 * uint64(time.Second),
Query: "{ } | rate()",
},
expected: []*tempopb.TimeSeries{
{
PromLabels: `{__name__="rate"}`,
Labels: []common_v1.KeyValue{tempopb.MakeKeyValueString("__name__", "rate")},
Samples: []tempopb.Sample{
{TimestampMs: 0, Value: 14.0 / 15.0}, // First interval starts at 1, so it only has 14 spans
{TimestampMs: 15_000, Value: 1.0}, // Spans every 1 second
{TimestampMs: 30_000, Value: 1.0}, // Spans every 1 second
{TimestampMs: 45_000, Value: 5.0 / 15.0}, // Interval [45,50) has 5 spans
{TimestampMs: 60_000, Value: 0}, // I think this is a bug that we extend out an extra interval
},
},
},
},
{
req: &tempopb.QueryRangeRequest{
Start: 1,
End: 50 * uint64(time.Second),
Step: 15 * uint64(time.Second),
Query: `{ .service.name="even" } | rate()`,
},
expected: []*tempopb.TimeSeries{
{
PromLabels: `{__name__="rate"}`,
Labels: []common_v1.KeyValue{tempopb.MakeKeyValueString("__name__", "rate")},
Samples: []tempopb.Sample{
{TimestampMs: 0, Value: 7.0 / 15.0}, // Interval [ 1, 14], 7 spans at 2, 4, 6, 8, 10, 12, 14
{TimestampMs: 15_000, Value: 7.0 / 15.0}, // Interval [15, 29], 7 spans at 16, 18, 20, 22, 24, 26, 28
{TimestampMs: 30_000, Value: 8.0 / 15.0}, // Interval [30, 44], 8 spans at 30, 32, 34, 36, 38, 40, 42, 44
{TimestampMs: 45_000, Value: 2.0 / 15.0}, // Interval [45, 50), 2 spans at 46, 48
{TimestampMs: 60_000, Value: 0}, // I think this is a bug that we extend out an extra interval
},
},
},
},
}
func TestTempoDBQueryRange(t *testing.T) {
var (
tempDir = t.TempDir()
blockVersion = vparquet4.VersionString
)
dc := backend.DedicatedColumns{
{Scope: "resource", Name: "res-dedicated.01", Type: "string"},
{Scope: "resource", Name: "res-dedicated.02", Type: "string"},
{Scope: "span", Name: "span-dedicated.01", Type: "string"},
{Scope: "span", Name: "span-dedicated.02", Type: "string"},
}
r, w, c, err := New(&Config{
Backend: backend.Local,
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
Block: &common.BlockConfig{
IndexDownsampleBytes: 17,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Version: blockVersion,
IndexPageSizeBytes: 1000,
RowGroupSizeBytes: 10000,
DedicatedColumns: dc,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
IngestionSlack: time.Since(time.Time{}),
},
Search: &SearchConfig{
ChunkSizeBytes: 1_000_000,
ReadBufferCount: 8, ReadBufferSizeBytes: 4 * 1024 * 1024,
},
BlocklistPoll: 0,
}, nil, log.NewNopLogger())
require.NoError(t, err)
err = c.EnableCompaction(context.Background(), &CompactorConfig{
ChunkSizeBytes: 10,
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{}, &mockOverrides{})
require.NoError(t, err)
ctx := context.Background()
r.EnablePolling(ctx, &mockJobSharder{})
// Write to wal
wal := w.WAL()
meta := &backend.BlockMeta{BlockID: backend.NewUUID(), TenantID: testTenantID, DedicatedColumns: dc}
head, err := wal.NewBlock(meta, model.CurrentEncoding)
require.NoError(t, err)
dec := model.MustNewSegmentDecoder(model.CurrentEncoding)
totalSpans := 100
for i := 1; i <= totalSpans; i++ {
tid := test.ValidTraceID(nil)
sp := test.MakeSpan(tid)
// Start time is i seconds
sp.StartTimeUnixNano = uint64(i * int(time.Second))
// Duration is i seconds
sp.EndTimeUnixNano = sp.StartTimeUnixNano + uint64(i*int(time.Second))
// Service name
var svcName string
if i%2 == 0 {
svcName = "even"
} else {
svcName = "odd"
}
tr := &tempopb.Trace{
ResourceSpans: []*v1.ResourceSpans{
{
Resource: &resource_v1.Resource{
Attributes: []*common_v1.KeyValue{tempopb.MakeKeyValueStringPtr("service.name", svcName)},
},
ScopeSpans: []*v1.ScopeSpans{
{
Spans: []*v1.Span{
sp,
},
},
},
},
},
}
b1, err := dec.PrepareForWrite(tr, 0, 0)
require.NoError(t, err)
b2, err := dec.ToObject([][]byte{b1})
require.NoError(t, err)
err = head.Append(tid, b2, 0, 0, true)
require.NoError(t, err)
}
// Complete block
block, err := w.CompleteBlock(context.Background(), head)
require.NoError(t, err)
f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return block.Fetch(ctx, req, common.DefaultSearchOptions())
})
for _, tc := range queryRangeTestCases {
eval, err := traceql.NewEngine().CompileMetricsQueryRange(tc.req, 0, 0, false)
require.NoError(t, err)
err = eval.Do(ctx, f, 0, 0)
require.NoError(t, err)
actual := eval.Results().ToProto(tc.req)
require.Equal(t, tc.expected, actual, "Query: %v", tc.req.Query)
}
}