mirror of
https://github.com/siderolabs/discovery-service.git
synced 2025-03-14 09:55:08 +00:00
chore: improve state logging
Log cluster creation and removal.
Signed-off-by: Alexey Palazhchenko <alexey.palazhchenko@talos-systems.com>
This commit is contained in:
@ -70,6 +70,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
zap.ReplaceGlobals(logger)
|
||||
zap.RedirectStdLog(logger)
|
||||
|
||||
if err = signalHandler(context.Background(), logger, run); err != nil {
|
||||
logger.Error("service failed", zap.Error(err))
|
||||
|
||||
@ -123,7 +126,7 @@ func run(ctx context.Context, logger *zap.Logger) error {
|
||||
),
|
||||
}
|
||||
|
||||
state := state.NewState()
|
||||
state := state.NewState(logger)
|
||||
prom.MustRegister(state)
|
||||
|
||||
srv := server.NewClusterServer(state, ctx.Done())
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
// State keeps the discovery service state.
|
||||
type State struct { //nolint:govet
|
||||
clusters sync.Map
|
||||
logger *zap.Logger
|
||||
|
||||
mClustersDesc *prom.Desc
|
||||
mAffiliatesDesc *prom.Desc
|
||||
@ -28,8 +29,9 @@ type State struct { //nolint:govet
|
||||
}
|
||||
|
||||
// NewState creates a new instance of State.
|
||||
func NewState() *State {
|
||||
func NewState(logger *zap.Logger) *State {
|
||||
return &State{
|
||||
logger: logger,
|
||||
mClustersDesc: prom.NewDesc(
|
||||
"discovery_state_clusters",
|
||||
"The current number of clusters in the state.",
|
||||
@ -65,13 +67,16 @@ func NewState() *State {
|
||||
}
|
||||
}
|
||||
|
||||
// GetCluster returns (or creates) new cluster by ID.
|
||||
// GetCluster returns cluster by ID, creating it if needed.
|
||||
func (state *State) GetCluster(id string) *Cluster {
|
||||
if v, ok := state.clusters.Load(id); ok {
|
||||
return v.(*Cluster)
|
||||
}
|
||||
|
||||
v, _ := state.clusters.LoadOrStore(id, NewCluster(id))
|
||||
v, loaded := state.clusters.LoadOrStore(id, NewCluster(id))
|
||||
if !loaded {
|
||||
state.logger.Debug("cluster created", zap.String("cluster_id", id))
|
||||
}
|
||||
|
||||
return v.(*Cluster)
|
||||
}
|
||||
@ -84,6 +89,7 @@ func (state *State) GarbageCollect(now time.Time) (removedClusters, removedAffil
|
||||
removedAffiliates += ra
|
||||
if empty {
|
||||
state.clusters.Delete(key)
|
||||
state.logger.Debug("cluster removed", zap.String("cluster_id", key.(string)))
|
||||
removedClusters++
|
||||
}
|
||||
|
||||
@ -104,15 +110,23 @@ func (state *State) RunGC(ctx context.Context, logger *zap.Logger, interval time
|
||||
|
||||
for ctx.Err() == nil {
|
||||
removedClusters, removedAffiliates := state.GarbageCollect(time.Now())
|
||||
clusters, affiliates, endpoints, subscriptions := state.stats()
|
||||
|
||||
logFunc := logger.Debug
|
||||
if removedClusters > 0 || removedAffiliates > 0 {
|
||||
logger.Info(
|
||||
"garbage collection run",
|
||||
zap.Int("removed_clusters", removedClusters),
|
||||
zap.Int("removed_affiliates", removedAffiliates),
|
||||
)
|
||||
logFunc = logger.Info
|
||||
}
|
||||
|
||||
logFunc(
|
||||
"garbage collection run",
|
||||
zap.Int("removed_clusters", removedClusters),
|
||||
zap.Int("removed_affiliates", removedAffiliates),
|
||||
zap.Int("current_clusters", clusters),
|
||||
zap.Int("current_affiliates", affiliates),
|
||||
zap.Int("current_endpoints", endpoints),
|
||||
zap.Int("current_subscriptions", subscriptions),
|
||||
)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-ticker.C:
|
||||
@ -120,15 +134,7 @@ func (state *State) RunGC(ctx context.Context, logger *zap.Logger, interval time
|
||||
}
|
||||
}
|
||||
|
||||
// Describe implements prom.Collector interface.
|
||||
func (state *State) Describe(ch chan<- *prom.Desc) {
|
||||
prom.DescribeByCollect(state, ch)
|
||||
}
|
||||
|
||||
// Collect implements prom.Collector interface.
|
||||
func (state *State) Collect(ch chan<- prom.Metric) {
|
||||
var clusters, affiliates, endpoints, subscriptions int
|
||||
|
||||
func (state *State) stats() (clusters, affiliates, endpoints, subscriptions int) {
|
||||
state.clusters.Range(func(key, value interface{}) bool {
|
||||
clusters++
|
||||
|
||||
@ -142,6 +148,18 @@ func (state *State) Collect(ch chan<- prom.Metric) {
|
||||
return true
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Describe implements prom.Collector interface.
|
||||
func (state *State) Describe(ch chan<- *prom.Desc) {
|
||||
prom.DescribeByCollect(state, ch)
|
||||
}
|
||||
|
||||
// Collect implements prom.Collector interface.
|
||||
func (state *State) Collect(ch chan<- prom.Metric) {
|
||||
clusters, affiliates, endpoints, subscriptions := state.stats()
|
||||
|
||||
ch <- prom.MustNewConstMetric(state.mClustersDesc, prom.GaugeValue, float64(clusters))
|
||||
ch <- prom.MustNewConstMetric(state.mAffiliatesDesc, prom.GaugeValue, float64(affiliates))
|
||||
ch <- prom.MustNewConstMetric(state.mEndpointsDesc, prom.GaugeValue, float64(endpoints))
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"github.com/talos-systems/discovery-service/internal/state"
|
||||
)
|
||||
@ -27,7 +28,7 @@ func checkMetrics(t *testing.T, c prom.Collector) {
|
||||
func TestState(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
st := state.NewState()
|
||||
st := state.NewState(zaptest.NewLogger(t))
|
||||
|
||||
// Check metrics before and after the test
|
||||
// to ensure that collector does not switch from being unchecked to checked and invalid.
|
||||
|
@ -41,7 +41,7 @@ func setupServer(t *testing.T) (address string) {
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
|
||||
state := state.NewState()
|
||||
state := state.NewState(logger)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
@ -51,8 +52,8 @@ func NewClusterServer(state *state.State, stopCh <-chan struct{}) *ClusterServer
|
||||
}
|
||||
|
||||
// NewTestClusterServer builds cluster server for testing code.
|
||||
func NewTestClusterServer() *ClusterServer {
|
||||
return NewClusterServer(state.NewState(), nil)
|
||||
func NewTestClusterServer(logger *zap.Logger) *ClusterServer {
|
||||
return NewClusterServer(state.NewState(logger), nil)
|
||||
}
|
||||
|
||||
// Hello implements cluster API.
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
@ -40,7 +41,7 @@ func checkMetrics(t *testing.T, c prom.Collector) {
|
||||
func setupServer(t *testing.T) (address string) {
|
||||
t.Helper()
|
||||
|
||||
srv := server.NewTestClusterServer()
|
||||
srv := server.NewTestClusterServer(zaptest.NewLogger(t))
|
||||
|
||||
// Check metrics before and after the test
|
||||
// to ensure that collector does not switch from being unchecked to checked and invalid.
|
||||
|
Reference in New Issue
Block a user