|
|
|
@ -16,22 +16,35 @@ package statestore
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"open-match.dev/open-match/internal/telemetry"
|
|
|
|
|
"open-match.dev/open-match/pkg/pb"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
mStateStoreCreateTicket = telemetry.Counter("statestore/createticket", "tickets created")
|
|
|
|
|
mStateStoreGetTicket = telemetry.Counter("statestore/getticket", "tickets retrieve")
|
|
|
|
|
mStateStoreDeleteTicket = telemetry.Counter("statestore/deleteticket", "tickets deleted")
|
|
|
|
|
mStateStoreIndexTicket = telemetry.Counter("statestore/indexticket", "tickets indexed")
|
|
|
|
|
mStateStoreDeindexTicket = telemetry.Counter("statestore/deindexticket", "tickets deindexed")
|
|
|
|
|
mStateStoreFilterTickets = telemetry.Counter("statestore/filterticket", "tickets that were filtered and returned")
|
|
|
|
|
mStateStoreUpdateAssignments = telemetry.Counter("statestore/updateassignment", "tickets assigned")
|
|
|
|
|
mStateStoreGetAssignments = telemetry.Counter("statestore/getassignments", "ticket assigned retrieved")
|
|
|
|
|
mStateStoreAddTicketsToIgnoreList = telemetry.Counter("statestore/addticketstoignorelist", "tickets moved to ignore list")
|
|
|
|
|
mStateStoreDeleteTicketFromIgnoreList = telemetry.Counter("statestore/deleteticketfromignorelist", "tickets removed from ignore list")
|
|
|
|
|
mStateStoreCreateTicketCount = telemetry.Counter("statestore/createticketcount", "number of tickets created")
|
|
|
|
|
mStateStoreGetTicketCount = telemetry.Counter("statestore/getticketcount", "number of tickets retrieved")
|
|
|
|
|
mStateStoreDeleteTicketCount = telemetry.Counter("statestore/deleteticketcount", "number of tickets deleted")
|
|
|
|
|
mStateStoreIndexTicketCount = telemetry.Counter("statestore/indexticketcount", "number of tickets indexed")
|
|
|
|
|
mStateStoreDeindexTicketCount = telemetry.Counter("statestore/deindexticketcount", "number of tickets deindexed")
|
|
|
|
|
mStateStoreFilterTicketsCount = telemetry.Counter("statestore/filterticketcount", "number of tickets that were filtered and returned")
|
|
|
|
|
mStateStoreUpdateAssignmentsCount = telemetry.Counter("statestore/updateassignmentcount", "number of tickets assigned")
|
|
|
|
|
mStateStoreGetAssignmentsCount = telemetry.Counter("statestore/getassignmentscount", "number of ticket assigned retrieved")
|
|
|
|
|
mStateStoreAddTicketsToIgnoreListCount = telemetry.Counter("statestore/addticketstoignorelistcount", "number of tickets moved to ignore list")
|
|
|
|
|
mStateStoreDeleteTicketFromIgnoreListCount = telemetry.Counter("statestore/deleteticketfromignorelistcount", "number of tickets removed from ignore list")
|
|
|
|
|
|
|
|
|
|
histogramBounds = []float64{0, 50, 100, 200, 400, 800, 1600, 3200, 6400}
|
|
|
|
|
mStateStoreCreateTicketTime = telemetry.HistogramWithBounds("statestore/createtickettime", "time elapsed to create tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreGetTicketTime = telemetry.HistogramWithBounds("statestore/gettickettime", "time elapsed to retrieve tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreDeleteTicketTime = telemetry.HistogramWithBounds("statestore/deletetickettime", "time elapsed to delete tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreIndexTicketTime = telemetry.HistogramWithBounds("statestore/indextickettime", "time elapsed to index tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreDeindexTicketTime = telemetry.HistogramWithBounds("statestore/deindextickettime", "time elapsed to deindex tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreFilterTicketsTime = telemetry.HistogramWithBounds("statestore/filtertickettime", "time elapsed to filter tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreUpdateAssignmentsTime = telemetry.HistogramWithBounds("statestore/updateassignmenttime", "time elapsed to assign tickets", "ms", histogramBounds)
|
|
|
|
|
mStateStoreGetAssignmentsTime = telemetry.HistogramWithBounds("statestore/getassignmentstime", "time elapsed to retrieve assignments", "ms", histogramBounds)
|
|
|
|
|
mStateStoreAddTicketsToIgnoreListTime = telemetry.HistogramWithBounds("statestore/addticketstoignorelisttime", "time elapsed to move tickets to ignore list", "ms", histogramBounds)
|
|
|
|
|
mStateStoreDeleteTicketFromIgnoreListTime = telemetry.HistogramWithBounds("statestore/deleteticketfromignorelisttime", "time elapsed to remove tickets from ignore list", "ms", histogramBounds)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// instrumentedService is a wrapper for a statestore service that provides instrumentation (metrics and tracing) of the database.
|
|
|
|
@ -52,31 +65,41 @@ func (is *instrumentedService) HealthCheck(ctx context.Context) error {
|
|
|
|
|
|
|
|
|
|
// CreateTicket creates a new Ticket in the state storage. If the id already exists, it will be overwritten.
|
|
|
|
|
func (is *instrumentedService) CreateTicket(ctx context.Context, ticket *pb.Ticket) error {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreCreateTicket)
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreCreateTicketCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreCreateTicketTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.CreateTicket(ctx, ticket)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetTicket gets the Ticket with the specified id from state storage. This method fails if the Ticket does not exist.
|
|
|
|
|
func (is *instrumentedService) GetTicket(ctx context.Context, id string) (*pb.Ticket, error) {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreGetTicket)
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreGetTicketCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreGetTicketTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.GetTicket(ctx, id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DeleteTicket removes the Ticket with the specified id from state storage.
|
|
|
|
|
func (is *instrumentedService) DeleteTicket(ctx context.Context, id string) error {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreDeleteTicket)
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreDeleteTicketCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreDeleteTicketTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.DeleteTicket(ctx, id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// IndexTicket indexes the Ticket id for the configured index fields.
|
|
|
|
|
func (is *instrumentedService) IndexTicket(ctx context.Context, ticket *pb.Ticket) error {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreIndexTicket)
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreIndexTicketCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreIndexTicketTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.IndexTicket(ctx, ticket)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DeindexTicket removes the indexing for the specified Ticket. Only the indexes are removed but the Ticket continues to exist.
|
|
|
|
|
func (is *instrumentedService) DeindexTicket(ctx context.Context, id string) error {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreDeindexTicket)
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreDeindexTicketCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreDeindexTicketTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.DeindexTicket(ctx, id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -87,8 +110,10 @@ func (is *instrumentedService) DeindexTicket(ctx context.Context, id string) err
|
|
|
|
|
// "testplayer2": {"ranking" : 50, "loyalty_level": 3},
|
|
|
|
|
// }
|
|
|
|
|
func (is *instrumentedService) FilterTickets(ctx context.Context, pool *pb.Pool, pageSize int, callback func([]*pb.Ticket) error) error {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
return is.s.FilterTickets(ctx, pool, pageSize, func(t []*pb.Ticket) error {
|
|
|
|
|
telemetry.RecordNUnitMeasurement(ctx, mStateStoreFilterTickets, int64(len(t)))
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreFilterTicketsCount, int64(len(t)))
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreFilterTicketsTime, time.Since(start).Milliseconds())
|
|
|
|
|
return callback(t)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -98,26 +123,34 @@ func (is *instrumentedService) FilterTickets(ctx context.Context, pool *pb.Pool,
|
|
|
|
|
// However, since Redis does not support transaction roll backs (see https://redis.io/topics/transactions), some of the
|
|
|
|
|
// assignment fields might be partially updated if this function encounters an error halfway through the execution.
|
|
|
|
|
func (is *instrumentedService) UpdateAssignments(ctx context.Context, ids []string, assignment *pb.Assignment) error {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreUpdateAssignments)
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreUpdateAssignmentsCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreUpdateAssignmentsTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.UpdateAssignments(ctx, ids, assignment)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetAssignments returns the assignment associated with the input ticket id
|
|
|
|
|
func (is *instrumentedService) GetAssignments(ctx context.Context, id string, callback func(*pb.Assignment) error) error {
|
|
|
|
|
start := time.Now()
|
|
|
|
|
return is.s.GetAssignments(ctx, id, func(a *pb.Assignment) error {
|
|
|
|
|
telemetry.RecordUnitMeasurement(ctx, mStateStoreGetAssignments)
|
|
|
|
|
defer telemetry.RecordUnitMeasurement(ctx, mStateStoreGetAssignmentsCount)
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreGetAssignmentsTime, time.Since(start).Milliseconds())
|
|
|
|
|
return callback(a)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// AddTicketsToIgnoreList appends new proposed tickets to the proposed sorted set with current timestamp
|
|
|
|
|
func (is *instrumentedService) AddTicketsToIgnoreList(ctx context.Context, ids []string) error {
|
|
|
|
|
telemetry.RecordNUnitMeasurement(ctx, mStateStoreAddTicketsToIgnoreList, int64(len(ids)))
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreAddTicketsToIgnoreListCount, int64(len(ids)))
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreAddTicketsToIgnoreListTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.AddTicketsToIgnoreList(ctx, ids)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DeleteTicketsFromIgnoreList deletes tickets from the proposed sorted set
|
|
|
|
|
func (is *instrumentedService) DeleteTicketsFromIgnoreList(ctx context.Context, ids []string) error {
|
|
|
|
|
telemetry.RecordNUnitMeasurement(ctx, mStateStoreDeleteTicketFromIgnoreList, int64(len(ids)))
|
|
|
|
|
start := time.Now()
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreDeleteTicketFromIgnoreListCount, int64(len(ids)))
|
|
|
|
|
defer telemetry.RecordNUnitMeasurement(ctx, mStateStoreDeleteTicketFromIgnoreListTime, time.Since(start).Milliseconds())
|
|
|
|
|
return is.s.DeleteTicketsFromIgnoreList(ctx, ids)
|
|
|
|
|
}
|
|
|
|
|