From 93d2b4277a2cd9829293199d0f7c1e4b8dfa057b Mon Sep 17 00:00:00 2001 From: Robert Lankford Date: Thu, 13 Mar 2025 14:47:04 -0700 Subject: [PATCH] add host info processor (#4698) * add host info processor implementation Signed-off-by: Robbie Lankford * fix lint * remove gauge custom expiration logic * make generate-manifest * add config validation; remove stale duration crud * refactor and clean up --------- Signed-off-by: Robbie Lankford --- docs/sources/tempo/configuration/manifest.md | 5 ++ modules/generator/config.go | 17 +++- .../generator/processor/hostinfo/config.go | 37 +++++++++ .../generator/processor/hostinfo/processor.go | 78 +++++++++++++++++++ .../processor/hostinfo/processor_test.go | 52 +++++++++++++ 5 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 modules/generator/processor/hostinfo/config.go create mode 100644 modules/generator/processor/hostinfo/processor.go create mode 100644 modules/generator/processor/hostinfo/processor_test.go diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index ec4e8221b..853cb2b20 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -624,6 +624,11 @@ metrics_generator: flush_to_storage: false concurrent_blocks: 10 time_overlap_cutoff: 0.2 + host_info: + host_identifiers: + - k8s.node.name + - host.id + metric_name: traces_host_info registry: collection_interval: 15s stale_duration: 15m0s diff --git a/modules/generator/config.go b/modules/generator/config.go index dff4435e1..1d5277df3 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -8,6 +8,7 @@ import ( "slices" "time" + "github.com/grafana/tempo/modules/generator/processor/hostinfo" "github.com/grafana/tempo/modules/generator/processor/localblocks" "github.com/grafana/tempo/modules/generator/processor/servicegraphs" "github.com/grafana/tempo/modules/generator/processor/spanmetrics" @@ -16,6 +17,7 @@ import ( "github.com/grafana/tempo/pkg/ingest" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/wal" + "go.uber.org/multierr" ) const ( @@ -126,16 +128,29 @@ type ProcessorConfig struct { ServiceGraphs servicegraphs.Config `yaml:"service_graphs"` SpanMetrics spanmetrics.Config `yaml:"span_metrics"` LocalBlocks localblocks.Config `yaml:"local_blocks"` + HostInfo hostinfo.Config `yaml:"host_info"` } func (cfg *ProcessorConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { cfg.ServiceGraphs.RegisterFlagsAndApplyDefaults(prefix, f) cfg.SpanMetrics.RegisterFlagsAndApplyDefaults(prefix, f) cfg.LocalBlocks.RegisterFlagsAndApplyDefaults(prefix, f) + cfg.HostInfo.RegisterFlagsAndApplyDefaults(prefix, f) } func (cfg *ProcessorConfig) Validate() error { - return cfg.LocalBlocks.Validate() + var errs []error + if err := cfg.LocalBlocks.Validate(); err != nil { + errs = append(errs, err) + } + if err := cfg.HostInfo.Validate(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return multierr.Combine(errs...) + } + return nil } // copyWithOverrides creates a copy of the config using values set in the overrides. diff --git a/modules/generator/processor/hostinfo/config.go b/modules/generator/processor/hostinfo/config.go new file mode 100644 index 000000000..9d6787f2b --- /dev/null +++ b/modules/generator/processor/hostinfo/config.go @@ -0,0 +1,37 @@ +package hostinfo + +import ( + "errors" + "flag" + + "github.com/prometheus/common/model" +) + +const ( + defaultHostInfoMetric = "traces_host_info" +) + +type Config struct { + // HostIdentifiers defines the list of resource attributes used to derive + // a unique `grafana.host.id` value. In most cases, this should be [ "host.id" ] + HostIdentifiers []string `yaml:"host_identifiers"` + // MetricName defines the name of the metric that will be generated + MetricName string `yaml:"metric_name"` +} + +func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) { + cfg.HostIdentifiers = []string{"k8s.node.name", "host.id"} + cfg.MetricName = defaultHostInfoMetric +} + +func (cfg *Config) Validate() error { + if len(cfg.HostIdentifiers) == 0 { + return errors.New("at least one value must be provided in host_identifiers") + } + + if !model.IsValidMetricName(model.LabelValue(cfg.MetricName)) { + return errors.New("metric_name is invalid") + } + + return nil +} diff --git a/modules/generator/processor/hostinfo/processor.go b/modules/generator/processor/hostinfo/processor.go new file mode 100644 index 000000000..4f9da3e39 --- /dev/null +++ b/modules/generator/processor/hostinfo/processor.go @@ -0,0 +1,78 @@ +package hostinfo + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/tempo/modules/generator/registry" + "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" +) + +const ( + Name = "host-info" + + hostInfoMetric = "traces_host_info" + hostIdentifierAttr = "grafana.host.id" +) + +type Processor struct { + Cfg Config + logger log.Logger + + gauge registry.Gauge + registry registry.Registry + metricName string + labels []string +} + +func (p *Processor) Name() string { + return Name +} + +func (p *Processor) findHostIdentifier(resourceSpans *v1.ResourceSpans) string { + attrs := resourceSpans.GetResource().GetAttributes() + for _, idAttr := range p.Cfg.HostIdentifiers { + for _, attr := range attrs { + if attr.GetKey() == idAttr { + if val := attr.GetValue(); val != nil { + if strVal := val.GetStringValue(); strVal != "" { + return strVal + } + } + } + } + } + return "" +} + +func (p *Processor) PushSpans(_ context.Context, req *tempopb.PushSpansRequest) { + values := make([]string, 1) + for i := range req.Batches { + resourceSpans := req.Batches[i] + if hostID := p.findHostIdentifier(resourceSpans); hostID != "" { + values[0] = hostID + labelValues := p.registry.NewLabelValueCombo( + p.labels, + values, + ) + p.gauge.Set(labelValues, 1) + } + } +} + +func (p *Processor) Shutdown(_ context.Context) {} + +func New(cfg Config, reg registry.Registry, logger log.Logger) (*Processor, error) { + labels := make([]string, 1) + labels[0] = hostIdentifierAttr + p := &Processor{ + Cfg: cfg, + logger: logger, + registry: reg, + metricName: cfg.MetricName, + gauge: reg.NewGauge(cfg.MetricName), + labels: labels, + } + return p, nil +} diff --git a/modules/generator/processor/hostinfo/processor_test.go b/modules/generator/processor/hostinfo/processor_test.go new file mode 100644 index 000000000..6ec20555e --- /dev/null +++ b/modules/generator/processor/hostinfo/processor_test.go @@ -0,0 +1,52 @@ +package hostinfo + +import ( + "context" + "strconv" + "testing" + + "github.com/grafana/tempo/modules/generator/registry" + "github.com/grafana/tempo/pkg/tempopb" + common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" + trace_v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/grafana/tempo/pkg/util/test" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHostInfo(t *testing.T) { + testRegistry := registry.NewTestRegistry() + + cfg := Config{} + cfg.RegisterFlagsAndApplyDefaults("", nil) + p, err := New(cfg, testRegistry, nil) + require.NoError(t, err) + require.Equal(t, p.Name(), Name) + defer p.Shutdown(context.TODO()) + + req := &tempopb.PushSpansRequest{ + Batches: []*trace_v1.ResourceSpans{ + test.MakeBatch(10, nil), + test.MakeBatch(10, nil), + }, + } + + for i, b := range req.Batches { + b.Resource.Attributes = append(b.Resource.Attributes, []*common_v1.KeyValue{ + {Key: "host.id", Value: &common_v1.AnyValue{Value: &common_v1.AnyValue_StringValue{StringValue: "test" + strconv.Itoa(i)}}}, + }...) + } + + p.PushSpans(context.Background(), req) + + lbls0 := labels.FromMap(map[string]string{ + hostIdentifierAttr: "test0", + }) + assert.Equal(t, 1.0, testRegistry.Query(hostInfoMetric, lbls0)) + + lbls1 := labels.FromMap(map[string]string{ + hostIdentifierAttr: "test1", + }) + assert.Equal(t, 1.0, testRegistry.Query(hostInfoMetric, lbls1)) +}