Add TraceQL query hint to retrieve most recent results ordered by trace start time (#4238)

* Added ordered results

Signed-off-by: Joe Elliott <number101010@gmail.com>

* add most_recent query hint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog, docs and lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* e2e tests - fixed tag search

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove clone changes

Signed-off-by: Joe Elliott <number101010@gmail.com>

* review

Signed-off-by: Joe Elliott <number101010@gmail.com>

* make shards configurable

Signed-off-by: Joe Elliott <number101010@gmail.com>

* dont mess with me lint. i will uninstall you

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Make all endpoints search backwards in time

Signed-off-by: Joe Elliott <number101010@gmail.com>

* nice work on this one carles

Signed-off-by: Joe Elliott <number101010@gmail.com>

* consolidate block meta functions

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fix merge :P

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* don't bother tracking normal searches

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
This commit is contained in:
Joe Elliott
2025-02-12 16:33:10 -05:00
committed by GitHub
parent f8283c5e38
commit 6c07024064
35 changed files with 1680 additions and 797 deletions

View File

@ -17,6 +17,7 @@
* [CHANGE] **BREAKING CHANGE** Enforce max attribute size at event, link, and instrumentation scope. Make config per-tenant.
Renamed max_span_attr_byte to max_attribute_bytes
[#4633](https://github.com/grafana/tempo/pull/4633) (@ie-pham)
* [FEATURE] Added most_recent=true query hint to TraceQL to return most recent results. [#4238](https://github.com/grafana/tempo/pull/4238) (@joe-elliott)
* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)

View File

@ -697,11 +697,15 @@ query_frontend:
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_bytes_slo: <float> | default = 0 ]
# The number of time windows to break a search up into when doing a most recent TraceQL search. This only impacts TraceQL
# searches with (most_recent=true)
[most_recent_shards: <int> | default = 200]
# The number of shards to break ingester queries into.
[ingester_shards]: <int> | default = 3]
[ingester_shards: <int> | default = 3]
# The maximum allowed value of spans per span set. 0 disables this limit.
[max_spans_per_span_set]: <int> | default = 100]
[max_spans_per_span_set: <int> | default = 100]
# SLO configuration for Metadata (tags and tag values) endpoints.
metadata_slo:

View File

@ -322,6 +322,7 @@ query_frontend:
query_backend_after: 15m0s
query_ingesters_until: 30m0s
ingester_shards: 3
most_recent_shards: 200
max_spans_per_span_set: 100
trace_by_id:
query_shards: 50

View File

@ -598,6 +598,15 @@ TraceQL can select arbitrary fields from spans. This is particularly performant
{ status=error } | select(span.http.status_code, span.http.url)
```
## Retrieving most recent results (experimental)
The TraceQL query hint `most_recent=true` can be used with any TraceQL selection query to force Tempo to return the most recent results ordered by time. Examples:
```
{} with (most_recent=true)
{ span.foo = "bar" } >> { status = error } with (most_recent=true)
```
## Experimental TraceQL metrics
TraceQL metrics are experimental, but easy to get started with. Refer to [the TraceQL metrics]({{< relref "../operations/traceql-metrics.md" >}}) documentation for more information.

View File

@ -24,6 +24,8 @@ type TResponse interface {
type PipelineResponse interface {
HTTPResponse() *http.Response
RequestData() any
IsMetadata() bool // todo: search and query range pass back metadata responses through a normal http response. update to use this instead.
}
type genericCombiner[T TResponse] struct {
@ -33,6 +35,7 @@ type genericCombiner[T TResponse] struct {
new func() T
combine func(partial T, final T, resp PipelineResponse) error
metadata func(resp PipelineResponse, final T) error
finalize func(T) (T, error)
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
quit func(T) bool
@ -52,6 +55,16 @@ func initHTTPCombiner[T TResponse](c *genericCombiner[T], marshalingFormat strin
// AddResponse is used to add a http response to the combiner.
func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
if r.IsMetadata() && c.metadata != nil {
c.mu.Lock()
defer c.mu.Unlock()
if err := c.metadata(r, c.current); err != nil {
return fmt.Errorf("error processing metadata: %w", err)
}
return nil
}
res := r.HTTPResponse()
if res == nil {
return nil

View File

@ -210,7 +210,8 @@ func TestGenericCombinerDoesntRace(t *testing.T) {
}
type testPipelineResponse struct {
r *http.Response
r *http.Response
responseData any
}
func newTestResponse(t *testing.T) *testPipelineResponse {
@ -242,7 +243,11 @@ func (p *testPipelineResponse) HTTPResponse() *http.Response {
}
func (p *testPipelineResponse) RequestData() any {
return nil
return p.responseData
}
func (p *testPipelineResponse) IsMetadata() bool {
return false
}
func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] {

View File

@ -1,7 +1,7 @@
package combiner
import (
"sort"
"net/http"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/search"
@ -9,45 +9,73 @@ import (
"github.com/grafana/tempo/pkg/traceql"
)
var _ PipelineResponse = (*SearchJobResponse)(nil)
type SearchShards struct {
TotalJobs uint32
CompletedThroughSeconds uint32
}
type SearchJobResponse struct {
TotalBlocks int
TotalJobs int
TotalBytes uint64
Shards []SearchShards
}
func (s *SearchJobResponse) HTTPResponse() *http.Response {
return nil
}
func (s *SearchJobResponse) RequestData() any {
return nil
}
func (s *SearchJobResponse) IsMetadata() bool {
return true
}
var _ GRPCCombiner[*tempopb.SearchResponse] = (*genericCombiner[*tempopb.SearchResponse])(nil)
// NewSearch returns a search combiner
func NewSearch(limit int) Combiner {
metadataCombiner := traceql.NewMetadataCombiner()
func NewSearch(limit int, keepMostRecent bool) Combiner {
metadataCombiner := traceql.NewMetadataCombiner(limit, keepMostRecent)
diffTraces := map[string]struct{}{}
completedThroughTracker := &ShardCompletionTracker{}
c := &genericCombiner[*tempopb.SearchResponse]{
httpStatusCode: 200,
new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} },
current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}},
combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, _ PipelineResponse) error {
for _, t := range partial.Traces {
// if we've reached the limit and this is NOT a new trace then skip it
if limit > 0 &&
metadataCombiner.Count() >= limit &&
!metadataCombiner.Exists(t.TraceID) {
continue
}
combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, resp PipelineResponse) error {
requestIdx, ok := resp.RequestData().(int)
if ok && keepMostRecent {
completedThroughTracker.addShardIdx(requestIdx)
}
metadataCombiner.AddMetadata(t)
// record modified traces
diffTraces[t.TraceID] = struct{}{}
for _, t := range partial.Traces {
if metadataCombiner.AddMetadata(t) {
// record modified traces
diffTraces[t.TraceID] = struct{}{}
}
}
if partial.Metrics != nil {
// there is a coordination with the search sharder here. normal responses
// will never have total jobs set, but they will have valid Inspected* values
// a special response is sent back from the sharder with no traces but valid Total* values
// if TotalJobs is nonzero then assume its the special response
if partial.Metrics.TotalJobs == 0 {
final.Metrics.CompletedJobs++
final.Metrics.CompletedJobs++
final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes
final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces
}
final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes
final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces
} else {
final.Metrics.TotalBlocks += partial.Metrics.TotalBlocks
final.Metrics.TotalJobs += partial.Metrics.TotalJobs
final.Metrics.TotalBlockBytes += partial.Metrics.TotalBlockBytes
return nil
},
metadata: func(resp PipelineResponse, final *tempopb.SearchResponse) error {
if sj, ok := resp.(*SearchJobResponse); ok && sj != nil {
final.Metrics.TotalBlocks += uint32(sj.TotalBlocks)
final.Metrics.TotalJobs += uint32(sj.TotalJobs)
final.Metrics.TotalBlockBytes += sj.TotalBytes
if keepMostRecent {
completedThroughTracker.addShards(sj.Shards)
}
}
@ -67,34 +95,48 @@ func NewSearch(limit int) Combiner {
Metrics: current.Metrics,
}
for _, tr := range metadataCombiner.Metadata() {
metadataFn := metadataCombiner.Metadata
if keepMostRecent {
metadataFn = func() []*tempopb.TraceSearchMetadata {
completedThroughSeconds := completedThroughTracker.completedThroughSeconds
// if all jobs are completed then let's just return everything the combiner has
if current.Metrics.CompletedJobs == current.Metrics.TotalJobs && current.Metrics.TotalJobs > 0 {
completedThroughSeconds = 1
}
// if we've not completed any shards, then return nothing
if completedThroughSeconds == 0 {
return nil
}
return metadataCombiner.MetadataAfter(completedThroughSeconds)
}
}
for _, tr := range metadataFn() {
// if not in the map, skip. we haven't seen an update
if _, ok := diffTraces[tr.TraceID]; !ok {
continue
}
delete(diffTraces, tr.TraceID)
diff.Traces = append(diff.Traces, tr)
}
sort.Slice(diff.Traces, func(i, j int) bool {
return diff.Traces[i].StartTimeUnixNano > diff.Traces[j].StartTimeUnixNano
})
addRootSpanNotReceivedText(diff.Traces)
// wipe out diff traces for the next time
clear(diffTraces)
return diff, nil
},
// search combiner doesn't use current in the way i would have expected. it only tracks metrics through current and uses the results map for the actual traces.
// should we change this?
quit: func(_ *tempopb.SearchResponse) bool {
if limit <= 0 {
return false
completedThroughSeconds := completedThroughTracker.completedThroughSeconds
// have we completed any shards?
if completedThroughSeconds == 0 {
completedThroughSeconds = traceql.TimestampNever
}
return metadataCombiner.Count() >= limit
return metadataCombiner.IsCompleteFor(completedThroughSeconds)
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
@ -109,6 +151,79 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
}
}
func NewTypedSearch(limit int) GRPCCombiner[*tempopb.SearchResponse] {
return NewSearch(limit).(GRPCCombiner[*tempopb.SearchResponse])
func NewTypedSearch(limit int, keepMostRecent bool) GRPCCombiner[*tempopb.SearchResponse] {
return NewSearch(limit, keepMostRecent).(GRPCCombiner[*tempopb.SearchResponse])
}
// ShardCompletionTracker
type ShardCompletionTracker struct {
shards []SearchShards
foundResponses []int
completedThroughSeconds uint32
curShard int
}
func (s *ShardCompletionTracker) addShards(shards []SearchShards) uint32 {
if len(shards) == 0 {
return s.completedThroughSeconds
}
s.shards = shards
// grow foundResponses to match while keeping the existing values
if len(s.shards) > len(s.foundResponses) {
temp := make([]int, len(s.shards))
copy(temp, s.foundResponses)
s.foundResponses = temp
}
s.incrementCurShardIfComplete()
return s.completedThroughSeconds
}
// Add adds a response to the tracker and returns the allowed completedThroughSeconds
func (s *ShardCompletionTracker) addShardIdx(shardIdx int) uint32 {
// we haven't received shards yet
if len(s.shards) == 0 {
// if shardIdx doesn't fit in foundResponses then alloc a new slice and copy foundResponses forward
if shardIdx >= len(s.foundResponses) {
temp := make([]int, shardIdx+1)
copy(temp, s.foundResponses)
s.foundResponses = temp
}
// and record this idx for when we get shards
s.foundResponses[shardIdx]++
return 0
}
//
if shardIdx >= len(s.foundResponses) {
return s.completedThroughSeconds
}
s.foundResponses[shardIdx]++
s.incrementCurShardIfComplete()
return s.completedThroughSeconds
}
// incrementCurShardIfComplete tests to see if the current shard is complete and increments it if so.
// it does this repeatedly until it finds a shard that is not complete.
func (s *ShardCompletionTracker) incrementCurShardIfComplete() {
for {
if s.curShard >= len(s.shards) {
break
}
if s.foundResponses[s.curShard] == int(s.shards[s.curShard].TotalJobs) {
s.completedThroughSeconds = s.shards[s.curShard].CompletedThroughSeconds
s.curShard++
} else {
break
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -43,7 +43,7 @@ func TestTraceByIDShouldQuit(t *testing.T) {
// unparseable body should not quit, but should return an error
c = NewTraceByID(0, api.HeaderAcceptJSON)
err = c.AddResponse(&pipelineResponse{&http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}})
err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}})
require.Error(t, err)
should = c.ShouldQuit()
require.False(t, should)
@ -101,7 +101,7 @@ func toHTTPProtoResponse(t *testing.T, pb proto.Message, statusCode int) Pipelin
require.NoError(t, err)
}
return &pipelineResponse{&http.Response{
return &testPipelineResponse{r: &http.Response{
Body: io.NopCloser(bytes.NewReader(body)),
StatusCode: statusCode,
}}

View File

@ -27,6 +27,10 @@ func (m MockResponse) RequestData() any {
return nil
}
func (m MockResponse) IsMetadata() bool {
return false
}
func TestNewTraceByIdV2ReturnsAPartialTrace(t *testing.T) {
traceResponse := &tempopb.TraceByIDResponse{
Trace: test.MakeTrace(2, []byte{0x01, 0x02}),

View File

@ -81,6 +81,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
MaxDuration: 168 * time.Hour, // 1 week
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
IngesterShards: 3,
MaxSpansPerSpanSet: 100,
},

View File

@ -5,6 +5,7 @@ import (
"io"
"net/http"
"path"
"sort"
"strings"
"github.com/go-kit/log"
@ -21,6 +22,7 @@ import (
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
)
type RoundTripperFunc func(*http.Request) (*http.Response, error)
@ -78,6 +80,10 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
return nil, fmt.Errorf("query backend after should be less than or equal to query ingester until")
}
if cfg.Search.Sharder.MostRecentShards <= 0 {
return nil, fmt.Errorf("most recent shards must be greater than 0")
}
if cfg.Metrics.Sharder.ConcurrentRequests <= 0 {
return nil, fmt.Errorf("frontend metrics concurrent requests should be greater than 0")
}
@ -344,3 +350,23 @@ func multiTenantUnsupportedMiddleware(cfg Config, logger log.Logger) pipeline.As
return pipeline.NewNoopMiddleware()
}
// blockMetasForSearch returns a list of blocks that are relevant to the search query.
// start and end are unix timestamps in seconds. rf is the replication factor of the blocks to return.
func blockMetasForSearch(allBlocks []*backend.BlockMeta, start, end uint32, rf uint32) []*backend.BlockMeta {
blocks := make([]*backend.BlockMeta, 0, len(allBlocks)/50) // divide by 50 for luck
for _, m := range allBlocks {
if m.StartTime.Unix() <= int64(end) &&
m.EndTime.Unix() >= int64(start) &&
m.ReplicationFactor == rf { // This check skips generator blocks (RF=1)
blocks = append(blocks, m)
}
}
// search backwards in time
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].EndTime.After(blocks[j].EndTime)
})
return blocks
}

View File

@ -26,6 +26,7 @@ func TestFrontendTagSearchRequiresOrgID(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -69,12 +70,12 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)")
assert.Nil(t, f)
f, err = New(Config{
@ -86,6 +87,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -102,6 +104,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: 0,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -118,6 +121,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: 0,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -134,6 +138,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
QueryIngestersUntil: time.Minute,
QueryBackendAfter: time.Hour,
},
@ -151,6 +156,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -166,6 +172,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -188,6 +195,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -202,4 +210,28 @@ func TestFrontendBadConfigFails(t *testing.T) {
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend metrics interval should be greater than 0")
assert.Nil(t, f)
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: 0,
},
SLO: testSLOcfg,
},
Metrics: MetricsConfig{
Sharder: QueryRangeSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "most recent shards must be greater than 0")
assert.Nil(t, f)
}

View File

@ -158,22 +158,6 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, jobMetricsResponse, s.next), nil
}
// blockMetas returns all relevant blockMetas given a start/end
func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
// reduce metas to those in the requested range
allMetas := s.reader.BlockMetas(tenantID)
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.UnixNano() <= end &&
m.EndTime.UnixNano() >= start &&
m.ReplicationFactor == 1 { // We always only query RF1 blocks
metas = append(metas, m)
}
}
return metas
}
func (s *queryRangeSharder) exemplarsPerShard(total uint32, exemplars uint32) uint32 {
if exemplars == 0 {
return 0
@ -201,7 +185,9 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
// Blocks within overall time range. This is just for instrumentation, more precise time
// range is checked for each window.
blocks := s.blockMetas(int64(backendReq.Start), int64(backendReq.End), tenantID)
startS := uint32(backendReq.Start / uint64(time.Second))
endS := uint32(backendReq.End / uint64(time.Second))
blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), startS, endS, 1)
if len(blocks) == 0 {
// no need to search backend
close(reqCh)

View File

@ -30,11 +30,22 @@ func (p pipelineResponse) RequestData() any {
return p.requestData
}
func (p pipelineResponse) IsMetadata() bool {
return false
}
// syncResponse is a single http.Response that implements the Responses[*http.Response] interface.
type syncResponse struct {
r combiner.PipelineResponse
}
// NewAsyncResponse creates a new AsyncResponse that wraps a single http.Response.
func NewAsyncResponse(r combiner.PipelineResponse) Responses[combiner.PipelineResponse] {
return syncResponse{
r: r,
}
}
// NewHTTPToAsyncResponse creates a new AsyncResponse that wraps a single http.Response.
func NewHTTPToAsyncResponse(r *http.Response) Responses[combiner.PipelineResponse] {
return syncResponse{

View File

@ -304,7 +304,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
bridge := &pipelineBridge{
next: tc.finalRT(cancel),
}
httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0))
httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0, false))
_, _ = httpCollector.RoundTrip(req)
@ -326,7 +326,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
bridge := &pipelineBridge{
next: tc.finalRT(cancel),
}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil })
_ = grpcCollector.RoundTrip(req)
@ -350,7 +350,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
}
s := sharder{next: sharder{next: bridge}, funcSharder: true}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil })
_ = grpcCollector.RoundTrip(req)
@ -373,7 +373,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
}
s := sharder{next: sharder{next: bridge, funcSharder: true}}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil })
_ = grpcCollector.RoundTrip(req)

View File

@ -20,6 +20,7 @@ import (
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)
// newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
@ -46,14 +47,14 @@ func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c
tenant, _ := user.ExtractOrgID(ctx)
start := time.Now()
limit, err := adjustLimit(req.Limit, cfg.Search.Sharder.DefaultLimit, cfg.Search.Sharder.MaxLimit)
comb, err := newCombiner(req, cfg.Search.Sharder)
if err != nil {
level.Error(logger).Log("msg", "search streaming: adjust limit failed", "err", err)
return status.Errorf(codes.InvalidArgument, "adjust limit: %s", err.Error())
level.Error(logger).Log("msg", "search streaming: could not create combiner", "err", err)
return status.Error(codes.InvalidArgument, err.Error())
}
var finalResponse *tempopb.SearchResponse
comb := combiner.NewTypedSearch(int(limit))
collector := pipeline.NewGRPCCollector[*tempopb.SearchResponse](next, cfg.ResponseConsumers, comb, func(sr *tempopb.SearchResponse) error {
finalResponse = sr // sadly we can't srv.Send directly into the collector. we need bytesProcessed for the SLO calculations
return srv.Send(sr)
@ -92,10 +93,9 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
}, nil
}
// build combiner with limit
limit, err := adjustLimit(searchReq.Limit, cfg.Search.Sharder.DefaultLimit, cfg.Search.Sharder.MaxLimit)
comb, err := newCombiner(searchReq, cfg.Search.Sharder)
if err != nil {
level.Error(logger).Log("msg", "search: adjust limit failed", "err", err)
level.Error(logger).Log("msg", "search: could not create combiner", "err", err)
return &http.Response{
StatusCode: http.StatusBadRequest,
Status: http.StatusText(http.StatusBadRequest),
@ -106,7 +106,6 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
logRequest(logger, tenant, searchReq)
// build and use roundtripper
comb := combiner.NewTypedSearch(int(limit))
rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb)
resp, err := rt.RoundTrip(req)
@ -125,6 +124,28 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
})
}
func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig) (combiner.GRPCCombiner[*tempopb.SearchResponse], error) {
limit, err := adjustLimit(req.Limit, cfg.DefaultLimit, cfg.MaxLimit)
if err != nil {
return nil, err
}
mostRecent := false
if len(req.Query) > 0 {
query, err := traceql.Parse(req.Query)
if err != nil {
return nil, fmt.Errorf("invalid TraceQL query: %s", err)
}
ok := false
if mostRecent, ok = query.Hints.GetBool(traceql.HintMostRecent, false); !ok {
mostRecent = false
}
}
return combiner.NewTypedSearch(int(limit), mostRecent), nil
}
// adjusts the limit based on provided config
func adjustLimit(limit, defaultLimit, maxLimit uint32) (uint32, error) {
if limit == 0 {

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/http/httptest"
@ -31,7 +32,6 @@ import (
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb"
@ -139,14 +139,14 @@ func TestFrontendSearch(t *testing.T) {
func runnerBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/search", nil)
httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil)
httpResp := httptest.NewRecorder()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "no org id", httpResp.Body.String())
require.Equal(t, http.StatusBadRequest, httpResp.Code)
// grpc
grpcReq := &tempopb.SearchRequest{}
grpcReq := &tempopb.SearchRequest{Query: "{}"}
err := f.streamingSearch(grpcReq, newMockStreamingServer[*tempopb.SearchResponse]("", nil))
require.Equal(t, status.Error(codes.InvalidArgument, "no org id"), err)
}
@ -175,8 +175,9 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
expectedStatusCode: 200,
expectedResponse: &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{{
TraceID: "1",
RootServiceName: search.RootSpanNotYetReceivedText,
TraceID: "1",
RootServiceName: search.RootSpanNotYetReceivedText,
StartTimeUnixNano: math.MaxUint64,
}},
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 4,
@ -212,8 +213,9 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
expectedStatusCode: 200,
expectedResponse: &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{{
TraceID: "1",
RootServiceName: search.RootSpanNotYetReceivedText,
TraceID: "1",
RootServiceName: search.RootSpanNotYetReceivedText,
StartTimeUnixNano: math.MaxUint64,
}},
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 8,
@ -270,7 +272,7 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/search", nil)
httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil)
httpResp := httptest.NewRecorder()
ctx, cancel := context.WithCancel(httpReq.Context())
@ -293,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
time.Sleep(50 * time.Millisecond)
cancel()
}()
grpcReq := &tempopb.SearchRequest{}
grpcReq := &tempopb.SearchRequest{Query: "{}"}
err := f.streamingSearch(grpcReq, srv)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}
@ -304,7 +306,8 @@ func TestSearchLimitHonored(t *testing.T) {
return &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
TraceID: util.TraceIDToHexString(test.ValidTraceID(nil)),
TraceID: util.TraceIDToHexString(test.ValidTraceID(nil)),
StartTimeUnixNano: math.MaxUint64, // forces GRPCDiff in the search combiner to return this trace b/c it's always after CompletedThroughSeconds
},
},
Metrics: &tempopb.SearchMetrics{
@ -324,6 +327,7 @@ func TestSearchLimitHonored(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
DefaultLimit: 10,
MaxLimit: 15,
},
@ -382,13 +386,12 @@ func TestSearchLimitHonored(t *testing.T) {
tenant := "1|2|3|4|5|6"
// due to the blocks we will have 4 trace ids normally
httpReq := httptest.NewRequest("GET", "/api/search", nil)
httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil)
httpReq, err := api.BuildSearchRequest(httpReq, tc.request)
require.NoError(t, err)
ctx := user.InjectOrgID(httpReq.Context(), tenant)
httpReq = httpReq.WithContext(ctx)
httpResp := httptest.NewRecorder()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
@ -404,18 +407,18 @@ func TestSearchLimitHonored(t *testing.T) {
}
// grpc
combiner := traceql.NewMetadataCombiner()
distinctTraces := map[string]struct{}{}
err = f.streamingSearch(tc.request, newMockStreamingServer(tenant, func(i int, sr *tempopb.SearchResponse) {
// combine
for _, t := range sr.Traces {
combiner.AddMetadata(t)
distinctTraces[t.TraceID] = struct{}{}
}
}))
if tc.badRequest {
require.Equal(t, status.Error(codes.InvalidArgument, "adjust limit: limit 20 exceeds max limit 15"), err)
require.Equal(t, status.Error(codes.InvalidArgument, "limit 20 exceeds max limit 15"), err)
} else {
require.NoError(t, err)
require.Equal(t, combiner.Count(), tc.expectedTraces)
require.Equal(t, tc.expectedTraces, len(distinctTraces))
}
})
}
@ -478,8 +481,9 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
}
},
}, nil, &Config{
MultiTenantQueriesEnabled: true,
MaxRetries: 0, // disable retries or it will try twice and get success. the querier response is designed to fail exactly once
MultiTenantQueriesEnabled: true,
MaxQueryExpressionSizeBytes: 100000,
MaxRetries: 0, // disable retries or it will try twice and get success. the querier response is designed to fail exactly once
TraceByID: TraceByIDConfig{
QueryShards: minQueryShards,
SLO: testSLOcfg,
@ -488,6 +492,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -501,7 +506,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
},
}, nil)
httpReq := httptest.NewRequest("GET", "/api/search?start=1&end=10000", nil)
httpReq := httptest.NewRequest("GET", "/api/search?start=1&end=10000&q={}", nil)
httpResp := httptest.NewRecorder()
ctx := user.InjectOrgID(httpReq.Context(), "foo")
@ -523,8 +528,9 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
}
},
}, nil, &Config{
MultiTenantQueriesEnabled: true,
MaxRetries: 0, // disable retries or it will try twice and get success
MultiTenantQueriesEnabled: true,
MaxQueryExpressionSizeBytes: 100000,
MaxRetries: 0, // disable retries or it will try twice and get success
TraceByID: TraceByIDConfig{
QueryShards: minQueryShards,
SLO: testSLOcfg,
@ -533,6 +539,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -548,7 +555,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
// grpc
srv := newMockStreamingServer[*tempopb.SearchResponse]("bar", nil)
grpcReq := &tempopb.SearchRequest{}
grpcReq := &tempopb.SearchRequest{Query: "{}"}
err := f.streamingSearch(grpcReq, srv)
require.Equal(t, tc.expectedErr, err)
}
@ -719,7 +726,8 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te
return &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
TraceID: "1",
TraceID: "1",
StartTimeUnixNano: math.MaxUint64, // forces GRPCDiff in the search combiner to return this trace b/c it's always after CompletedThroughSeconds
},
},
Metrics: &tempopb.SearchMetrics{
@ -779,6 +787,7 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},

View File

@ -3,11 +3,11 @@ package frontend
import (
"context"
"fmt"
"math"
"net/http"
"time"
"github.com/go-kit/log" //nolint:all deprecated
"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/dskit/user"
"github.com/segmentio/fasthash/fnv1a"
@ -24,6 +24,7 @@ import (
const (
defaultTargetBytesPerRequest = 100 * 1024 * 1024
defaultConcurrentRequests = 1000
defaultMostRecentShards = 200
)
type SearchSharderConfig struct {
@ -35,6 +36,7 @@ type SearchSharderConfig struct {
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"`
IngesterShards int `yaml:"ingester_shards,omitempty"`
MostRecentShards int `yaml:"most_recent_shards,omitempty"`
MaxSpansPerSpanSet uint32 `yaml:"max_spans_per_span_set,omitempty"`
}
@ -101,67 +103,24 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin
// build request to search ingesters based on query_ingesters_until config and time range
// pass subCtx in requests so we can cancel and exit early
err = s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh)
jobMetrics, err := s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh)
if err != nil {
return nil, err
}
// Check the number of requests that were were written to the request channel
// before we start reading them.
ingesterJobs := len(reqCh)
// pass subCtx in requests so we can cancel and exit early
totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, reqCh, func(err error) {
s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, jobMetrics, reqCh, func(err error) {
// todo: actually find a way to return this error to the user
s.logger.Log("msg", "search: failed to build backend requests", "err", err)
})
totalJobs += ingesterJobs
// send a job to communicate the search metrics. this is consumed by the combiner to calculate totalblocks/bytes/jobs
var jobMetricsResponse pipeline.Responses[combiner.PipelineResponse]
if totalJobs > 0 {
resp := &tempopb.SearchResponse{
Metrics: &tempopb.SearchMetrics{
TotalBlocks: uint32(totalBlocks),
TotalBlockBytes: totalBlockBytes,
TotalJobs: uint32(totalJobs),
},
}
m := jsonpb.Marshaler{}
body, err := m.MarshalToString(resp)
if err != nil {
return nil, fmt.Errorf("failed to marshal search metrics: %w", err)
}
jobMetricsResponse = pipeline.NewSuccessfulResponse(body)
}
// execute requests
return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, jobMetricsResponse, s.next), nil
}
// blockMetas returns all relevant blockMetas given a start/end
func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
// reduce metas to those in the requested range
allMetas := s.reader.BlockMetas(tenantID)
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start &&
m.ReplicationFactor == backend.DefaultReplicationFactor { // This check skips generator blocks (RF=1)
metas = append(metas, m)
}
}
return metas
return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, pipeline.NewAsyncResponse(jobMetrics), s.next), nil
}
// backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it.
// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs
func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan<- pipeline.Request, errFn func(error)) (totalJobs, totalBlocks int, totalBlockBytes uint64) {
var blocks []*backend.BlockMeta
func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, resp *combiner.SearchJobResponse, reqCh chan<- pipeline.Request, errFn func(error)) {
// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
@ -177,45 +136,53 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin
return
}
// get block metadata of blocks in start, end duration
blocks = s.blockMetas(int64(start), int64(end), tenantID)
targetBytesPerRequest := s.cfg.TargetBytesPerRequest
blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), start, end, backend.DefaultReplicationFactor)
// calculate metrics to return to the caller
totalBlocks = len(blocks)
for _, b := range blocks {
p := pagesPerRequest(b, targetBytesPerRequest)
resp.TotalBlocks = len(blocks)
totalJobs += int(b.TotalRecords) / p
if int(b.TotalRecords)%p != 0 {
totalJobs++
}
totalBlockBytes += b.Size_
}
blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, s.cfg.MostRecentShards, searchReq.End)
blockIter(func(jobs int, sz uint64, completedThroughTime uint32) {
resp.TotalJobs += jobs
resp.TotalBytes += sz
resp.Shards = append(resp.Shards, combiner.SearchShards{
TotalJobs: uint32(jobs),
CompletedThroughSeconds: completedThroughTime,
})
}, nil)
go func() {
buildBackendRequests(ctx, tenantID, parent, searchReq, blocks, targetBytesPerRequest, reqCh, errFn)
buildBackendRequests(ctx, tenantID, parent, searchReq, blockIter, reqCh, errFn)
}()
return
}
// ingesterRequest returns a new start and end time range for the backend as well as an http request
// that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query.
// since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from
// unexpectedly changing the passed searchReq.
func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error {
func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) (*combiner.SearchJobResponse, error) {
resp := &combiner.SearchJobResponse{
Shards: make([]combiner.SearchShards, 0, s.cfg.MostRecentShards+1), // +1 for the ingester shard
}
// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
return buildIngesterRequest(tenantID, parent, &searchReq, reqCh)
// one shard that covers all time
resp.TotalJobs = 1
resp.Shards = append(resp.Shards, combiner.SearchShards{
TotalJobs: 1,
CompletedThroughSeconds: 1,
})
return resp, buildIngesterRequest(tenantID, parent, &searchReq, reqCh)
}
ingesterUntil := uint32(time.Now().Add(-s.cfg.QueryIngestersUntil).Unix())
// if there's no overlap between the query and ingester range just return nil
if searchReq.End < ingesterUntil {
return nil
return resp, nil
}
ingesterStart := searchReq.Start
@ -228,7 +195,7 @@ func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.R
// if ingester start == ingester end then we don't need to query it
if ingesterStart == ingesterEnd {
return nil
return resp, nil
}
searchReq.Start = ingesterStart
@ -265,11 +232,20 @@ func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.R
err := buildIngesterRequest(tenantID, parent, &subReq, reqCh)
if err != nil {
return err
return nil, err
}
}
return nil
// add one shard that covers no time at all. this will force the combiner to wait
// for ingester requests to complete before moving on to the backend requests
ingesterJobs := len(reqCh)
resp.TotalJobs = ingesterJobs
resp.Shards = append(resp.Shards, combiner.SearchShards{
TotalJobs: uint32(ingesterJobs),
CompletedThroughSeconds: math.MaxUint32,
})
return resp, nil
}
// maxDuration returns the max search duration allowed for this tenant.
@ -303,60 +279,54 @@ func backendRange(start, end uint32, queryBackendAfter time.Duration) (uint32, u
// buildBackendRequests returns a slice of requests that cover all blocks in the store
// that are covered by start/end.
func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error)) {
func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, blockIter func(shardIterFn, jobIterFn), reqCh chan<- pipeline.Request, errFn func(error)) {
defer close(reqCh)
queryHash := hashForSearchRequest(searchReq)
colsToJSON := api.NewDedicatedColumnsToJSON()
for _, m := range metas {
pages := pagesPerRequest(m, bytesPerRequest)
if pages == 0 {
continue
}
blockIter(nil, func(m *backend.BlockMeta, shard, startPage, pages int) {
blockID := m.BlockID.String()
dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
return
}
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
DataEncoding: m.DataEncoding,
Version: m.Version,
Size_: m.Size_,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
}, dedColsJSON)
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
DataEncoding: m.DataEncoding,
Version: m.Version,
Size_: m.Size_,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
}, dedColsJSON)
return r, err
})
if err != nil {
errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err))
continue
}
key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages)
pipelineR.SetCacheKey(key)
select {
case reqCh <- pipelineR:
case <-ctx.Done():
// ignore the error if there is one. it will be handled elsewhere
return
}
return r, err
})
if err != nil {
errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err))
return
}
}
key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages)
pipelineR.SetCacheKey(key)
pipelineR.SetResponseData(shard)
select {
case reqCh <- pipelineR:
case <-ctx.Done():
// ignore the error if there is one. it will be handled elsewhere
return
}
})
}
// hashForSearchRequest returns a uint64 hash of the query. if the query is invalid it returns a 0 hash.
@ -411,6 +381,78 @@ func buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq *t
return err
}
subR.SetResponseData(0) // ingester requests are always shard 0
reqCh <- subR
return nil
}
type (
shardIterFn func(jobs int, sz uint64, completedThroughTime uint32)
jobIterFn func(m *backend.BlockMeta, shard, startPage, pages int)
)
// backendJobsFunc provides an iter func with 2 callbacks designed to be used once to calculate job and shard metrics and a second time
// to generate actual jobs.
func backendJobsFunc(blocks []*backend.BlockMeta, targetBytesPerRequest int, maxShards int, end uint32) func(shardIterFn, jobIterFn) {
blocksPerShard := len(blocks) / maxShards
// if we have fewer blocks than shards then every shard is one block
if blocksPerShard == 0 {
blocksPerShard = 1
}
return func(shardIterCallback shardIterFn, jobIterCallback jobIterFn) {
currentShard := 0
jobsInShard := 0
bytesInShard := uint64(0)
blocksInShard := 0
for _, b := range blocks {
pages := pagesPerRequest(b, targetBytesPerRequest)
jobsInBlock := 0
if pages == 0 {
continue
}
// if jobIterCallBack is nil we can skip the loop and directly calc the jobsInBlock
if jobIterCallback == nil {
jobsInBlock = int(b.TotalRecords) / pages
if int(b.TotalRecords)%pages != 0 {
jobsInBlock++
}
} else {
for startPage := 0; startPage < int(b.TotalRecords); startPage += pages {
jobIterCallback(b, currentShard, startPage, pages)
jobsInBlock++
}
}
// do we need to roll to a new shard?
jobsInShard += jobsInBlock
bytesInShard += b.Size_
blocksInShard++
// -1 b/c we will likely add a final shard below
// end comparison b/c there's no point in ending a shard that can't release any results
if blocksInShard >= blocksPerShard && currentShard < maxShards-1 && b.EndTime.Unix() < int64(end) {
if shardIterCallback != nil {
shardIterCallback(jobsInShard, bytesInShard, uint32(b.EndTime.Unix()))
}
currentShard++
jobsInShard = 0
bytesInShard = 0
blocksInShard = 0
}
}
// final shard - note that we are overpacking the final shard due to the integer math as well as the limit of 200 shards total. if the search
// this is the least impactful shard to place extra jobs in as it is searched last. if we make it here the chances of this being an exhaustive search
// are higher
if shardIterCallback != nil && jobsInShard > 0 {
shardIterCallback(jobsInShard, bytesInShard, 1) // final shard can cover all time. we don't need to be precise
}
}
}

View File

@ -1,13 +1,14 @@
package frontend
import (
"bytes"
"context"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
"sort"
"strconv"
"strings"
"testing"
@ -224,9 +225,10 @@ func TestBuildBackendRequests(t *testing.T) {
ctx, cancelCause := context.WithCancelCause(context.Background())
reqCh := make(chan pipeline.Request)
iterFn := backendJobsFunc(tc.metas, tc.targetBytesPerRequest, defaultMostRecentShards, math.MaxUint32)
go func() {
buildBackendRequests(ctx, "test", pipeline.NewHTTPRequest(req), searchReq, tc.metas, tc.targetBytesPerRequest, reqCh, cancelCause)
buildBackendRequests(ctx, "test", pipeline.NewHTTPRequest(req), searchReq, iterFn, reqCh, cancelCause)
}()
actualURIs := []string{}
@ -250,7 +252,9 @@ func TestBackendRequests(t *testing.T) {
bm.TotalRecords = 2
s := &asyncSearchSharder{
cfg: SearchSharderConfig{},
cfg: SearchSharderConfig{
MostRecentShards: defaultMostRecentShards,
},
reader: &mockReader{metas: []*backend.BlockMeta{bm}},
}
@ -317,10 +321,12 @@ func TestBackendRequests(t *testing.T) {
ctx, cancelCause := context.WithCancelCause(context.Background())
pipelineRequest := pipeline.NewHTTPRequest(r)
jobs, blocks, blockBytes := s.backendRequests(ctx, "test", pipelineRequest, searchReq, reqCh, cancelCause)
require.Equal(t, tc.expectedJobs, jobs)
require.Equal(t, tc.expectedBlocks, blocks)
require.Equal(t, tc.expectedBlockBytes, blockBytes)
searchJobResponse := &combiner.SearchJobResponse{}
s.backendRequests(ctx, "test", pipelineRequest, searchReq, searchJobResponse, reqCh, cancelCause)
require.Equal(t, tc.expectedJobs, searchJobResponse.TotalJobs)
require.Equal(t, tc.expectedBlocks, searchJobResponse.TotalBlocks)
require.Equal(t, tc.expectedBlockBytes, searchJobResponse.TotalBytes)
actualReqURIs := []string{}
for r := range reqCh {
@ -494,13 +500,24 @@ func TestIngesterRequests(t *testing.T) {
pr := pipeline.NewHTTPRequest(req)
pr.SetWeight(2)
err = s.ingesterRequests("test", pr, *searchReq, reqChan)
actualSearchResponse, err := s.ingesterRequests("test", pr, *searchReq, reqChan)
if tc.expectedError != nil {
assert.Equal(t, tc.expectedError, err)
require.Equal(t, tc.expectedError, err)
continue
}
assert.NoError(t, err)
assert.Equal(t, len(tc.expectedURI), len(reqChan))
require.NoError(t, err)
require.Equal(t, len(tc.expectedURI), len(reqChan))
require.Equal(t, len(tc.expectedURI), actualSearchResponse.TotalJobs)
if len(tc.expectedURI) > 0 {
require.Equal(t, len(tc.expectedURI), int(actualSearchResponse.Shards[0].TotalJobs))
expectedCompletedThrough := math.MaxUint32 // normal ingester shard completes no time on purpose
if searchReq.Start == 0 && searchReq.End == 0 { // ingester only search completes all time on purpose
expectedCompletedThrough = 1
}
require.Equal(t, expectedCompletedThrough, int(actualSearchResponse.Shards[0].CompletedThroughSeconds))
} else {
require.Equal(t, 0, len(actualSearchResponse.Shards))
}
// drain the channel and check the URIs
for _, expectedURI := range tc.expectedURI {
@ -667,6 +684,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) {
QueryIngestersUntil: 15 * time.Minute,
ConcurrentRequests: 1, // 1 concurrent request to force order
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
IngesterShards: 1,
}, log.NewNopLogger())
testRT := sharder.Wrap(next)
@ -680,29 +698,24 @@ func TestTotalJobsIncludesIngester(t *testing.T) {
resps, err := testRT.RoundTrip(pipeline.NewHTTPRequest(req))
require.NoError(t, err)
// find a response with total jobs > . this is the metadata response
var resp *tempopb.SearchResponse
totalJobs := 0
for {
res, done, err := resps.Next(context.Background())
r := res.HTTPResponse()
require.NoError(t, err)
require.Equal(t, 200, r.StatusCode)
actualResp := &tempopb.SearchResponse{}
bytesResp, err := io.ReadAll(r.Body)
require.NoError(t, err)
err = jsonpb.Unmarshal(bytes.NewReader(bytesResp), actualResp)
require.NoError(t, err)
if res.IsMetadata() {
searchJobResponse := res.(*combiner.SearchJobResponse)
totalJobs += searchJobResponse.TotalJobs
if actualResp.Metrics.TotalJobs > 0 {
resp = actualResp
break
}
require.NoError(t, err)
require.False(t, done)
}
// 2 jobs for the meta + 1 for th ingester
assert.Equal(t, uint32(3), resp.Metrics.TotalJobs)
// 2 jobs for the meta + 1 for the ingester
assert.Equal(t, 3, totalJobs)
}
func TestSearchSharderRoundTripBadRequest(t *testing.T) {
@ -716,6 +729,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
sharder := newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
MaxSpansPerSpanSet: 100,
}, log.NewNopLogger())
@ -756,6 +770,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
sharder = newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, log.NewNopLogger())
testRT = sharder.Wrap(next)
@ -870,6 +885,126 @@ func TestHashTraceQLQuery(t *testing.T) {
require.NotEqual(t, h1, h2)
}
func TestBackendShards(t *testing.T) {
tcs := []struct {
name string
maxShards int
searchEnd uint32
expected []combiner.SearchShards
}{
{
name: "1 shard, puts all jobs in one shard",
maxShards: 1,
searchEnd: 50,
expected: []combiner.SearchShards{
{TotalJobs: 8, CompletedThroughSeconds: 1},
},
},
{
name: "2 shards, split evenly between",
maxShards: 2,
searchEnd: 50,
expected: []combiner.SearchShards{
{TotalJobs: 4, CompletedThroughSeconds: 30},
{TotalJobs: 4, CompletedThroughSeconds: 1},
},
},
{
name: "3 shards, one for each block",
maxShards: 3,
searchEnd: 50,
expected: []combiner.SearchShards{
{TotalJobs: 2, CompletedThroughSeconds: 40},
{TotalJobs: 2, CompletedThroughSeconds: 30},
{TotalJobs: 4, CompletedThroughSeconds: 1},
},
},
{
name: "4 shards, one for each block",
maxShards: 4,
searchEnd: 50,
expected: []combiner.SearchShards{
{TotalJobs: 2, CompletedThroughSeconds: 40},
{TotalJobs: 2, CompletedThroughSeconds: 30},
{TotalJobs: 2, CompletedThroughSeconds: 20},
{TotalJobs: 2, CompletedThroughSeconds: 1},
},
},
{
name: "5 shards, one for each block",
maxShards: 5,
searchEnd: 50,
expected: []combiner.SearchShards{
{TotalJobs: 2, CompletedThroughSeconds: 40},
{TotalJobs: 2, CompletedThroughSeconds: 30},
{TotalJobs: 2, CompletedThroughSeconds: 20},
{TotalJobs: 2, CompletedThroughSeconds: 10},
},
},
{
name: "4 shards, search end forces 2 blocks in the first shard",
maxShards: 4,
searchEnd: 35,
expected: []combiner.SearchShards{
{TotalJobs: 4, CompletedThroughSeconds: 30},
{TotalJobs: 2, CompletedThroughSeconds: 20},
{TotalJobs: 2, CompletedThroughSeconds: 10},
},
},
{
name: "4 shards, search end forces 3 blocks in the first shard",
maxShards: 4,
searchEnd: 25,
expected: []combiner.SearchShards{
{TotalJobs: 6, CompletedThroughSeconds: 20},
{TotalJobs: 2, CompletedThroughSeconds: 10},
},
},
{
name: "2 shards, search end forces 2 blocks in the first shard",
maxShards: 2,
searchEnd: 35,
expected: []combiner.SearchShards{
{TotalJobs: 4, CompletedThroughSeconds: 30},
{TotalJobs: 4, CompletedThroughSeconds: 1},
},
},
}
// create 4 metas with 2 records each for all the above test cases to use. 8 jobs total
metas := make([]*backend.BlockMeta, 0, 4)
for i := 0; i < 4; i++ {
metas = append(metas, &backend.BlockMeta{
StartTime: time.Unix(int64(i*10), 0), // block 0 starts at 0
EndTime: time.Unix(int64(i*10)+10, 0), // block 0 ends a 10
Size_: defaultTargetBytesPerRequest * 2, // 2 jobs per block
TotalRecords: 2,
BlockID: backend.MustParse("00000000-0000-0000-0000-000000000000"),
})
}
// sort
sort.Slice(metas, func(i, j int) bool {
return metas[i].EndTime.After(metas[j].EndTime)
})
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
fn := backendJobsFunc(metas, defaultTargetBytesPerRequest, tc.maxShards, tc.searchEnd)
actualShards := []combiner.SearchShards{}
fn(func(jobs int, _ uint64, completedThroughTime uint32) {
actualShards = append(actualShards, combiner.SearchShards{
TotalJobs: uint32(jobs),
CompletedThroughSeconds: completedThroughTime,
})
}, nil)
assert.Equal(t, tc.expected, actualShards)
})
}
}
func urisEqual(t *testing.T, expectedURIs, actualURIs []string) {
require.Equal(t, len(expectedURIs), len(actualURIs))

View File

@ -16,6 +16,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
"github.com/gorilla/mux"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
@ -49,7 +50,7 @@ func runnerTagsBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/search/tags", nil)
httpResp := httptest.NewRecorder()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsHandler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "no org id", httpResp.Body.String())
require.Equal(t, http.StatusBadRequest, httpResp.Code)
@ -63,7 +64,7 @@ func runnerTagsV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tags", nil)
httpResp := httptest.NewRecorder()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "no org id", httpResp.Body.String())
require.Equal(t, http.StatusBadRequest, httpResp.Code)
@ -76,8 +77,9 @@ func runnerTagsV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
func runnerTagValuesBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/search/tag/foo/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpResp := httptest.NewRecorder()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsValuesHandler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "no org id", httpResp.Body.String())
require.Equal(t, http.StatusBadRequest, httpResp.Code)
@ -90,8 +92,9 @@ func runnerTagValuesBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
func runnerTagValuesV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpResp := httptest.NewRecorder()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "no org id", httpResp.Body.String())
require.Equal(t, http.StatusBadRequest, httpResp.Code)
@ -115,7 +118,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
cancel()
}()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "context canceled", httpResp.Body.String())
require.Equal(t, 499, httpResp.Code) // todo: is this 499 valid?
@ -134,6 +137,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpResp := httptest.NewRecorder()
ctx, cancel := context.WithCancel(httpReq.Context())
@ -145,7 +149,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
cancel()
}()
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "context canceled", httpResp.Body.String())
require.Equal(t, 499, httpResp.Code) // todo: is this 499 valid?
@ -229,6 +233,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -248,7 +253,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
ctx := user.InjectOrgID(httpReq.Context(), "foo")
httpReq = httpReq.WithContext(ctx)
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, tc.expectedMessage, httpResp.Body.String())
require.Equal(t, tc.expectedCode, httpResp.Code)
@ -274,6 +279,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -360,6 +366,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
@ -373,13 +380,14 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
},
}, nil)
httpReq := httptest.NewRequest("GET", "/api/v2/search/tags?start=1&end=10000", nil)
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values?start=1&end=10000", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpResp := httptest.NewRecorder()
ctx := user.InjectOrgID(httpReq.Context(), "foo")
httpReq = httpReq.WithContext(ctx)
f.SearchHandler.ServeHTTP(httpResp, httpReq)
f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, tc.expectedMessage, httpResp.Body.String())
require.Equal(t, tc.expectedCode, httpResp.Code)
@ -405,6 +413,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},

View File

@ -236,26 +236,9 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.
return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, nil, s.next), nil
}
// blockMetas returns all relevant blockMetas given a start/end
func (s searchTagSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
// reduce metas to those in the requested range
allMetas := s.reader.BlockMetas(tenantID)
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start {
metas = append(metas, m)
}
}
return metas
}
// backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it.
// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs
func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq, reqCh chan<- pipeline.Request, errFn func(error)) {
var blocks []*backend.BlockMeta
// request without start or end, search only in ingester
if searchReq.start() == 0 || searchReq.end() == 0 {
close(reqCh)
@ -272,7 +255,7 @@ func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string,
}
// get block metadata of blocks in start, end duration
blocks = s.blockMetas(int64(start), int64(end), tenantID)
blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), start, end, backend.DefaultReplicationFactor)
targetBytesPerRequest := s.cfg.TargetBytesPerRequest

View File

@ -294,6 +294,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) {
sharder := newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, parseTagsRequest, log.NewNopLogger())
testRT := sharder.Wrap(next)
@ -328,6 +329,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) {
sharder = newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, parseTagsRequest, log.NewNopLogger())
testRT = sharder.Wrap(next)

View File

@ -33,6 +33,7 @@ var config = &Config{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},

View File

@ -237,12 +237,10 @@ func TestSearchWAL(t *testing.T) {
// search WAL
ctx := user.InjectOrgID(context.Background(), "test")
searchReq := &tempopb.SearchRequest{Tags: map[string]string{
"foo": "bar",
}}
searchReq := &tempopb.SearchRequest{Query: "{ }"}
results, err := inst.Search(ctx, searchReq)
require.NoError(t, err)
require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
require.Equal(t, 1, len(results.Traces))
// Shutdown
require.NoError(t, i.stopping(nil))
@ -258,7 +256,7 @@ func TestSearchWAL(t *testing.T) {
results, err = inst.Search(ctx, searchReq)
require.NoError(t, err)
require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
require.Equal(t, 1, len(results.Traces))
}
func TestRediscoverLocalBlocks(t *testing.T) {

View File

@ -7,7 +7,6 @@ import (
"sync"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/segmentio/fasthash/fnv1a"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -41,24 +40,42 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
span.AddEvent("SearchRequest", trace.WithAttributes(attribute.String("request", req.String())))
mostRecent := false
if len(req.Query) > 0 {
rootExpr, err := traceql.Parse(req.Query)
if err != nil {
return nil, fmt.Errorf("error parsing query: %w", err)
}
ok := false
if mostRecent, ok = rootExpr.Hints.GetBool(traceql.HintMostRecent, false); !ok {
mostRecent = false
}
}
var (
resultsMtx = sync.Mutex{}
combiner = traceql.NewMetadataCombiner()
combiner = traceql.NewMetadataCombiner(maxResults, mostRecent)
metrics = &tempopb.SearchMetrics{}
opts = common.DefaultSearchOptions()
anyErr atomic.Error
)
search := func(blockID uuid.UUID, block common.Searcher, spanName string) {
search := func(blockMeta *backend.BlockMeta, block common.Searcher, spanName string) {
ctx, span := tracer.Start(ctx, "instance.searchBlock."+spanName)
defer span.End()
span.AddEvent("block entry mtx acquired")
span.SetAttributes(attribute.String("blockID", blockID.String()))
span.SetAttributes(attribute.String("blockID", blockMeta.BlockID.String()))
var resp *tempopb.SearchResponse
var err error
// if the combiner is complete for the block's end time, we can skip searching it
if combiner.IsCompleteFor(uint32(blockMeta.EndTime.Unix())) {
return
}
if api.IsTraceQLQuery(req) {
// note: we are creating new engine for each wal block,
// and engine.ExecuteSearch is parsing the query for each block
@ -70,7 +87,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
}
if errors.Is(err, common.ErrUnsupported) {
level.Warn(log.Logger).Log("msg", "block does not support search", "blockID", blockID)
level.Warn(log.Logger).Log("msg", "block does not support search", "blockID", blockMeta.BlockID)
return
}
if errors.Is(err, context.Canceled) {
@ -78,7 +95,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
return
}
if err != nil {
level.Error(log.Logger).Log("msg", "error searching block", "blockID", blockID, "err", err)
level.Error(log.Logger).Log("msg", "error searching block", "blockID", blockMeta.BlockID, "err", err)
anyErr.Store(err)
return
}
@ -95,14 +112,9 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
metrics.InspectedBytes += resp.Metrics.InspectedBytes
}
if combiner.Count() >= maxResults {
return
}
for _, tr := range resp.Traces {
combiner.AddMetadata(tr)
if combiner.Count() >= maxResults {
// Cancel all other tasks
if combiner.IsCompleteFor(traceql.TimestampNever) {
cancel()
return
}
@ -119,18 +131,12 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
i.headBlockMtx.RLock()
span.AddEvent("acquired headblock mtx")
if includeBlock(i.headBlock.BlockMeta(), req) {
search((uuid.UUID)(i.headBlock.BlockMeta().BlockID), i.headBlock, "headBlock")
search(i.headBlock.BlockMeta(), i.headBlock, "headBlock")
}
i.headBlockMtx.RUnlock()
if err := anyErr.Load(); err != nil {
return nil, err
}
if combiner.Count() >= maxResults {
return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: metrics,
}, nil
}
// Search all other blocks (concurrently)
// Lock blocks mutex until all search tasks are finished and this function exits. This avoids
@ -150,7 +156,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
wg.Add(1)
go func(b common.WALBlock) {
defer wg.Done()
search((uuid.UUID)(b.BlockMeta().BlockID), b, "completingBlock")
search(b.BlockMeta(), b, "completingBlock")
}(b)
}
@ -161,7 +167,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
wg.Add(1)
go func(b *LocalBlock) {
defer wg.Done()
search((uuid.UUID)(b.BlockMeta().BlockID), b, "completeBlock")
search(b.BlockMeta(), b, "completeBlock")
}(b)
}

View File

@ -43,9 +43,8 @@ func TestInstanceSearch(t *testing.T) {
ids, _, _, _ := writeTracesForSearch(t, i, "", tagKey, tagValue, false, false)
req := &tempopb.SearchRequest{
Tags: map[string]string{},
Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue),
}
req.Tags[tagKey] = tagValue
req.Limit = uint32(len(ids)) + 1
// Test after appending to WAL. writeTracesforSearch() makes sure all traces are in the wal
@ -187,27 +186,23 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) {
searchAndAssert := func(req *tempopb.SearchRequest, inspectedTraces uint32) {
sr := search(req, 0, 0)
assert.Len(t, sr.Traces, len(ids))
assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces)
checkEqual(t, ids, sr)
// writeTracesForSearch will build spans that end 1 second from now
// query 2 min range to have extra slack and always be within range
sr = search(req, uint32(time.Now().Add(-5*time.Minute).Unix()), uint32(time.Now().Add(5*time.Minute).Unix()))
assert.Len(t, sr.Traces, len(ids))
assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces)
checkEqual(t, ids, sr)
// search with start=5m from now, end=10m from now
sr = search(req, uint32(time.Now().Add(5*time.Minute).Unix()), uint32(time.Now().Add(10*time.Minute).Unix()))
// no results and should inspect 100 traces in wal
assert.Len(t, sr.Traces, 0)
assert.Equal(t, uint32(0), sr.Metrics.InspectedTraces)
}
req := &tempopb.SearchRequest{
Tags: map[string]string{},
Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue),
}
req.Tags[tagKey] = tagValue
req.Limit = uint32(len(ids)) + 1
// Test after appending to WAL.
@ -622,7 +617,7 @@ func TestInstanceSearchNoData(t *testing.T) {
i, _ := defaultInstance(t)
req := &tempopb.SearchRequest{
Tags: map[string]string{},
Query: "{}",
}
sr, err := i.Search(context.Background(), req)
@ -644,7 +639,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
tagValue := "bar"
req := &tempopb.SearchRequest{
Tags: map[string]string{tagKey: tagValue},
Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue),
}
end := make(chan struct{})
@ -774,11 +769,7 @@ func TestWALBlockDeletedDuringSearch(t *testing.T) {
go concurrent(func() {
_, err := i.Search(context.Background(), &tempopb.SearchRequest{
Tags: map[string]string{
// Not present in the data, so it will be an exhaustive
// search
"wuv": "xyz",
},
Query: `{ span.wuv = "xyz" }`,
})
require.NoError(t, err)
})
@ -819,7 +810,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
search := func() *tempopb.SearchMetrics {
sr, err := i.Search(context.Background(), &tempopb.SearchRequest{
Tags: map[string]string{"foo": "bar"},
Query: fmt.Sprintf(`{ span.%s = "%s" }`, "foo", "bar"),
})
require.NoError(t, err)
return sr.Metrics
@ -834,14 +825,12 @@ func TestInstanceSearchMetrics(t *testing.T) {
err := i.CutCompleteTraces(0, true)
require.NoError(t, err)
m = search()
require.Equal(t, numTraces, m.InspectedTraces)
require.Less(t, numBytes, m.InspectedBytes)
// Test after cutting new headblock
blockID, err := i.CutBlockIfReady(0, 0, true)
require.NoError(t, err)
m = search()
require.Equal(t, numTraces, m.InspectedTraces)
require.Less(t, numBytes, m.InspectedBytes)
// Test after completing a block
@ -850,7 +839,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err)
m = search()
require.Equal(t, numTraces, m.InspectedTraces)
require.Less(t, numBytes, m.InspectedBytes)
}
func BenchmarkInstanceSearchUnderLoad(b *testing.B) {

View File

@ -23,6 +23,7 @@ func (d *DedicatedColumnsToJSON) JSONForDedicatedColumns(cols backend.DedicatedC
}
hash := cols.Hash()
if jsonString, ok := d.columnsToJSON[hash]; ok {
return jsonString, nil
}

View File

@ -1,43 +1,96 @@
package traceql
import (
"math"
"slices"
"sort"
"strings"
"time"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
)
type MetadataCombiner struct {
trs map[string]*tempopb.TraceSearchMetadata
type MetadataCombiner interface {
AddMetadata(new *tempopb.TraceSearchMetadata) bool
IsCompleteFor(ts uint32) bool
Metadata() []*tempopb.TraceSearchMetadata
MetadataAfter(ts uint32) []*tempopb.TraceSearchMetadata
addSpanset(new *Spanset)
}
func NewMetadataCombiner() *MetadataCombiner {
return &MetadataCombiner{
trs: make(map[string]*tempopb.TraceSearchMetadata),
const TimestampNever = uint32(math.MaxUint32)
func NewMetadataCombiner(limit int, keepMostRecent bool) MetadataCombiner {
if keepMostRecent {
return newMostRecentCombiner(limit)
}
return newAnyCombiner(limit)
}
type anyCombiner struct {
trs map[string]*tempopb.TraceSearchMetadata
limit int
}
func newAnyCombiner(limit int) *anyCombiner {
return &anyCombiner{
trs: make(map[string]*tempopb.TraceSearchMetadata, limit),
limit: limit,
}
}
// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata
// conversion if the spanset will be added
func (c *anyCombiner) addSpanset(new *Spanset) {
// if it's already in the list, then we should add it
if _, ok := c.trs[util.TraceIDToHexString(new.TraceID)]; ok {
c.AddMetadata(asTraceSearchMetadata(new))
return
}
// if we don't have too many
if c.IsCompleteFor(0) {
return
}
c.AddMetadata(asTraceSearchMetadata(new))
}
// AddMetadata adds the new metadata to the map. if it already exists
// use CombineSearchResults to combine the two
func (c *MetadataCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) {
func (c *anyCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool {
if existing, ok := c.trs[new.TraceID]; ok {
combineSearchResults(existing, new)
return
return true
}
// if we don't have too many
if c.IsCompleteFor(0) {
return false
}
c.trs[new.TraceID] = new
return true
}
func (c *MetadataCombiner) Count() int {
func (c *anyCombiner) Count() int {
return len(c.trs)
}
func (c *MetadataCombiner) Exists(id string) bool {
func (c *anyCombiner) Exists(id string) bool {
_, ok := c.trs[id]
return ok
}
func (c *MetadataCombiner) Metadata() []*tempopb.TraceSearchMetadata {
func (c *anyCombiner) IsCompleteFor(_ uint32) bool {
return c.Count() >= c.limit && c.limit > 0
}
func (c *anyCombiner) Metadata() []*tempopb.TraceSearchMetadata {
m := make([]*tempopb.TraceSearchMetadata, 0, len(c.trs))
for _, tr := range c.trs {
m = append(m, tr)
@ -48,6 +101,130 @@ func (c *MetadataCombiner) Metadata() []*tempopb.TraceSearchMetadata {
return m
}
// MetadataAfter returns all traces that started after the given time. anyCombiner has no concept of time so it just returns all traces
func (c *anyCombiner) MetadataAfter(_ uint32) []*tempopb.TraceSearchMetadata {
return c.Metadata()
}
type mostRecentCombiner struct {
trs map[string]*tempopb.TraceSearchMetadata
trsSorted []*tempopb.TraceSearchMetadata
keepMostRecent int
}
func newMostRecentCombiner(limit int) *mostRecentCombiner {
return &mostRecentCombiner{
trs: make(map[string]*tempopb.TraceSearchMetadata, limit),
trsSorted: make([]*tempopb.TraceSearchMetadata, 0, limit),
keepMostRecent: limit,
}
}
// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata
// conversion if the spanset will be added
func (c *mostRecentCombiner) addSpanset(new *Spanset) {
// if we're not configured to keep most recent then just add it
if c.keepMostRecent == 0 || c.Count() < c.keepMostRecent {
c.AddMetadata(asTraceSearchMetadata(new))
return
}
// else let's see if it's worth converting this to a metadata and adding it
// if it's already in the list, then we should add it
if _, ok := c.trs[util.TraceIDToHexString(new.TraceID)]; ok {
c.AddMetadata(asTraceSearchMetadata(new))
return
}
// if it's within range
if c.OldestTimestampNanos() <= new.StartTimeUnixNanos {
c.AddMetadata(asTraceSearchMetadata(new))
return
}
// this spanset is too old to bother converting and adding it
}
// AddMetadata adds the new metadata to the map. if it already exists
// use CombineSearchResults to combine the two
func (c *mostRecentCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool {
if existing, ok := c.trs[new.TraceID]; ok {
combineSearchResults(existing, new)
return true
}
if c.Count() == c.keepMostRecent && c.keepMostRecent > 0 {
// if this is older than the oldest element, bail
if c.OldestTimestampNanos() > new.StartTimeUnixNano {
return false
}
// otherwise remove the oldest element and we'll add the new one below
oldest := c.trsSorted[c.Count()-1]
delete(c.trs, oldest.TraceID)
c.trsSorted = c.trsSorted[:len(c.trsSorted)-1]
}
// insert new in the right spot
c.trs[new.TraceID] = new
idx, _ := slices.BinarySearchFunc(c.trsSorted, new, func(a, b *tempopb.TraceSearchMetadata) int {
if a.StartTimeUnixNano > b.StartTimeUnixNano {
return -1
}
return 1
})
c.trsSorted = slices.Insert(c.trsSorted, idx, new)
return true
}
func (c *mostRecentCombiner) Count() int {
return len(c.trs)
}
func (c *mostRecentCombiner) Exists(id string) bool {
_, ok := c.trs[id]
return ok
}
// IsCompleteFor returns true if the combiner has reached the limit and all traces are after the given time
func (c *mostRecentCombiner) IsCompleteFor(ts uint32) bool {
if ts == TimestampNever {
return false
}
if c.Count() < c.keepMostRecent {
return false
}
return c.OldestTimestampNanos() > uint64(ts)*uint64(time.Second)
}
func (c *mostRecentCombiner) Metadata() []*tempopb.TraceSearchMetadata {
return c.trsSorted
}
// MetadataAfter returns all traces that started after the given time
func (c *mostRecentCombiner) MetadataAfter(afterSeconds uint32) []*tempopb.TraceSearchMetadata {
afterNanos := uint64(afterSeconds) * uint64(time.Second)
afterTraces := make([]*tempopb.TraceSearchMetadata, 0, len(c.trsSorted))
for _, tr := range c.trsSorted {
if tr.StartTimeUnixNano > afterNanos {
afterTraces = append(afterTraces, tr)
}
}
return afterTraces
}
func (c *mostRecentCombiner) OldestTimestampNanos() uint64 {
if len(c.trsSorted) == 0 {
return 0
}
return c.trsSorted[len(c.trsSorted)-1].StartTimeUnixNano
}
// combineSearchResults overlays the incoming search result with the existing result. This is required
// for the following reason: a trace may be present in multiple blocks, or in partial segments
// in live traces. The results should reflect elements of all segments.

View File

@ -1,10 +1,15 @@
package traceql
import (
"fmt"
"math/rand/v2"
"slices"
"testing"
"time"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/grafana/tempo/pkg/util"
"github.com/stretchr/testify/require"
)
@ -263,3 +268,52 @@ func TestCombineResults(t *testing.T) {
})
}
}
func TestCombinerKeepsMostRecent(t *testing.T) {
totalTraces := 10
keepMostRecent := 5
combiner := NewMetadataCombiner(keepMostRecent, true).(*mostRecentCombiner)
// make traces
traces := make([]*Spanset, totalTraces)
for i := 0; i < totalTraces; i++ {
traceID, err := util.HexStringToTraceID(fmt.Sprintf("%d", i))
require.NoError(t, err)
traces[i] = &Spanset{
TraceID: traceID,
StartTimeUnixNanos: uint64(i) * uint64(time.Second),
}
}
// save off the most recent and reverse b/c the combiner returns most recent first
expected := make([]*tempopb.TraceSearchMetadata, 0, keepMostRecent)
for i := totalTraces - keepMostRecent; i < totalTraces; i++ {
expected = append(expected, asTraceSearchMetadata(traces[i]))
}
slices.Reverse(expected)
rand.Shuffle(totalTraces, func(i, j int) {
traces[i], traces[j] = traces[j], traces[i]
})
// add to combiner
for i := 0; i < totalTraces; i++ {
combiner.addSpanset(traces[i])
}
// test that the most recent are kept
actual := combiner.Metadata()
require.Equal(t, expected, actual)
require.Equal(t, keepMostRecent, combiner.Count())
require.Equal(t, expected[len(expected)-1].StartTimeUnixNano, combiner.OldestTimestampNanos())
for _, tr := range expected {
require.True(t, combiner.Exists(tr.TraceID))
}
// test MetadataAfter. 10 traces are added with start times 0-9. We want to get all traces that started after 7
afterSeconds := uint32(7)
expectedTracesCount := totalTraces - int(afterSeconds+1)
actualTraces := combiner.MetadataAfter(afterSeconds)
require.Equal(t, expectedTracesCount, len(actualTraces))
}

View File

@ -55,6 +55,11 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
return nil, err
}
var mostRecent, ok bool
if mostRecent, ok = rootExpr.Hints.GetBool(HintMostRecent, false); !ok {
mostRecent = false
}
if rootExpr.IsNoop() {
return &tempopb.SearchResponse{
Traces: nil,
@ -118,7 +123,7 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
Traces: nil,
Metrics: &tempopb.SearchMetrics{},
}
combiner := NewMetadataCombiner()
combiner := NewMetadataCombiner(int(searchReq.Limit), mostRecent)
for {
spanset, err := iterator.Next(ctx)
if err != nil && !errors.Is(err, io.EOF) {
@ -128,9 +133,9 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
if spanset == nil {
break
}
combiner.AddMetadata(e.asTraceSearchMetadata(spanset))
if combiner.Count() >= int(searchReq.Limit) && searchReq.Limit > 0 {
combiner.addSpanset(spanset)
if combiner.IsCompleteFor(TimestampNever) {
break
}
}
@ -256,7 +261,7 @@ func (e *Engine) createAutocompleteRequest(tag Attribute, pipeline Pipeline) Fet
return autocompleteReq
}
func (e *Engine) asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMetadata {
func asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMetadata {
metadata := &tempopb.TraceSearchMetadata{
TraceID: util.TraceIDToHexString(spanset.TraceID),
RootServiceName: spanset.RootServiceName,

View File

@ -253,9 +253,7 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) {
},
}
e := NewEngine()
traceSearchMetadata := e.asTraceSearchMetadata(spanSet)
traceSearchMetadata := asTraceSearchMetadata(spanSet)
expectedSpanset := &tempopb.SpanSet{
Matched: 2,

View File

@ -12,11 +12,12 @@ const (
HintTimeOverlapCutoff = "time_overlap_cutoff"
HintConcurrentBlocks = "concurrent_blocks"
HintExemplars = "exemplars"
HintMostRecent = "most_recent" // traceql search hint to return most recent results ordered by time
)
func isUnsafe(h string) bool {
switch h {
case HintSample, HintExemplars:
case HintSample, HintExemplars, HintMostRecent:
return false
default:
return true

View File

@ -6,7 +6,6 @@ import (
"fmt"
"math/rand"
"path"
"sort"
"strconv"
"sync"
"time"
@ -372,14 +371,6 @@ func (p *Poller) pollTenantBlocks(
newBlockList = append(newBlockList, newM...)
newCompactedBlocklist = append(newCompactedBlocklist, newCm...)
sort.Slice(newBlockList, func(i, j int) bool {
return newBlockList[i].StartTime.Before(newBlockList[j].StartTime)
})
sort.Slice(newCompactedBlocklist, func(i, j int) bool {
return newCompactedBlocklist[i].StartTime.Before(newCompactedBlocklist[j].StartTime)
})
return newBlockList, newCompactedBlocklist, nil
}

View File

@ -240,24 +240,11 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected
require.Equal(t, expectedID, (uuid.UUID)(blocklist[0].BlockID))
}
// confirm blocklists are in starttime ascending order
lastTime := time.Time{}
for _, b := range blocklist {
require.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime))
lastTime = b.StartTime
}
compactedBlocklist := rw.blocklist.CompactedMetas(testTenantID)
require.Len(t, compactedBlocklist, expectedCB)
if expectedCB > 0 && expectedID != uuid.Nil {
require.Equal(t, expectedID, (uuid.UUID)(compactedBlocklist[0].BlockID))
}
lastTime = time.Time{}
for _, b := range compactedBlocklist {
require.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime))
lastTime = b.StartTime
}
}
func TestIncludeBlock(t *testing.T) {