Rename Ignore list to Pending Release ()

Fix naming across all code. Swagger changes left.

Co-authored-by: Scott Redig <sredig@google.com>
This commit is contained in:
Alexander Apalikov
2020-07-08 23:56:30 +03:00
committed by GitHub
parent c53a5b7c88
commit 679cfb5839
8 changed files with 42 additions and 43 deletions

@ -308,7 +308,7 @@ func callHTTPMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProf
func (s *backendService) ReleaseTickets(ctx context.Context, req *pb.ReleaseTicketsRequest) (*pb.ReleaseTicketsResponse, error) {
err := doReleasetickets(ctx, req, s.store)
if err != nil {
logger.WithError(err).Error("failed to remove the awaiting tickets from the ignore list for requested tickets")
logger.WithError(err).Error("failed to remove the awaiting tickets from the pending release for requested tickets")
return nil, err
}
@ -363,7 +363,7 @@ func doAssignTickets(ctx context.Context, req *pb.AssignTicketsRequest, store st
}
}
if err = store.DeleteTicketsFromIgnoreList(ctx, ids); err != nil {
if err = store.DeleteTicketsFromPendingRelease(ctx, ids); err != nil {
logger.WithFields(logrus.Fields{
"ticket_ids": ids,
}).Error(err)
@ -373,11 +373,11 @@ func doAssignTickets(ctx context.Context, req *pb.AssignTicketsRequest, store st
}
func doReleasetickets(ctx context.Context, req *pb.ReleaseTicketsRequest, store statestore.Service) error {
err := store.DeleteTicketsFromIgnoreList(ctx, req.GetTicketIds())
err := store.DeleteTicketsFromPendingRelease(ctx, req.GetTicketIds())
if err != nil {
logger.WithFields(logrus.Fields{
"ticket_ids": req.GetTicketIds(),
}).WithError(err).Error("failed to delete the tickets from the ignore list")
}).WithError(err).Error("failed to delete the tickets from the pending release list")
return err
}

@ -137,12 +137,12 @@ func doDeleteTicket(ctx context.Context, id string, store statestore.Service) er
"id": id,
}).Error("failed to delete the ticket")
}
err = store.DeleteTicketsFromIgnoreList(ctx, []string{id})
err = store.DeleteTicketsFromPendingRelease(ctx, []string{id})
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"id": id,
}).Error("failed to delete the ticket from ignorelist")
}).Error("failed to delete the ticket from pendingRelease")
}
// TODO: If other redis queues are implemented or we have custom index fields
// created by Open Match, those need to be cleaned up here.

@ -43,7 +43,7 @@ var (
// Streams from multiple GRPC calls of matches are combined on a single channel.
// These matches are sent to the evaluator, then the tickets are added to the
// ignore list. Finally the matches are returned to the calling stream.
// pending release list. Finally the matches are returned to the calling stream.
// receive from backend | Synchronize
// -> m1c ->
@ -55,7 +55,7 @@ var (
// -> m4c -> (buffered)
// send to evaluator | wrapEvaluator
// -> m5c -> (buffered)
// add tickets to ignore list | addMatchesToIgnoreList
// add tickets to pending release | addMatchesToPendingRelease
// -> m6c ->
// fan out to origin synchronize call | fanInFanOut
// -> (Synchronize call specific ) m7c -> (buffered)
@ -240,8 +240,8 @@ func (s *synchronizerService) runCycle() {
go s.cacheMatchIDToTicketIDs(matchTickets, m3c, m4c)
go s.wrapEvaluator(ctx, cancel, bufferMatchChannel(m4c), m5c)
go func() {
s.addMatchesToIgnoreList(ctx, matchTickets, cancel, bufferStringChannel(m5c), m6c)
// Wait for ignore list, but not all matches returned, the next cycle
s.addMatchesToPendingRelease(ctx, matchTickets, cancel, bufferStringChannel(m5c), m6c)
// Wait for pending release, but not all matches returned, the next cycle
// can start now.
close(closedOnCycleEnd)
}()
@ -435,10 +435,10 @@ func getTicketIds(tickets []*pb.Ticket) []string {
///////////////////////////////////////
// Calls statestore to add all of the tickets returned by the evaluator to the
// ignorelist. If it partially fails for whatever reason (not all tickets will
// pendingRelease list. If it partially fails for whatever reason (not all tickets will
// nessisarily be in the same call), only the matches which can be safely
// returned to the Synchronize calls are.
func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, m *sync.Map, cancel contextcause.CancelErrFunc, m5c <-chan []string, m6c chan<- string) {
func (s *synchronizerService) addMatchesToPendingRelease(ctx context.Context, m *sync.Map, cancel contextcause.CancelErrFunc, m5c <-chan []string, m6c chan<- string) {
totalMatches := 0
successfulMatches := 0
var lastErr error
@ -453,7 +453,7 @@ func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, m *syn
}
}
err := s.store.AddTicketsToIgnoreList(ctx, ids)
err := s.store.AddTicketsToPendingRelease(ctx, ids)
totalMatches += len(mIDs)
if err == nil {
@ -472,10 +472,10 @@ func (s *synchronizerService) addMatchesToIgnoreList(ctx context.Context, m *syn
"error": lastErr.Error(),
"totalMatches": totalMatches,
"successfulMatches": successfulMatches,
}).Error("some or all matches were not successfully added to the ignore list, failed matches dropped")
}).Error("some or all matches were not successfully added to the pending release, failed matches dropped")
if successfulMatches == 0 {
cancel(fmt.Errorf("no matches successfully added to the ignore list. Last error: %w", lastErr))
cancel(fmt.Errorf("no matches successfully added to the pending release. Last error: %w", lastErr))
}
}
close(m6c)

@ -89,16 +89,16 @@ func (is *instrumentedService) GetAssignments(ctx context.Context, id string, ca
return is.s.GetAssignments(ctx, id, callback)
}
func (is *instrumentedService) AddTicketsToIgnoreList(ctx context.Context, ids []string) error {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.AddTicketsToIgnoreList")
func (is *instrumentedService) AddTicketsToPendingRelease(ctx context.Context, ids []string) error {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.AddTicketsToPendingRelease")
defer span.End()
return is.s.AddTicketsToIgnoreList(ctx, ids)
return is.s.AddTicketsToPendingRelease(ctx, ids)
}
func (is *instrumentedService) DeleteTicketsFromIgnoreList(ctx context.Context, ids []string) error {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.DeleteTicketsFromIgnoreList")
func (is *instrumentedService) DeleteTicketsFromPendingRelease(ctx context.Context, ids []string) error {
ctx, span := trace.StartSpan(ctx, "statestore/instrumented.DeleteTicketsFromPendingRelease")
defer span.End()
return is.s.DeleteTicketsFromIgnoreList(ctx, ids)
return is.s.DeleteTicketsFromPendingRelease(ctx, ids)
}
func (is *instrumentedService) ReleaseAllTickets(ctx context.Context) error {

@ -55,11 +55,10 @@ type Service interface {
// GetAssignments returns the assignment associated with the input ticket id
GetAssignments(ctx context.Context, id string, callback func(*pb.Assignment) error) error
// AddProposedTickets appends new proposed tickets to the proposed sorted set with current timestamp
AddTicketsToIgnoreList(ctx context.Context, ids []string) error
AddTicketsToPendingRelease(ctx context.Context, ids []string) error
// DeleteTicketsFromIgnoreList deletes tickets from the proposed sorted set
DeleteTicketsFromIgnoreList(ctx context.Context, ids []string) error
// DeleteTicketsFromPendingRelease deletes tickets from the proposed sorted set
DeleteTicketsFromPendingRelease(ctx context.Context, ids []string) error
// ReleaseAllTickets releases all pending tickets back to active
ReleaseAllTickets(ctx context.Context) error

@ -391,10 +391,10 @@ func (rb *redisBackend) GetIndexedIDSet(ctx context.Context) (map[string]struct{
startTimeInt := curTime.Add(-ttl).UnixNano()
// Filter out tickets that are fetched but not assigned within ttl time (ms).
idsInIgnoreLists, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", "proposed_ticket_ids", startTimeInt, endTimeInt))
idsInPendingReleases, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", "proposed_ticket_ids", startTimeInt, endTimeInt))
if err != nil {
redisLogger.WithError(err).Error("failed to get proposed tickets")
return nil, status.Errorf(codes.Internal, "error getting ignore list %v", err)
return nil, status.Errorf(codes.Internal, "error getting pending release %v", err)
}
idsIndexed, err := redis.Strings(redisConn.Do("SMEMBERS", allTickets))
@ -409,7 +409,7 @@ func (rb *redisBackend) GetIndexedIDSet(ctx context.Context) (map[string]struct{
for _, id := range idsIndexed {
r[id] = struct{}{}
}
for _, id := range idsInIgnoreLists {
for _, id := range idsInPendingReleases {
delete(r, id)
}
@ -600,8 +600,8 @@ func (rb *redisBackend) GetAssignments(ctx context.Context, id string, callback
return nil
}
// AddProposedTickets appends new proposed tickets to the proposed sorted set with current timestamp
func (rb *redisBackend) AddTicketsToIgnoreList(ctx context.Context, ids []string) error {
// AddTicketsToPendingRelease appends new proposed tickets to the proposed sorted set with current timestamp
func (rb *redisBackend) AddTicketsToPendingRelease(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}
@ -621,15 +621,15 @@ func (rb *redisBackend) AddTicketsToIgnoreList(ctx context.Context, ids []string
_, err = redisConn.Do("ZADD", cmds...)
if err != nil {
redisLogger.WithError(err).Error("failed to append proposed tickets to ignore list")
redisLogger.WithError(err).Error("failed to append proposed tickets to pending release")
return status.Error(codes.Internal, err.Error())
}
return nil
}
// DeleteTicketsFromIgnoreList deletes tickets from the proposed sorted set
func (rb *redisBackend) DeleteTicketsFromIgnoreList(ctx context.Context, ids []string) error {
// DeleteTicketsFromPendingRelease deletes tickets from the proposed sorted set
func (rb *redisBackend) DeleteTicketsFromPendingRelease(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}
@ -648,7 +648,7 @@ func (rb *redisBackend) DeleteTicketsFromIgnoreList(ctx context.Context, ids []s
_, err = redisConn.Do("ZREM", cmds...)
if err != nil {
redisLogger.WithError(err).Error("failed to delete proposed tickets from ignore list")
redisLogger.WithError(err).Error("failed to delete proposed tickets from pending release")
return status.Error(codes.Internal, err.Error())
}

@ -104,7 +104,7 @@ func TestTicketLifecycle(t *testing.T) {
assert.NotNil(err)
}
func TestIgnoreLists(t *testing.T) {
func TestPendingReleases(t *testing.T) {
// Create State Store
assert := assert.New(t)
cfg, closer := createRedis(t, true, "")
@ -135,16 +135,16 @@ func TestIgnoreLists(t *testing.T) {
// Verify all tickets are created and returned
verifyTickets(service, len(tickets))
// Add the first three tickets to the ignore list and verify changes are reflected in the result
assert.Nil(service.AddTicketsToIgnoreList(ctx, ticketIds[:3]))
// Add the first three tickets to the pending release and verify changes are reflected in the result
assert.Nil(service.AddTicketsToPendingRelease(ctx, ticketIds[:3]))
verifyTickets(service, len(tickets)-3)
// Sleep until the ignore list expired and verify we still have all the tickets
// Sleep until the pending release expired and verify we still have all the tickets
time.Sleep(cfg.GetDuration("pendingReleaseTimeout"))
verifyTickets(service, len(tickets))
}
func TestDeleteTicketsFromIgnoreList(t *testing.T) {
func TestDeleteTicketsFromPendingRelease(t *testing.T) {
// Create State Store
assert := assert.New(t)
cfg, closer := createRedis(t, true, "")
@ -175,11 +175,11 @@ func TestDeleteTicketsFromIgnoreList(t *testing.T) {
// Verify all tickets are created and returned
verifyTickets(service, len(tickets))
// Add the first three tickets to the ignore list and verify changes are reflected in the result
assert.Nil(service.AddTicketsToIgnoreList(ctx, ticketIds[:3]))
// Add the first three tickets to the pending release and verify changes are reflected in the result
assert.Nil(service.AddTicketsToPendingRelease(ctx, ticketIds[:3]))
verifyTickets(service, len(tickets)-3)
assert.Nil(service.DeleteTicketsFromIgnoreList(ctx, ticketIds[:3]))
assert.Nil(service.DeleteTicketsFromPendingRelease(ctx, ticketIds[:3]))
verifyTickets(service, len(tickets))
}

@ -25,7 +25,7 @@ const (
PoolIdleTimeout = 10 * time.Second
// PoolHealthCheckTimeout is the read/write timeout of a healthcheck HTTP request
PoolHealthCheckTimeout = 100 * time.Millisecond
// pendingReleaseTimeout is the time to live duration of Open Match ignore list settings
// pendingReleaseTimeout is the time to live duration of Open Match pending release settings
pendingReleaseTimeout = 500 * time.Millisecond
// InitialInterval is the initial backoff time of a backoff strategy
InitialInterval = 30 * time.Millisecond