feat: Traceql metric sum_over_time (#4786)

* added sum_over_time to language

* rename evaluator

* refactor overtime functions

* added documentation

* fix compensation

* fix test due to floating point discrepancy

* changelog

* fix expr.y

* linting

* Update docs/sources/tempo/traceql/metrics-queries/_index.md

Co-authored-by: Ruslan Mikhailov <195758209+ruslan-mikhailov@users.noreply.github.com>

* Update docs/sources/tempo/traceql/metrics-queries/functions.md

Co-authored-by: Martin Disibio <mdisibio@gmail.com>

* add function

---------

Co-authored-by: Ruslan Mikhailov <195758209+ruslan-mikhailov@users.noreply.github.com>
Co-authored-by: Martin Disibio <mdisibio@gmail.com>
This commit is contained in:
Javi
2025-03-05 10:01:54 +01:00
committed by GitHub
parent 5d540e119f
commit d71a556083
17 changed files with 773 additions and 491 deletions

View File

@ -24,6 +24,7 @@
configurable via the throughput_bytes_slo field, and it will populate op="traces" label in slo and throughput metrics.
* [FEATURE] Added most_recent=true query hint to TraceQL to return most recent results. [#4238](https://github.com/grafana/tempo/pull/4238) (@joe-elliott)
* [FEATURE] Add ability to add artificial delay to push requests [#4716](https://github.com/grafana/tempo/pull/4716) (@yvrhdn)
* [FEATURE] TraceQL metrics: sum_over_time [#4786](https://github.com/grafana/tempo/pull/4786) (@javiermolinar)
* [ENHANCEMENT] Improve Tempo build options [#4755](https://github.com/grafana/tempo/pull/4755) (@stoewer)
* [ENHANCEMENT] Rewrite traces using rebatching [#4690](https://github.com/grafana/tempo/pull/4690) (@stoewer @joe-elliott)
* [ENHANCEMENT] Reorder span iterators [#4754](https://github.com/grafana/tempo/pull/4754) (@stoewer)

View File

@ -71,7 +71,7 @@ Refer to [Solve problems using metrics queries](ref:solve-problems-mq) for some
### Functions
TraceQL metrics queries currently include the following functions for aggregating over groups of spans: `rate`, `count_over_time`, `max_over_time`, `min_over_time`, `avg_over_time`, `quantile_over_time`, `histogram_over_time`, and `compare`.
TraceQL metrics queries currently include the following functions for aggregating over groups of spans: `rate`, `count_over_time`, `sum_over_time`, `max_over_time`, `min_over_time`, `avg_over_time`, `quantile_over_time`, `histogram_over_time`, and `compare`.
These functions can be added as an operator at the end of any TraceQL query.
For detailed information and example queries for each function, refer to [TraceQL metrics functions](ref:mq-functions).

View File

@ -12,7 +12,7 @@ keywords:
<!-- If you add a new function to this page, make sure you also add it to the _index.md#functions section.-->
TraceQL supports `rate`, `count_over_time`, `min_over_time`, `avg_over_time`, `quantile_over_time`, `histogram_over_time`, and `compare` functions.
TraceQL supports `rate`, `count_over_time`, `sum_over_time`, `min_over_time`, `max_over_time`, `avg_over_time`, `quantile_over_time`, `histogram_over_time`, and `compare` functions.
## Available functions
@ -24,6 +24,9 @@ These functions can be added as an operator at the end of any TraceQL query.
`count_over_time`
: Counts the number of matching spans per time interval (refer to the [`step` API parameter](https://grafana.com/docs/tempo/<TEMPO_VERSION>/api_docs)).
`sum_over_time`
: Sums the value for the specified attribute across all matching spans per time interval (refer to the [`step` API parameter](https://grafana.com/docs/tempo/<TEMPO_VERSION>/api_docs)).
`min_over_time`
: Returns the minimum value for the specified attribute across all matching spans per time interval (refer to the [`step` API parameter](https://grafana.com/docs/tempo/<TEMPO_VERSION>/api_docs/#traceql-metrics)).
@ -96,13 +99,17 @@ This example counts the number of spans with name `"GET /:endpoint"` broken down
```
## The `min_over_time`, `max_over_time`, and `avg_over_time` functions
## The `sum_over_time`, `min_over_time`, `max_over_time`, and `avg_over_time` functions
The `sum_over_time()` function lets you aggregate numerical values by computing their sum.
The time interval that the sum is computed over is set by the `step` parameter.
The `min_over_time()` function lets you aggregate numerical attributes by calculating their minimum value.
For example, you could choose to calculate the minimum duration of a group of spans, or you could choose to calculate the minimum value of a custom attribute you've attached to your spans, like `span.shopping.cart.entries`.
The time interval that the minimum is computed over is set by the `step` parameter.
The `max_over_time()` let you aggregate numerical values by computing the maximum value of them, such as the all important span duration.
The `max_over_time()` function lets you aggregate numerical values by computing their maximum value, such as the all-important span duration.
The time interval that the maximum is computed over is set by the `step` parameter.
The `avg_over_time()` function lets you aggregate numerical values by computing their average value, such as the all-important span duration.

View File

@ -22,7 +22,7 @@ import (
)
// QueryRange returns metrics.
func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest, rawEval *traceql.MetricsEvalulator, jobEval *traceql.MetricsFrontendEvaluator) error {
func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest, rawEval *traceql.MetricsEvaluator, jobEval *traceql.MetricsFrontendEvaluator) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -123,7 +123,7 @@ func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeReque
return nil
}
func (p *Processor) queryRangeWALBlock(ctx context.Context, b common.WALBlock, eval *traceql.MetricsEvalulator) error {
func (p *Processor) queryRangeWALBlock(ctx context.Context, b common.WALBlock, eval *traceql.MetricsEvaluator) error {
m := b.BlockMeta()
ctx, span := tracer.Start(ctx, "Processor.QueryRange.WALBlock", trace.WithAttributes(
attribute.String("block", m.BlockID.String()),

View File

@ -1184,13 +1184,18 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
a.exemplarFn = exemplarNaN
case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, minAggregation) }
a.simpleAggregationOp = minAggregation
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, minOverTimeAggregation) }
a.simpleAggregationOp = minOverTimeAggregation
a.exemplarFn = exemplarFnFor(a.attr)
case metricsAggregateMaxOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, maxAggregation) }
a.simpleAggregationOp = maxAggregation
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, maxOverTimeAggregation) }
a.simpleAggregationOp = maxOverTimeAggregation
a.exemplarFn = exemplarFnFor(a.attr)
case metricsAggregateSumOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, sumOverTimeAggregation) }
a.simpleAggregationOp = sumOverTimeAggregation
a.exemplarFn = exemplarFnFor(a.attr)
case metricsAggregateRate:
@ -1352,6 +1357,7 @@ func (a *MetricsAggregate) validate() error {
case metricsAggregateCountOverTime:
case metricsAggregateMinOverTime:
case metricsAggregateMaxOverTime:
case metricsAggregateSumOverTime:
case metricsAggregateRate:
case metricsAggregateHistogramOverTime:
if len(a.by) >= maxGroupBys {

View File

@ -894,6 +894,11 @@ func (m *mockSpan) WithSpanInt(key string, value int) *mockSpan {
return m
}
// WithSpanFloat stores value as a float static under the span-scoped attribute
// named key and returns the receiver so that calls can be chained.
func (m *mockSpan) WithSpanFloat(key string, value float64) *mockSpan {
	attr := NewScopedAttribute(AttributeScopeSpan, false, key)
	m.attributes[attr] = NewStaticFloat(value)
	return m
}
func (m *mockSpan) WithAttrBool(key string, value bool) *mockSpan {
m.attributes[NewAttribute(key)] = NewStaticBool(value)
return m

View File

@ -360,7 +360,7 @@ func (c *CountOverTimeAggregator) Sample() float64 {
// calculate the rate when given a multiplier.
type OverTimeAggregator struct {
getSpanAttValue func(s Span) float64
agg func(current, new float64) float64
agg func(current, n float64) float64
val float64
}
@ -368,23 +368,15 @@ var _ VectorAggregator = (*OverTimeAggregator)(nil)
func NewOverTimeAggregator(attr Attribute, op SimpleAggregationOp) *OverTimeAggregator {
var fn func(s Span) float64
var agg func(current, new float64) float64
var agg func(current, n float64) float64
switch op {
case maxAggregation:
agg = func(current, new float64) float64 {
if math.IsNaN(current) || new > current {
return new
}
return current
}
case minAggregation:
agg = func(current, new float64) float64 {
if math.IsNaN(current) || new < current {
return new
}
return current
}
case maxOverTimeAggregation:
agg = maxOverTime()
case minOverTimeAggregation:
agg = minOverTime()
case sumOverTimeAggregation:
agg = sumOverTime()
}
switch attr {
@ -807,7 +799,7 @@ func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest,
// Dedupe spans parameter is an indicator of whether to expect duplicates in the datasource. For
// example if the datasource is replication factor=1 or only a single block then we know there
// aren't duplicates, and we can make some optimizations.
func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, exemplars int, timeOverlapCutoff float64, allowUnsafeQueryHints bool) (*MetricsEvalulator, error) {
func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, exemplars int, timeOverlapCutoff float64, allowUnsafeQueryHints bool) (*MetricsEvaluator, error) {
if req.Start <= 0 {
return nil, fmt.Errorf("start required")
}
@ -837,7 +829,7 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, exempl
// This initializes all step buffers, counters, etc
metricsPipeline.init(req, AggregateModeRaw)
me := &MetricsEvalulator{
me := &MetricsEvaluator{
storageReq: storageReq,
metricsPipeline: metricsPipeline,
timeOverlapCutoff: timeOverlapCutoff,
@ -965,7 +957,7 @@ func lookup(needles []Attribute, haystack Span) Static {
return NewStaticNil()
}
type MetricsEvalulator struct {
type MetricsEvaluator struct {
start, end uint64
checkTime bool
maxExemplars, exemplarCount int
@ -990,7 +982,7 @@ func timeRangeOverlap(reqStart, reqEnd, dataStart, dataEnd uint64) float64 {
// Do metrics on the given source of data and merge the results into the working set. Optionally, if provided,
// uses the known time range of the data for last-minute optimizations. Time range is unix nanos
func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher, fetcherStart, fetcherEnd uint64) error {
func (e *MetricsEvaluator) Do(ctx context.Context, f SpansetFetcher, fetcherStart, fetcherEnd uint64) error {
// Make a copy of the request so we can modify it.
storageReq := *e.storageReq
@ -1064,18 +1056,18 @@ func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher, fetcherSta
return nil
}
func (e *MetricsEvalulator) Metrics() (uint64, uint64, uint64) {
func (e *MetricsEvaluator) Metrics() (uint64, uint64, uint64) {
e.mtx.Lock()
defer e.mtx.Unlock()
return e.bytes, e.spansTotal, e.spansDeduped
}
func (e *MetricsEvalulator) Results() SeriesSet {
func (e *MetricsEvaluator) Results() SeriesSet {
return e.metricsPipeline.result()
}
func (e *MetricsEvalulator) sampleExemplar(id []byte) bool {
func (e *MetricsEvaluator) sampleExemplar(id []byte) bool {
if len(e.exemplarMap) >= e.maxExemplars {
return false
}
@ -1124,8 +1116,9 @@ type SimpleAggregationOp int
const (
sumAggregation SimpleAggregationOp = iota
minAggregation
maxAggregation
minOverTimeAggregation
maxOverTimeAggregation
sumOverTimeAggregation
)
type SimpleAggregator struct {
@ -1142,23 +1135,16 @@ func NewSimpleCombiner(req *tempopb.QueryRangeRequest, op SimpleAggregationOp) *
var initWithNaN bool
var f func(existingValue float64, newValue float64) float64
switch op {
case minAggregation:
case minOverTimeAggregation:
// Simple min aggregator. It calculates the minimum between existing values and a new sample
f = func(existingValue float64, newValue float64) float64 {
if math.IsNaN(existingValue) || newValue < existingValue {
return newValue
}
return existingValue
}
f = minOverTime()
initWithNaN = true
case maxAggregation:
case maxOverTimeAggregation:
// Simple max aggregator. It calculates the maximum between existing values and a new sample
f = func(existingValue float64, newValue float64) float64 {
if math.IsNaN(existingValue) || newValue > existingValue {
return newValue
}
return existingValue
}
f = maxOverTime()
initWithNaN = true
case sumOverTimeAggregation:
f = sumOverTime()
initWithNaN = true
default:
// Simple addition aggregator. It adds existing values with the new sample.

View File

@ -164,30 +164,15 @@ func (a *averageValue) add(inc float64) {
a.compensation = c
}
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
t := sum + inc
switch {
case math.IsInf(t, 0):
c = 0
// Using Neumaier improvement, swap if next term larger than sum.
case math.Abs(sum) >= math.Abs(inc):
c += (sum - t) + inc
default:
c += (inc - t) + sum
}
return t, c
}
type averageSeries struct {
values []averageValue
labels Labels
Exemplars []Exemplar
}
func newAverageSeries(len int, lenExemplars int, labels Labels) averageSeries {
func newAverageSeries(l int, lenExemplars int, labels Labels) averageSeries {
s := averageSeries{
values: make([]averageValue, len),
values: make([]averageValue, l),
labels: labels,
Exemplars: make([]Exemplar, 0, lenExemplars),
}

View File

@ -0,0 +1,34 @@
package traceql
import "math"
// sumOverTime returns a stateful reducer that folds a new sample into a running
// sum using compensated (Kahan/Neumaier) summation.
//
// The returned function takes the current sum and the increment and returns the
// new sum. A NaN current sum means "no samples yet": the increment is returned
// unchanged. Otherwise the increment is first corrected by the rounding error
// captured on the previous call and then added to the sum.
//
// The compensation term lives in the closure, so every call to sumOverTime
// yields an independent reducer.
// NOTE(review): a single reducer that is shared between several interleaved
// accumulations still mixes their compensation terms — confirm that callers use
// one reducer per accumulation where full precision matters.
func sumOverTime() func(curr float64, n float64) (res float64) {
	var comp float64 // low-order part lost by the previous addition
	return func(sum, inc float64) (res float64) {
		if math.IsNaN(sum) {
			// Start of a new accumulation: any compensation left over from an
			// earlier one does not belong to this sum and must not leak into it.
			comp = 0
			return inc
		}
		// kahanSumInc returns the part of the exact result that did not fit into
		// the rounded sum (exact = newSum + c), so the correction has to be
		// added to the next increment, not subtracted from it.
		y := inc + comp
		sum, c := kahanSumInc(y, sum, 0) // Compensation is applied on every step, hence we pass 0 to reset it
		comp = c
		return sum
	}
}
// minOverTime returns a reducer that keeps the smaller of the current value and
// a new sample. A NaN current value is treated as "no value yet", in which case
// the new sample is returned as is.
func minOverTime() func(curr float64, n float64) (res float64) {
	return func(curr, n float64) (res float64) {
		switch {
		case math.IsNaN(curr):
			return n
		case n < curr:
			return n
		default:
			return curr
		}
	}
}
// maxOverTime returns a reducer that keeps the larger of the current value and
// a new sample. A NaN current value is treated as "no value yet", in which case
// the new sample is returned as is.
func maxOverTime() func(curr float64, n float64) (res float64) {
	return func(curr, n float64) (res float64) {
		res = curr
		if n > curr || math.IsNaN(curr) {
			res = n
		}
		return res
	}
}

View File

@ -3,6 +3,7 @@ package traceql
import (
"fmt"
"math"
"math/rand/v2"
"testing"
"time"
@ -410,7 +411,8 @@ func TestQuantileOverTime(t *testing.T) {
},
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
require.Equal(t, out, result)
}
@ -465,7 +467,8 @@ func TestCountOverTime(t *testing.T) {
},
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
require.Equal(t, out, result)
}
@ -493,7 +496,8 @@ func TestMinOverTimeForDuration(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
@ -534,7 +538,8 @@ func TestMinOverTimeWithNoMatch(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
// Test that empty timeseries are not included
ts := result.ToProto(req)
@ -582,7 +587,8 @@ func TestMinOverTimeForSpanAttribute(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 400).WithDuration(512),
}
result := runTraceQLMetric(t, req, in, in2)
result, err := runTraceQLMetric(req, in, in2)
require.NoError(t, err)
fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
@ -636,7 +642,8 @@ func TestAvgOverTimeForDuration(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(300),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
@ -675,7 +682,9 @@ func TestAvgOverTimeForDurationWithoutAggregation(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "bar").WithDuration(300),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
avg := result[`{__name__="avg_over_time"}`]
assert.Equal(t, 100., avg.Values[0]*float64(time.Second))
@ -722,7 +731,8 @@ func TestAvgOverTimeForSpanAttribute(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(512),
}
result := runTraceQLMetric(t, req, in, in2)
result, err := runTraceQLMetric(req, in, in2)
require.NoError(t, err)
fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
@ -776,7 +786,8 @@ func TestAvgOverTimeWithNoMatch(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
// Test that empty timeseries are not included
ts := result.ToProto(req)
@ -883,7 +894,8 @@ func TestMaxOverTimeForDuration(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
@ -924,7 +936,8 @@ func TestMaxOverTimeWithNoMatch(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
// Test that empty timeseries are not included
ts := result.ToProto(req)
@ -972,7 +985,8 @@ func TestMaxOverTimeForSpanAttribute(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 400).WithDuration(512),
}
result := runTraceQLMetric(t, req, in, in2)
result, err := runTraceQLMetric(req, in, in2)
require.NoError(t, err)
fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
@ -1002,6 +1016,152 @@ func TestMaxOverTimeForSpanAttribute(t *testing.T) {
}
}
// TestSumOverTimeForDuration checks that sum_over_time(duration) grouped by
// span.foo adds up the span durations of each step and reports NaN for steps in
// which a series has no spans.
func TestSumOverTimeForDuration(t *testing.T) {
	req := &tempopb.QueryRangeRequest{
		Start: uint64(1 * time.Second),
		End:   uint64(3 * time.Second),
		Step:  uint64(1 * time.Second),
		Query: "{ } | sum_over_time(duration) by (span.foo)",
	}

	// A variety of spans across times, durations, and series.
	in := []Span{
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(10),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(20),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(30),

		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(40),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(50),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(60),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(70),

		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(80),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(90),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(100),
	}

	result, err := runTraceQLMetric(req, in)
	require.NoError(t, err)

	fooBaz := result[`{span.foo="baz"}`]
	fooBar := result[`{span.foo="bar"}`]

	// We cannot compare with require.Equal because NaN != NaN.
	// The expected sums are divided by time.Second, and every sum is compared
	// with a relative tolerance because the order of the floating-point
	// additions is not fixed by this test.

	// foo.baz = (NaN, NaN, 0.00000027)
	assert.True(t, math.IsNaN(fooBaz.Values[0]))
	assert.True(t, math.IsNaN(fooBaz.Values[1]))
	assert.InEpsilon(t, (80+90+100)/float64(time.Second), fooBaz.Values[2], 1e-9)

	// foo.bar = (0.00000006, 0.00000022, NaN)
	assert.InEpsilon(t, (10+20+30)/float64(time.Second), fooBar.Values[0], 1e-9)
	assert.InEpsilon(t, (40+50+60+70)/float64(time.Second), fooBar.Values[1], 1e-9)
	assert.True(t, math.IsNaN(fooBar.Values[2]))
}
// TestSumOverTimeForSpanAttribute checks that sum_over_time(span.kafka.lag)
// grouped by span.foo adds up the attribute values of two separate span batches
// per step, and that steps without data are NaN in the result and absent from
// the proto samples.
func TestSumOverTimeForSpanAttribute(t *testing.T) {
	req := &tempopb.QueryRangeRequest{
		Start: uint64(1 * time.Second),
		End:   uint64(3 * time.Second),
		Step:  uint64(1 * time.Second),
		Query: "{ } | sum_over_time(span.kafka.lag) by (span.foo)",
	}

	// A variety of spans across times, durations, and series.
	in := []Span{
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 100).WithDuration(100),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 200).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 300).WithDuration(512),

		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 200).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 200).WithDuration(64),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 200).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 200).WithDuration(8),

		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 200).WithDuration(512),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 400).WithDuration(1024),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 200).WithDuration(512),
	}

	// Second batch of spans, passed to runTraceQLMetric as a separate span set.
	in2 := []Span{
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 100).WithDuration(128),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 200).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 300).WithDuration(512),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 200).WithDuration(512),

		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 400).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 400).WithDuration(64),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 400).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("kafka.lag", 400).WithDuration(8),

		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 200).WithDuration(512),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 300).WithDuration(1024),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("kafka.lag", 400).WithDuration(512),
	}

	result, err := runTraceQLMetric(req, in, in2)
	require.NoError(t, err)

	fooBaz := result[`{span.foo="baz"}`]
	fooBar := result[`{span.foo="bar"}`]

	// Alas, we cannot compare with require.Equal because NaN != NaN
	// foo.baz = (200, NaN, 1700)
	assert.Equal(t, 200.0, fooBaz.Values[0])
	assert.True(t, math.IsNaN(fooBaz.Values[1]))
	assert.Equal(t, 1700.0, fooBaz.Values[2])

	// foo.bar = (1200, 2400, NaN)
	assert.Equal(t, 1200.0, fooBar.Values[0])
	assert.Equal(t, 2400.0, fooBar.Values[1])
	assert.True(t, math.IsNaN(fooBar.Values[2]))

	// Test that NaN values are not included in the samples after casting to proto
	ts := result.ToProto(req)
	fooBarSamples := []tempopb.Sample{{TimestampMs: 1000, Value: 1200}, {TimestampMs: 2000, Value: 2400}}
	fooBazSamples := []tempopb.Sample{{TimestampMs: 1000, Value: 200}, {TimestampMs: 3000, Value: 1700}}

	// Any series that is not foo="bar" is compared against the foo="baz" samples.
	for _, s := range ts {
		if s.PromLabels == "{span.foo=\"bar\"}" {
			assert.Equal(t, fooBarSamples, s.Samples)
		} else {
			assert.Equal(t, fooBazSamples, s.Samples)
		}
	}
}
// TestSumOverTimeWithNoMatch checks that summing an attribute that no span
// carries (span.buu) yields no time series at all once converted to proto.
func TestSumOverTimeWithNoMatch(t *testing.T) {
	req := &tempopb.QueryRangeRequest{
		Start: uint64(1 * time.Second),
		End:   uint64(3 * time.Second),
		Step:  uint64(1 * time.Second),
		Query: "{ } | sum_over_time(span.buu)",
	}

	// Spans spread over three steps and two series; none of them has span.buu.
	// All durations are powers of 2 for simplicity.
	spans := []Span{
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(128),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 404).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(512),

		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(64),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
		newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(8),

		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 201).WithDuration(512),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 401).WithDuration(1024),
		newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
	}

	result, err := runTraceQLMetric(req, spans)
	require.NoError(t, err)

	// Empty time series must not be part of the proto output.
	series := result.ToProto(req)
	assert.Empty(t, series)
}
func TestHistogramOverTime(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
@ -1069,22 +1229,29 @@ func TestHistogramOverTime(t *testing.T) {
},
}
result := runTraceQLMetric(t, req, in)
result, err := runTraceQLMetric(req, in)
require.NoError(t, err)
require.Equal(t, out, result)
}
func runTraceQLMetric(t *testing.T, req *tempopb.QueryRangeRequest, inSpans ...[]Span) SeriesSet {
func runTraceQLMetric(req *tempopb.QueryRangeRequest, inSpans ...[]Span) (SeriesSet, error) {
e := NewEngine()
layer2, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeSum)
require.NoError(t, err)
if err != nil {
return nil, err
}
layer3, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeFinal)
require.NoError(t, err)
if err != nil {
return nil, err
}
for _, spanSet := range inSpans {
layer1, err := e.CompileMetricsQueryRange(req, 0, 0, false)
require.NoError(t, err)
if err != nil {
return nil, err
}
for _, s := range spanSet {
layer1.metricsPipeline.observe(s)
}
@ -1100,5 +1267,40 @@ func runTraceQLMetric(t *testing.T, req *tempopb.QueryRangeRequest, inSpans ...[
layer3.ObserveSeries(res.ToProto(req))
// Layer 3 final results
return layer3.Results()
return layer3.Results(), nil
}
func randInt(minimum, maximum int) int {
return rand.IntN(maximum-minimum) + minimum
}
func randFloat(minimum, maximum float64) float64 {
return rand.Float64()*(maximum-minimum) + minimum
}
// BenchmarkSumOverTime measures runTraceQLMetric for a sum_over_time query over
// two batches of one million randomly generated spans each. Every span carries
// a large random float in span.kafka.lag and starts at second 1 or 2.
func BenchmarkSumOverTime(b *testing.B) {
	totalSpans := 1_000_000
	in := make([]Span, 0, totalSpans)
	in2 := make([]Span, 0, totalSpans)
	minimum := 1e10 // 10 billion
	maximum := 1e20 // 100 quintillion

	// newSpan builds one span with a random start time and a random lag value.
	newSpan := func() Span {
		s := time.Duration(randInt(1, 3)) * time.Second
		v := randFloat(minimum, maximum)
		return newMockSpan(nil).WithStartTime(uint64(s)).WithSpanString("foo", "bar").WithSpanFloat("kafka.lag", v).WithDuration(100)
	}

	for range totalSpans {
		// Each batch must grow from its own slice; appending to in2 and
		// assigning to in would make both batches share one backing array.
		in = append(in, newSpan())
		in2 = append(in2, newSpan())
	}

	req := &tempopb.QueryRangeRequest{
		Start: uint64(1 * time.Second),
		End:   uint64(3 * time.Second),
		Step:  uint64(1 * time.Second),
		Query: "{ } | sum_over_time(span.kafka.lag) by (span.foo)",
	}

	for b.Loop() {
		// A failing query would make the measurement meaningless, so stop.
		if _, err := runTraceQLMetric(req, in, in2); err != nil {
			b.Fatal(err)
		}
	}
}

View File

@ -57,6 +57,7 @@ const (
metricsAggregateMinOverTime
metricsAggregateMaxOverTime
metricsAggregateAvgOverTime
metricsAggregateSumOverTime
metricsAggregateQuantileOverTime
metricsAggregateHistogramOverTime
)
@ -73,6 +74,8 @@ func (a MetricsAggregateOp) String() string {
return "max_over_time"
case metricsAggregateAvgOverTime:
return "avg_over_time"
case metricsAggregateSumOverTime:
return "sum_over_time"
case metricsAggregateQuantileOverTime:
return "quantile_over_time"
case metricsAggregateHistogramOverTime:

View File

@ -100,7 +100,7 @@ import (
COUNT AVG MAX MIN SUM
BY COALESCE SELECT
END_ATTRIBUTE
RATE COUNT_OVER_TIME MIN_OVER_TIME MAX_OVER_TIME AVG_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME COMPARE
RATE COUNT_OVER_TIME MIN_OVER_TIME MAX_OVER_TIME AVG_OVER_TIME SUM_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME COMPARE
WITH
// Operators are listed with increasing precedence.
@ -302,6 +302,8 @@ metricsAggregation:
| MIN_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMinOverTime, $3, $7) }
| MAX_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMaxOverTime, $3, nil) }
| MAX_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMaxOverTime, $3, $7) }
| SUM_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateSumOverTime, $3, nil) }
| SUM_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateSumOverTime, $3, $7) }
| AVG_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newAverageOverTimeMetricsAggregator($3, nil) }
| AVG_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newAverageOverTimeMetricsAggregator($3, $7) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, nil) }

File diff suppressed because it is too large Load Diff

View File

@ -106,6 +106,7 @@ var tokens = map[string]int{
"min_over_time": MIN_OVER_TIME,
"max_over_time": MAX_OVER_TIME,
"avg_over_time": AVG_OVER_TIME,
"sum_over_time": SUM_OVER_TIME,
"quantile_over_time": QUANTILE_OVER_TIME,
"histogram_over_time": HISTOGRAM_OVER_TIME,
"compare": COMPARE,

View File

@ -1406,6 +1406,18 @@ func TestMetrics(t *testing.T) {
}),
),
},
{
in: `{ } | sum_over_time(duration) by(name, span.http.status_code)`,
expected: newRootExprWithMetrics(
newPipeline(newSpansetFilter(NewStaticBool(true))),
newMetricsAggregateWithAttr(metricsAggregateSumOverTime,
NewIntrinsic(IntrinsicDuration),
[]Attribute{
NewIntrinsic(IntrinsicName),
NewScopedAttribute(AttributeScopeSpan, false, "http.status_code"),
}),
),
},
{
in: `{ } | quantile_over_time(duration, 0, 0.90, 0.95, 1) by(name, span.http.status_code)`,
expected: newRootExprWithMetrics(

View File

@ -149,6 +149,7 @@ valid:
- '{} | min_over_time(duration) by (span.http.path)'
- '{} | max_over_time(duration) by (span.http.path)'
- '{} | avg_over_time(duration) by (span.http.path)'
- '{} | sum_over_time(duration) by (span.http.path)'
- '{} | quantile_over_time(duration, 0, 0.9, 1) by (span.http.path)'
# undocumented - nested set
- '{ nestedSetLeft > 3 }'

View File

@ -1,6 +1,7 @@
package traceql
import (
"math"
"time"
"github.com/grafana/tempo/pkg/tempopb"
@ -141,3 +142,18 @@ func (b *branchOptimizer) OptimalBranch() int {
}
return mini
}
// kahanSumInc performs one step of compensated summation. It adds inc to sum
// and returns the rounded total together with the updated compensation c, which
// accumulates the low-order part lost by the addition.
//
// If the total overflows to ±Inf the compensation is reset to 0.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	newSum = sum + inc
	if math.IsInf(newSum, 0) {
		return newSum, 0
	}
	// Using Neumaier improvement, swap if next term larger than sum.
	if math.Abs(sum) >= math.Abs(inc) {
		return newSum, c + ((sum - newSum) + inc)
	}
	return newSum, c + ((inc - newSum) + sum)
}