diff --git a/cmd/default-evaluator/main.go b/cmd/default-evaluator/main.go index 391d5c21..acb7ab27 100644 --- a/cmd/default-evaluator/main.go +++ b/cmd/default-evaluator/main.go @@ -15,11 +15,10 @@ package main import ( - "open-match.dev/open-match/internal/app/evaluator" "open-match.dev/open-match/internal/app/evaluator/defaulteval" "open-match.dev/open-match/internal/appmain" ) func main() { - appmain.RunApplication("evaluator", evaluator.BindServiceFor(defaulteval.Evaluate)) + appmain.RunApplication("evaluator", defaulteval.BindService) } diff --git a/internal/app/backend/backend.go b/internal/app/backend/backend.go index 3da96a86..1bd442f9 100644 --- a/internal/app/backend/backend.go +++ b/internal/app/backend/backend.go @@ -15,13 +15,54 @@ package backend import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" "google.golang.org/grpc" "open-match.dev/open-match/internal/appmain" "open-match.dev/open-match/internal/rpc" "open-match.dev/open-match/internal/statestore" + "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) +var ( + totalBytesPerMatch = stats.Int64("open-match.dev/backend/total_bytes_per_match", "Total bytes per match", stats.UnitBytes) + ticketsPerMatch = stats.Int64("open-match.dev/backend/tickets_per_match", "Number of tickets per match", stats.UnitDimensionless) + ticketsReleased = stats.Int64("open-match.dev/backend/tickets_released", "Number of tickets released per request", stats.UnitDimensionless) + ticketsAssigned = stats.Int64("open-match.dev/backend/tickets_assigned", "Number of tickets assigned per request", stats.UnitDimensionless) + + totalMatchesView = &view.View{ + Measure: totalBytesPerMatch, + Name: "open-match.dev/backend/total_matches", + Description: "Total number of matches", + Aggregation: view.Count(), + } + totalBytesPerMatchView = &view.View{ + Measure: totalBytesPerMatch, + Name: "open-match.dev/backend/total_bytes_per_match", + Description: "Total bytes per match", + Aggregation: telemetry.DefaultBytesDistribution, + } + ticketsPerMatchView = &view.View{ + Measure: ticketsPerMatch, + Name: "open-match.dev/backend/tickets_per_match", + Description: "Tickets per ticket", + Aggregation: telemetry.DefaultCountDistribution, + } + ticketsAssignedView = &view.View{ + Measure: ticketsAssigned, + Name: "open-match.dev/backend/tickets_assigned", + Description: "Number of tickets assigned per request", + Aggregation: view.Sum(), + } + ticketsReleasedView = &view.View{ + Measure: ticketsReleased, + Name: "open-match.dev/backend/tickets_released", + Description: "Number of tickets released per request", + Aggregation: view.Sum(), + } +) + // BindService creates the backend service and binds it to the serving harness. func BindService(p *appmain.Params, b *appmain.Bindings) error { service := &backendService{ @@ -34,6 +75,12 @@ func BindService(p *appmain.Params, b *appmain.Bindings) error { b.AddHandleFunc(func(s *grpc.Server) { pb.RegisterBackendServiceServer(s, service) }, pb.RegisterBackendServiceHandlerFromEndpoint) - + b.RegisterViews( + totalMatchesView, + totalBytesPerMatchView, + ticketsPerMatchView, + ticketsAssignedView, + ticketsReleasedView, + ) return nil } diff --git a/internal/app/backend/backend_service.go b/internal/app/backend/backend_service.go index a6eb0ec3..18bf970c 100644 --- a/internal/app/backend/backend_service.go +++ b/internal/app/backend/backend_service.go @@ -23,7 +23,10 @@ import ( "strings" "sync" + "go.opencensus.io/stats" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -32,7 +35,6 @@ import ( "open-match.dev/open-match/internal/ipb" "open-match.dev/open-match/internal/rpc" "open-match.dev/open-match/internal/statestore" - "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) @@ -49,10 +51,6 @@ var ( "app": "openmatch", "component": "app.backend", }) - mMatchesFetched = telemetry.Counter("backend/matches_fetched", "matches fetched") - mMatchesSentToEvaluation = telemetry.Counter("backend/matches_sent_to_evaluation", "matches sent to evaluation") - mTicketsAssigned = telemetry.Counter("backend/tickets_assigned", "tickets assigned") - mTicketsReleased = telemetry.Counter("backend/tickets_released", "tickets released") ) // FetchMatches triggers a MatchFunction with the specified MatchProfiles, while each MatchProfile @@ -131,7 +129,6 @@ sendProposals: if loaded { return fmt.Errorf("MatchMakingFunction returned same match_id twice: \"%s\"", p.GetMatchId()) } - telemetry.RecordUnitMeasurement(ctx, mMatchesSentToEvaluation) err := syncStream.Send(&ipb.SynchronizeRequest{Proposal: p}) if err != nil { return fmt.Errorf("error sending proposal to synchronizer: %w", err) @@ -168,9 +165,14 @@ func synchronizeRecv(ctx context.Context, syncStream synchronizerStream, m *sync cancelMmfs() } - if match, ok := m.Load(resp.GetMatchId()); ok { - telemetry.RecordUnitMeasurement(ctx, mMatchesFetched) - err = stream.Send(&pb.FetchMatchesResponse{Match: match.(*pb.Match)}) + if v, ok := m.Load(resp.GetMatchId()); ok { + match, ok := v.(*pb.Match) + if !ok { + return fmt.Errorf("error casting sync map value into *pb.Match: %w", err) + } + stats.Record(ctx, totalBytesPerMatch.M(int64(proto.Size(match)))) + stats.Record(ctx, ticketsPerMatch.M(int64(len(match.GetTickets())))) + err = stream.Send(&pb.FetchMatchesResponse{Match: match}) if err != nil { return fmt.Errorf("error sending match to caller of backend: %w", err) } @@ -300,7 +302,7 @@ func (s *backendService) ReleaseTickets(ctx context.Context, req *pb.ReleaseTick return nil, err } - telemetry.RecordNUnitMeasurement(ctx, mTicketsReleased, int64(len(req.TicketIds))) + stats.Record(ctx, ticketsReleased.M(int64(len(req.TicketIds)))) return &pb.ReleaseTicketsResponse{}, nil } @@ -317,7 +319,7 @@ func (s *backendService) AssignTickets(ctx context.Context, req *pb.AssignTicket numIds += len(ag.TicketIds) } - telemetry.RecordNUnitMeasurement(ctx, mTicketsAssigned, int64(numIds)) + stats.Record(ctx, ticketsAssigned.M(int64(numIds))) return resp, nil } diff --git a/internal/app/evaluator/defaulteval/evaluator.go b/internal/app/evaluator/defaulteval/evaluator.go index fc88cc11..df999fe8 100644 --- a/internal/app/evaluator/defaulteval/evaluator.go +++ b/internal/app/evaluator/defaulteval/evaluator.go @@ -20,8 +20,13 @@ import ( "math" "sort" + "go.opencensus.io/stats" + "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" + "go.opencensus.io/stats/view" + "open-match.dev/open-match/internal/app/evaluator" + "open-match.dev/open-match/internal/appmain" "open-match.dev/open-match/pkg/pb" ) @@ -30,6 +35,14 @@ var ( "app": "evaluator", "component": "evaluator.default", }) + + collidedMatchesPerEvaluate = stats.Int64("open-match.dev/defaulteval/collided_matches_per_call", "Number of collided matches per default evaluator call", stats.UnitDimensionless) + collidedMatchesPerEvaluateView = &view.View{ + Measure: collidedMatchesPerEvaluate, + Name: "open-match.dev/defaulteval/collided_matches_per_call", + Description: "Number of collided matches per default evaluator call", + Aggregation: view.Sum(), + } ) type matchInp struct { @@ -37,9 +50,18 @@ type matchInp struct { inp *pb.DefaultEvaluationCriteria } -// Evaluate sorts the matches by DefaultEvaluationCriteria.Score (optional), +// BindService define the initialization steps for this evaluator +func BindService(p *appmain.Params, b *appmain.Bindings) error { + if err := evaluator.BindServiceFor(evaluate)(p, b); err != nil { + return err + } + b.RegisterViews(collidedMatchesPerEvaluateView) + return nil +} + +// evaluate sorts the matches by DefaultEvaluationCriteria.Score (optional), // then returns matches which don't collide with previously returned matches. -func Evaluate(ctx context.Context, in <-chan *pb.Match, out chan<- string) error { +func evaluate(ctx context.Context, in <-chan *pb.Match, out chan<- string) error { matches := make([]*matchInp, 0) nilEvlautionInputs := 0 @@ -84,6 +106,8 @@ func Evaluate(ctx context.Context, in <-chan *pb.Match, out chan<- string) error d.maybeAdd(m) } + stats.Record(context.Background(), collidedMatchesPerEvaluate.M(int64(len(matches)-len(d.resultIDs)))) + for _, id := range d.resultIDs { out <- id } diff --git a/internal/app/evaluator/defaulteval/evaluator_test.go b/internal/app/evaluator/defaulteval/evaluator_test.go index b01c8e64..64d5bb9c 100644 --- a/internal/app/evaluator/defaulteval/evaluator_test.go +++ b/internal/app/evaluator/defaulteval/evaluator_test.go @@ -121,7 +121,7 @@ func TestEvaluate(t *testing.T) { } close(in) - err := Evaluate(context.Background(), in, out) + err := evaluate(context.Background(), in, out) assert.Nil(t, err) gotMatchIDs := []string{} diff --git a/internal/app/evaluator/evaluator.go b/internal/app/evaluator/evaluator.go index daf17592..75bc94d9 100644 --- a/internal/app/evaluator/evaluator.go +++ b/internal/app/evaluator/evaluator.go @@ -16,18 +16,42 @@ package evaluator import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" "google.golang.org/grpc" "open-match.dev/open-match/internal/appmain" + "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) +var ( + matchesPerEvaluateRequest = stats.Int64("open-match.dev/evaluator/matches_per_request", "Number of matches sent to the evaluator per request", stats.UnitDimensionless) + matchesPerEvaluateResponse = stats.Int64("open-match.dev/evaluator/matches_per_response", "Number of matches returned by the evaluator per response", stats.UnitDimensionless) + + matchesPerEvaluateRequestView = &view.View{ + Measure: matchesPerEvaluateRequest, + Name: "open-match.dev/evaluator/matches_per_request", + Description: "Number of matches sent to the evaluator per request", + Aggregation: telemetry.DefaultCountDistribution, + } + matchesPerEvaluateResponseView = &view.View{ + Measure: matchesPerEvaluateResponse, + Name: "open-match.dev/evaluator/matches_per_response", + Description: "Number of matches sent to the evaluator per response", + Aggregation: telemetry.DefaultCountDistribution, + } +) + // BindServiceFor creates the evaluator service and binds it to the serving harness. func BindServiceFor(eval Evaluator) appmain.Bind { return func(p *appmain.Params, b *appmain.Bindings) error { b.AddHandleFunc(func(s *grpc.Server) { - pb.RegisterEvaluatorServer(s, &evaluatorService{evaluate: eval}) + pb.RegisterEvaluatorServer(s, &evaluatorService{eval}) }, pb.RegisterEvaluatorHandlerFromEndpoint) - + b.RegisterViews( + matchesPerEvaluateRequestView, + matchesPerEvaluateResponseView, + ) return nil } } diff --git a/internal/app/evaluator/evaluator_service.go b/internal/app/evaluator/evaluator_service.go index a5736c6e..f59eb07a 100644 --- a/internal/app/evaluator/evaluator_service.go +++ b/internal/app/evaluator/evaluator_service.go @@ -20,6 +20,7 @@ import ( "io" "github.com/sirupsen/logrus" + "go.opencensus.io/stats" "golang.org/x/sync/errgroup" "open-match.dev/open-match/pkg/pb" ) @@ -77,12 +78,16 @@ func (s *evaluatorService) Evaluate(stream pb.Evaluator_EvaluateServer) error { for range out { } }() + + count := 0 for id := range out { err := stream.Send(&pb.EvaluateResponse{MatchId: id}) if err != nil { return err } + count++ } + stats.Record(ctx, matchesPerEvaluateResponse.M(int64(count))) return nil }) diff --git a/internal/app/frontend/frontend.go b/internal/app/frontend/frontend.go index 64f79f02..a1159486 100644 --- a/internal/app/frontend/frontend.go +++ b/internal/app/frontend/frontend.go @@ -15,12 +15,33 @@ package frontend import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" "google.golang.org/grpc" "open-match.dev/open-match/internal/appmain" "open-match.dev/open-match/internal/statestore" + "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) +var ( + totalBytesPerTicket = stats.Int64("open-match.dev/frontend/total_bytes_per_ticket", "Total bytes per ticket", stats.UnitBytes) + searchFieldsPerTicket = stats.Int64("open-match.dev/frontend/searchfields_per_ticket", "Searchfields per ticket", stats.UnitDimensionless) + + totalBytesPerTicketView = &view.View{ + Measure: totalBytesPerTicket, + Name: "open-match.dev/frontend/total_bytes_per_ticket", + Description: "Total bytes per ticket", + Aggregation: telemetry.DefaultBytesDistribution, + } + searchFieldsPerTicketView = &view.View{ + Measure: searchFieldsPerTicket, + Name: "open-match.dev/frontend/searchfields_per_ticket", + Description: "SearchFields per ticket", + Aggregation: telemetry.DefaultCountDistribution, + } +) + // BindService creates the frontend service and binds it to the serving harness. func BindService(p *appmain.Params, b *appmain.Bindings) error { service := &frontendService{ @@ -32,6 +53,9 @@ func BindService(p *appmain.Params, b *appmain.Bindings) error { b.AddHandleFunc(func(s *grpc.Server) { pb.RegisterFrontendServiceServer(s, service) }, pb.RegisterFrontendServiceHandlerFromEndpoint) - + b.RegisterViews( + totalBytesPerTicketView, + searchFieldsPerTicketView, + ) return nil } diff --git a/internal/app/frontend/frontend_service.go b/internal/app/frontend/frontend_service.go index dd8d65a8..fc961c4d 100644 --- a/internal/app/frontend/frontend_service.go +++ b/internal/app/frontend/frontend_service.go @@ -22,12 +22,12 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/rs/xid" "github.com/sirupsen/logrus" + "go.opencensus.io/stats" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "open-match.dev/open-match/internal/config" "open-match.dev/open-match/internal/statestore" - "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) @@ -43,10 +43,6 @@ var ( "app": "openmatch", "component": "app.frontend", }) - mTicketsCreated = telemetry.Counter("frontend/tickets_created", "tickets created") - mTicketsDeleted = telemetry.Counter("frontend/tickets_deleted", "tickets deleted") - mTicketsRetrieved = telemetry.Counter("frontend/tickets_retrieved", "tickets retrieved") - mTicketAssignmentsRetrieved = telemetry.Counter("frontend/tickets_assignments_retrieved", "ticket assignments retrieved") ) // CreateTicket assigns an unique TicketId to the input Ticket and record it in state storage. @@ -77,6 +73,14 @@ func doCreateTicket(ctx context.Context, req *pb.CreateTicketRequest, store stat ticket.Id = xid.New().String() ticket.CreateTime = ptypes.TimestampNow() + + sfCount := 0 + sfCount += len(ticket.GetSearchFields().GetDoubleArgs()) + sfCount += len(ticket.GetSearchFields().GetStringArgs()) + sfCount += len(ticket.GetSearchFields().GetTags()) + stats.Record(ctx, searchFieldsPerTicket.M(int64(sfCount))) + stats.Record(ctx, totalBytesPerTicket.M(int64(proto.Size(ticket)))) + err := store.CreateTicket(ctx, ticket) if err != nil { logger.WithFields(logrus.Fields{ @@ -95,7 +99,6 @@ func doCreateTicket(ctx context.Context, req *pb.CreateTicketRequest, store stat return nil, err } - telemetry.RecordUnitMeasurement(ctx, mTicketsCreated) return ticket, nil } @@ -108,7 +111,6 @@ func (s *frontendService) DeleteTicket(ctx context.Context, req *pb.DeleteTicket if err != nil { return nil, err } - telemetry.RecordUnitMeasurement(ctx, mTicketsDeleted) return &empty.Empty{}, nil } @@ -150,7 +152,6 @@ func doDeleteTicket(ctx context.Context, id string, store statestore.Service) er // GetTicket get the Ticket associated with the specified TicketId. func (s *frontendService) GetTicket(ctx context.Context, req *pb.GetTicketRequest) (*pb.Ticket, error) { - telemetry.RecordUnitMeasurement(ctx, mTicketsRetrieved) return doGetTickets(ctx, req.GetTicketId(), s.store) } @@ -177,7 +178,6 @@ func (s *frontendService) WatchAssignments(req *pb.WatchAssignmentsRequest, stre return ctx.Err() default: sender := func(assignment *pb.Assignment) error { - telemetry.RecordUnitMeasurement(ctx, mTicketAssignmentsRetrieved) return stream.Send(&pb.WatchAssignmentsResponse{Assignment: assignment}) } return doWatchAssignments(ctx, req.GetTicketId(), sender, s.store) diff --git a/internal/app/query/query.go b/internal/app/query/query.go index 3b887d1b..41b96fce 100644 --- a/internal/app/query/query.go +++ b/internal/app/query/query.go @@ -15,11 +15,59 @@ package query import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" "google.golang.org/grpc" "open-match.dev/open-match/internal/appmain" + "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) +var ( + ticketsPerQuery = stats.Int64("open-match.dev/query/tickets_per_query", "Number of tickets per query", stats.UnitDimensionless) + cacheTotalItems = stats.Int64("open-match.dev/query/total_cache_items", "Total number of tickets query service cached", stats.UnitDimensionless) + cacheFetchedItems = stats.Int64("open-match.dev/query/fetched_items", "Number of fetched items in total", stats.UnitDimensionless) + cacheWaitingQueries = stats.Int64("open-match.dev/query/waiting_queries", "Number of waiting queries in the last update", stats.UnitDimensionless) + cacheUpdateLatency = stats.Float64("open-match.dev/query/update_latency", "Time elapsed of each query cache update", stats.UnitMilliseconds) + + ticketsPerQueryView = &view.View{ + Measure: ticketsPerQuery, + Name: "open-match.dev/query/tickets_per_query", + Description: "Tickets per query", + Aggregation: telemetry.DefaultCountDistribution, + } + cacheTotalItemsView = &view.View{ + Measure: cacheTotalItems, + Name: "open-match.dev/query/total_cached_items", + Description: "Total number of cached tickets", + Aggregation: view.LastValue(), + } + cacheFetchedItemsView = &view.View{ + Measure: cacheFetchedItems, + Name: "open-match.dev/query/total_fetched_items", + Description: "Total number of fetched tickets", + Aggregation: view.Sum(), + } + cacheUpdateView = &view.View{ + Measure: cacheWaitingQueries, + Name: "open-match.dev/query/cache_updates", + Description: "Number of query cache updates in total", + Aggregation: view.Count(), + } + cacheWaitingQueriesView = &view.View{ + Measure: cacheWaitingQueries, + Name: "open-match.dev/query/waiting_requests", + Description: "Number of waiting requests in total", + Aggregation: telemetry.DefaultCountDistribution, + } + cacheUpdateLatencyView = &view.View{ + Measure: cacheUpdateLatency, + Name: "open-match.dev/query/update_latency", + Description: "Time elapsed of each query cache update", + Aggregation: telemetry.DefaultMillisecondsDistribution, + } +) + // BindService creates the query service and binds it to the serving harness. func BindService(p *appmain.Params, b *appmain.Bindings) error { service := &queryService{ @@ -30,6 +78,13 @@ func BindService(p *appmain.Params, b *appmain.Bindings) error { b.AddHandleFunc(func(s *grpc.Server) { pb.RegisterQueryServiceServer(s, service) }, pb.RegisterQueryServiceHandlerFromEndpoint) - + b.RegisterViews( + ticketsPerQueryView, + cacheTotalItemsView, + cacheUpdateView, + cacheFetchedItemsView, + cacheWaitingQueriesView, + cacheUpdateLatencyView, + ) return nil } diff --git a/internal/app/query/query_service.go b/internal/app/query/query_service.go index 9701ba99..e0cc413d 100644 --- a/internal/app/query/query_service.go +++ b/internal/app/query/query_service.go @@ -17,6 +17,9 @@ package query import ( "context" "sync" + "time" + + "go.opencensus.io/stats" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -44,6 +47,7 @@ type queryService struct { } func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer pb.QueryService_QueryTicketsServer) error { + ctx := responseServer.Context() pool := req.GetPool() if pool == nil { return status.Error(codes.InvalidArgument, ".pool is required") @@ -55,7 +59,7 @@ func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer } var results []*pb.Ticket - err = s.tc.request(responseServer.Context(), func(tickets map[string]*pb.Ticket) { + err = s.tc.request(ctx, func(tickets map[string]*pb.Ticket) { for _, ticket := range tickets { if pf.In(ticket) { results = append(results, ticket) @@ -66,6 +70,7 @@ func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer logger.WithError(err).Error("Failed to run request.") return err } + stats.Record(ctx, ticketsPerQuery.M(int64(len(results)))) pSize := getPageSize(s.cfg) for start := 0; start < len(results); start += pSize { @@ -86,6 +91,7 @@ func (s *queryService) QueryTickets(req *pb.QueryTicketsRequest, responseServer } func (s *queryService) QueryTicketIds(req *pb.QueryTicketIdsRequest, responseServer pb.QueryService_QueryTicketIdsServer) error { + ctx := responseServer.Context() pool := req.GetPool() if pool == nil { return status.Error(codes.InvalidArgument, ".pool is required") @@ -97,7 +103,7 @@ func (s *queryService) QueryTicketIds(req *pb.QueryTicketIdsRequest, responseSer } var results []string - err = s.tc.request(responseServer.Context(), func(tickets map[string]*pb.Ticket) { + err = s.tc.request(ctx, func(tickets map[string]*pb.Ticket) { for id, ticket := range tickets { if pf.In(ticket) { results = append(results, id) @@ -108,6 +114,7 @@ func (s *queryService) QueryTicketIds(req *pb.QueryTicketIdsRequest, responseSer logger.WithError(err).Error("Failed to run request.") return err } + stats.Record(ctx, ticketsPerQuery.M(int64(len(results)))) pSize := getPageSize(s.cfg) for start := 0; start < len(results); start += pSize { @@ -254,6 +261,7 @@ collectAllWaiting: } tc.update() + stats.Record(context.Background(), cacheWaitingQueries.M(int64(len(reqs)))) // Send WaitGroup to query calls, letting them run their query on the ticket // cache. @@ -271,6 +279,7 @@ collectAllWaiting: } func (tc *ticketCache) update() { + st := time.Now() previousCount := len(tc.tickets) currentAll, err := tc.store.GetIndexedIDSet(context.Background()) @@ -305,6 +314,10 @@ func (tc *ticketCache) update() { tc.tickets[t.Id] = t } + stats.Record(context.Background(), cacheTotalItems.M(int64(previousCount))) + stats.Record(context.Background(), cacheFetchedItems.M(int64(len(toFetch)))) + stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(st))/float64(time.Millisecond))) + logger.Debugf("Ticket Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(tc.tickets)) tc.err = nil } diff --git a/internal/app/synchronizer/synchronizer.go b/internal/app/synchronizer/synchronizer.go index 97f882d8..1793e92e 100644 --- a/internal/app/synchronizer/synchronizer.go +++ b/internal/app/synchronizer/synchronizer.go @@ -15,10 +15,38 @@ package synchronizer import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" "google.golang.org/grpc" "open-match.dev/open-match/internal/appmain" "open-match.dev/open-match/internal/ipb" "open-match.dev/open-match/internal/statestore" + "open-match.dev/open-match/internal/telemetry" +) + +var ( + iterationLatency = stats.Float64("open-match.dev/synchronizer/iteration_latency", "Time elapsed of each synchronizer iteration", stats.UnitMilliseconds) + registrationWaitTime = stats.Float64("open-match.dev/synchronizer/registration_wait_time", "Time elapsed of registration wait time", stats.UnitMilliseconds) + registrationMMFDoneTime = stats.Float64("open-match.dev/synchronizer/registration_mmf_done_time", "Time elapsed wasted in registration window with done MMFs", stats.UnitMilliseconds) + + iterationLatencyView = &view.View{ + Measure: iterationLatency, + Name: "open-match.dev/synchronizer/iteration_latency", + Description: "Time elapsed of each synchronizer iteration", + Aggregation: telemetry.DefaultMillisecondsDistribution, + } + registrationWaitTimeView = &view.View{ + Measure: registrationWaitTime, + Name: "open-match.dev/synchronizer/registration_wait_time", + Description: "Time elapsed of registration wait time", + Aggregation: telemetry.DefaultMillisecondsDistribution, + } + registrationMMFDoneTimeView = &view.View{ + Measure: registrationMMFDoneTime, + Name: "open-match.dev/synchronizer/registration_mmf_done_time", + Description: "Time elapsed wasted in registration window with done MMFs", + Aggregation: telemetry.DefaultMillisecondsDistribution, + } ) // BindService creates the synchronizer service and binds it to the serving harness. @@ -29,6 +57,10 @@ func BindService(p *appmain.Params, b *appmain.Bindings) error { b.AddHandleFunc(func(s *grpc.Server) { ipb.RegisterSynchronizerServer(s, service) }, nil) - + b.RegisterViews( + iterationLatencyView, + registrationWaitTimeView, + registrationMMFDoneTimeView, + ) return nil } diff --git a/internal/app/synchronizer/synchronizer_service.go b/internal/app/synchronizer/synchronizer_service.go index 28f54951..9ac2491f 100644 --- a/internal/app/synchronizer/synchronizer_service.go +++ b/internal/app/synchronizer/synchronizer_service.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "go.opencensus.io/stats" + "github.com/sirupsen/logrus" "open-match.dev/open-match/internal/config" "open-match.dev/open-match/internal/ipb" @@ -185,6 +187,9 @@ func (s synchronizerService) register(ctx context.Context) *registration { resp: make(chan *registration), ctx: ctx, } + + st := time.Now() + defer stats.Record(ctx, registrationWaitTime.M(float64(time.Since(st))/float64(time.Millisecond))) for { select { case s.synchronizeRegistration <- req: @@ -202,6 +207,7 @@ func (s synchronizerService) register(ctx context.Context) *registration { /////////////////////////////////////// func (s *synchronizerService) runCycle() { + cst := time.Now() /////////////////////////////////////// Initialize cycle ctx, cancel := withCancelCause(context.Background()) @@ -240,6 +246,7 @@ func (s *synchronizerService) runCycle() { }() /////////////////////////////////////// Run Registration Period + rst := time.Now() closeRegistration := time.After(s.registrationInterval()) Registration: for { @@ -272,6 +279,7 @@ Registration: go func() { allM1cSent.Wait() m1c.cutoff() + stats.Record(ctx, registrationMMFDoneTime.M(float64((s.registrationInterval()-time.Since(rst))/time.Millisecond))) }() cancelProposalCollection := time.AfterFunc(s.proposalCollectionInterval(), func() { @@ -281,6 +289,7 @@ Registration: } }) <-closedOnCycleEnd + stats.Record(ctx, iterationLatency.M(float64(time.Since(cst)/time.Millisecond))) // Clean up in case it was never needed. cancelProposalCollection.Stop() diff --git a/internal/appmain/appmain.go b/internal/appmain/appmain.go index 89e363ce..fb00f963 100644 --- a/internal/appmain/appmain.go +++ b/internal/appmain/appmain.go @@ -23,6 +23,8 @@ import ( "os/signal" "syscall" + "go.opencensus.io/stats/view" + "github.com/sirupsen/logrus" "open-match.dev/open-match/internal/config" "open-match.dev/open-match/internal/logging" @@ -83,8 +85,9 @@ func (p *Params) ServiceName() string { // Bindings allows applications to bind various functions to the running servers. type Bindings struct { - sp *rpc.ServerParams - a *App + sp *rpc.ServerParams + a *App + firstErr error } // AddHealthCheckFunc allows an application to check if it is healthy, and @@ -93,6 +96,20 @@ func (b *Bindings) AddHealthCheckFunc(f func(context.Context) error) { b.sp.AddHealthCheckFunc(f) } +// RegisterViews begins collecting data for the given views. +func (b *Bindings) RegisterViews(v ...*view.View) { + if err := view.Register(v...); err != nil { + if b.firstErr == nil { + b.firstErr = err + } + return + } + + b.AddCloser(func() { + view.Unregister(v...) + }) +} + // AddHandleFunc adds a protobuf service to the grpc server which is starting. func (b *Bindings) AddHandleFunc(handlerFunc rpc.GrpcHandler, grpcProxyHandler rpc.GrpcProxyHandler) { b.sp.AddHandleFunc(handlerFunc, grpcProxyHandler) @@ -169,6 +186,11 @@ func NewApplication(serviceName string, bindService Bind, getCfg func() (config. _ = surpressedErr return nil, err } + if b.firstErr != nil { + surpressedErr := a.Stop() // Don't care about additional errors stopping. + _ = surpressedErr + return nil, b.firstErr + } s := &rpc.Server{} err = s.Start(sp) diff --git a/internal/statestore/instrumented.go b/internal/statestore/instrumented.go index 13c73b04..96c7447c 100644 --- a/internal/statestore/instrumented.go +++ b/internal/statestore/instrumented.go @@ -18,24 +18,9 @@ import ( "context" "go.opencensus.io/trace" - "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) -var ( - mStateStoreCreateTicketCount = telemetry.Counter("statestore/createticketcount", "number of tickets created") - mStateStoreGetTicketCount = telemetry.Counter("statestore/getticketcount", "number of tickets retrieved") - mStateStoreDeleteTicketCount = telemetry.Counter("statestore/deleteticketcount", "number of tickets deleted") - mStateStoreIndexTicketCount = telemetry.Counter("statestore/indexticketcount", "number of tickets indexed") - mStateStoreDeindexTicketCount = telemetry.Counter("statestore/deindexticketcount", "number of tickets deindexed") - mStateStoreGetTicketsCount = telemetry.Counter("statestore/getticketscount", "number of bulk ticket retrievals") - mStateStoreGetIndexedIDSetCount = telemetry.Counter("statestore/getindexedidsetcount", "number of bulk indexed id retrievals") - mStateStoreUpdateAssignmentsCount = telemetry.Counter("statestore/updateassignmentcount", "number of tickets assigned") - mStateStoreGetAssignmentsCount = telemetry.Counter("statestore/getassignmentscount", "number of ticket assigned retrieved") - mStateStoreAddTicketsToIgnoreListCount = telemetry.Counter("statestore/addticketstoignorelistcount", "number of tickets moved to ignore list") - mStateStoreDeleteTicketFromIgnoreListCount = telemetry.Counter("statestore/deleteticketfromignorelistcount", "number of tickets removed from ignore list") -) - // instrumentedService is a wrapper for a statestore service that provides instrumentation (metrics and tracing) of the database. type instrumentedService struct { s Service @@ -56,7 +41,6 @@ func (is *instrumentedService) HealthCheck(ctx context.Context) error { func (is *instrumentedService) CreateTicket(ctx context.Context, ticket *pb.Ticket) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.CreateTicket") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreCreateTicketCount) return is.s.CreateTicket(ctx, ticket) } @@ -64,7 +48,6 @@ func (is *instrumentedService) CreateTicket(ctx context.Context, ticket *pb.Tick func (is *instrumentedService) GetTicket(ctx context.Context, id string) (*pb.Ticket, error) { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetTicket") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreGetTicketCount) return is.s.GetTicket(ctx, id) } @@ -72,7 +55,6 @@ func (is *instrumentedService) GetTicket(ctx context.Context, id string) (*pb.Ti func (is *instrumentedService) DeleteTicket(ctx context.Context, id string) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.DeleteTicket") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreDeleteTicketCount) return is.s.DeleteTicket(ctx, id) } @@ -80,7 +62,6 @@ func (is *instrumentedService) DeleteTicket(ctx context.Context, id string) erro func (is *instrumentedService) IndexTicket(ctx context.Context, ticket *pb.Ticket) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.IndexTicket") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreIndexTicketCount) return is.s.IndexTicket(ctx, ticket) } @@ -88,7 +69,6 @@ func (is *instrumentedService) IndexTicket(ctx context.Context, ticket *pb.Ticke func (is *instrumentedService) DeindexTicket(ctx context.Context, id string) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.DeindexTicket") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreDeindexTicketCount) return is.s.DeindexTicket(ctx, id) } @@ -97,7 +77,6 @@ func (is *instrumentedService) DeindexTicket(ctx context.Context, id string) err func (is *instrumentedService) GetTickets(ctx context.Context, ids []string) ([]*pb.Ticket, error) { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetTickets") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreGetTicketsCount) return is.s.GetTickets(ctx, ids) } @@ -105,7 +84,6 @@ func (is *instrumentedService) GetTickets(ctx context.Context, ids []string) ([] func (is *instrumentedService) GetIndexedIDSet(ctx context.Context) (map[string]struct{}, error) { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetIndexedIDSet") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreGetIndexedIDSetCount) return is.s.GetIndexedIDSet(ctx) } @@ -113,7 +91,6 @@ func (is *instrumentedService) GetIndexedIDSet(ctx context.Context) (map[string] func (is *instrumentedService) UpdateAssignments(ctx context.Context, req *pb.AssignTicketsRequest) (*pb.AssignTicketsResponse, error) { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.UpdateAssignments") defer span.End() - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreUpdateAssignmentsCount) return is.s.UpdateAssignments(ctx, req) } @@ -121,17 +98,13 @@ func (is *instrumentedService) UpdateAssignments(ctx context.Context, req *pb.As func (is *instrumentedService) GetAssignments(ctx context.Context, id string, callback func(*pb.Assignment) error) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.GetAssignments") defer span.End() - return is.s.GetAssignments(ctx, id, func(a *pb.Assignment) error { - defer telemetry.RecordUnitMeasurement(ctx, mStateStoreGetAssignmentsCount) - return callback(a) - }) + return is.s.GetAssignments(ctx, id, callback) } // AddTicketsToIgnoreList appends new proposed tickets to the proposed sorted set with current timestamp func (is *instrumentedService) AddTicketsToIgnoreList(ctx context.Context, ids []string) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.AddTicketsToIgnoreList") defer span.End() - defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreAddTicketsToIgnoreListCount, int64(len(ids))) return is.s.AddTicketsToIgnoreList(ctx, ids) } @@ -139,6 +112,5 @@ func (is *instrumentedService) AddTicketsToIgnoreList(ctx context.Context, ids [ func (is *instrumentedService) DeleteTicketsFromIgnoreList(ctx context.Context, ids []string) error { ctx, span := trace.StartSpan(ctx, "statestore/instrumented.DeleteTicketsFromIgnoreList") defer span.End() - defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreDeleteTicketFromIgnoreListCount, int64(len(ids))) return is.s.DeleteTicketsFromIgnoreList(ctx, ids) } diff --git a/internal/statestore/redis.go b/internal/statestore/redis.go index d44fe6b3..daea970f 100644 --- a/internal/statestore/redis.go +++ b/internal/statestore/redis.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "open-match.dev/open-match/internal/config" - "open-match.dev/open-match/internal/telemetry" "open-match.dev/open-match/pkg/pb" ) @@ -38,9 +37,6 @@ var ( "app": "openmatch", "component": "statestore.redis", }) - mRedisConnLatencyMs = telemetry.HistogramWithBounds("redis/connectlatency", "latency to get a redis connection", "ms", telemetry.HistogramBounds) - mRedisConnPoolActive = telemetry.Gauge("redis/connectactivecount", "number of connections in the pool, includes idle plus connections in use") - mRedisConnPoolIdle = telemetry.Gauge("redis/connectidlecount", "number of idle connections in the pool") ) type redisBackend struct { @@ -177,16 +173,11 @@ func (rb *redisBackend) HealthCheck(ctx context.Context) error { } defer handleConnectionClose(&redisConn) - poolStats := rb.redisPool.Stats() - telemetry.SetGauge(ctx, mRedisConnPoolActive, int64(poolStats.ActiveCount)) - telemetry.SetGauge(ctx, mRedisConnPoolIdle, int64(poolStats.IdleCount)) - _, err = redisConn.Do("PING") // Encountered an issue getting a connection from the pool. if err != nil { return status.Errorf(codes.Unavailable, "%v", err) } - return nil } @@ -229,7 +220,6 @@ func redisURLFromAddr(addr string, cfg config.View, usePassword bool) string { } func (rb *redisBackend) connect(ctx context.Context) (redis.Conn, error) { - startTime := time.Now() redisConn, err := rb.redisPool.GetContext(ctx) if err != nil { redisLogger.WithFields(logrus.Fields{ @@ -237,8 +227,6 @@ func (rb *redisBackend) connect(ctx context.Context) (redis.Conn, error) { }).Error("failed to connect to redis") return nil, status.Errorf(codes.Unavailable, "%v", err) } - telemetry.RecordNUnitMeasurement(ctx, mRedisConnLatencyMs, time.Since(startTime).Milliseconds()) - return redisConn, nil } diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 94233a32..3a27348b 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -22,9 +22,11 @@ import ( "go.opencensus.io/tag" ) +// Default histogram distributions var ( - // HistogramBounds defines a unified bucket boundaries for all histogram typed time metrics in Open Match - HistogramBounds = []float64{0, 50, 100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200} + DefaultBytesDistribution = view.Distribution(64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) + DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) + DefaultCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536) ) // Gauge creates a gauge metric to be recorded with dimensionless unit. diff --git a/internal/telemetry/metrics_test.go b/internal/telemetry/metrics_test.go deleted file mode 100644 index 5f5cd948..00000000 --- a/internal/telemetry/metrics_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2019 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package telemetry - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.opencensus.io/stats" - utilTesting "open-match.dev/open-match/internal/util/testing" -) - -func TestRecordUnitMeasurement(t *testing.T) { - ctx := utilTesting.NewContext(t) - c := Counter("telemetry/fake_metric", "fake") - RecordUnitMeasurement(ctx, c) - RecordUnitMeasurement(ctx, c) -} - -func TestDoubleMetric(t *testing.T) { - assert := assert.New(t) - c := Counter("telemetry/fake_metric", "fake") - c2 := Counter("telemetry/fake_metric", "fake") - assert.Equal(c, c2) -} - -func TestDoubleRegisterView(t *testing.T) { - assert := assert.New(t) - mFakeCounter := stats.Int64("telemetry/fake_metric", "Fake", "1") - v := counterView(mFakeCounter) - v2 := counterView(mFakeCounter) - assert.Equal(v, v2) -} diff --git a/internal/telemetry/probe.go b/internal/telemetry/probe.go index 01700f12..dd37e948 100644 --- a/internal/telemetry/probe.go +++ b/internal/telemetry/probe.go @@ -19,8 +19,6 @@ import ( "fmt" "net/http" "sync/atomic" - - "go.opencensus.io/tag" ) const ( @@ -32,11 +30,6 @@ const ( healthStateUnhealthy = int32(2) ) -var ( - successKey = tag.MustNewKey("success") - mReadinessProbes = Counter("health/readiness", "readiness probes", successKey) -) - type statefulProbe struct { healthState *int32 probes []func(context.Context) error @@ -58,11 +51,10 @@ func (sp *statefulProbe) ServeHTTP(w http.ResponseWriter, req *http.Request) { logger.WithError(err).Warningf("%s health check failed. The server will terminate if this continues to happen.", HealthCheckEndpoint) } http.Error(w, err.Error(), http.StatusServiceUnavailable) - RecordUnitMeasurement(req.Context(), mReadinessProbes, tag.Insert(successKey, "false")) return } } - RecordUnitMeasurement(req.Context(), mReadinessProbes, tag.Insert(successKey, "true")) + old := atomic.SwapInt32(sp.healthState, healthStateHealthy) if old == healthStateUnhealthy { logger.Infof("%s is healthy again.", HealthCheckEndpoint) diff --git a/third_party/swaggerui/config.json b/third_party/swaggerui/config.json index a3a6db48..6c70b3cf 100644 --- a/third_party/swaggerui/config.json +++ b/third_party/swaggerui/config.json @@ -7,4 +7,4 @@ {"name": "Synchronizer", "url": "https://open-match.dev/api/v0.0.0-dev/synchronizer.swagger.json"}, {"name": "Evaluator", "url": "https://open-match.dev/api/v0.0.0-dev/evaluator.swagger.json"} ] -} \ No newline at end of file +}