mirror of
https://github.com/googleforgames/open-match.git
synced 2025-03-25 13:24:18 +00:00
added configurable backoff to MatchObject and Player watchers
This commit is contained in:
committed by
Joseph Holley
parent
a11556433b
commit
393e1d6de2
@ -27,11 +27,13 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/expbo"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/metrics"
|
||||
backend "github.com/GoogleCloudPlatform/open-match/internal/pb"
|
||||
redisHelpers "github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis/ignorelist"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis/redispb"
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/gogo/protobuf/jsonpb"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -209,43 +211,41 @@ func (s *backendAPI) CreateMatch(c context.Context, profile *backend.MatchObject
|
||||
}
|
||||
beLog.Info("Profile added to processing queue")
|
||||
|
||||
watcherBO := backoff.NewExponentialBackOff()
|
||||
if err := expbo.UnmarshalExponentialBackOff(s.cfg.GetString("api.backend.backoff"), watcherBO); err != nil {
|
||||
beLog.WithError(err).Warn("Could not parse backoff string, using default backoff parameters for MatchObject watcher")
|
||||
}
|
||||
|
||||
watcherBOCtx := backoff.WithContext(watcherBO, ctx)
|
||||
|
||||
// get and return matchobject, it will be written to the requestKey when the MMF has finished.
|
||||
var ok bool
|
||||
newMO := backend.MatchObject{Id: requestKey}
|
||||
watchChan := redispb.Watcher(ctx, s.pool, newMO) // Watcher() runs the appropriate Redis commands.
|
||||
errString := ("Error retrieving matchmaking results from state storage")
|
||||
timeout := time.Duration(s.cfg.GetInt("api.backend.timeout")) * time.Second
|
||||
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
// TODO:Timeout: deal with the fallout. There are some edge cases here.
|
||||
// When there is a timeout, need to send a stop to the watch channel.
|
||||
watchChan := redispb.Watcher(watcherBOCtx, s.pool, backend.MatchObject{Id: requestKey}) // Watcher() runs the appropriate Redis commands.
|
||||
newMO, ok := <-watchChan
|
||||
if !ok {
|
||||
// ok is false if watchChan has been closed by redispb.Watcher()
|
||||
// This happens when Watcher stops because of context cancellation or backing off reached time limit
|
||||
stats.Record(fnCtx, BeGrpcRequests.M(1))
|
||||
return profile, errors.New(errString + ": timeout exceeded")
|
||||
|
||||
case newMO, ok = <-watchChan:
|
||||
if !ok {
|
||||
// ok is false if watchChan has been closed by redispb.Watcher()
|
||||
newMO.Error = newMO.Error + "; channel closed - was the context cancelled?"
|
||||
if watcherBOCtx.Context().Err() != nil {
|
||||
newMO.Error = "channel closed: " + watcherBOCtx.Context().Err().Error()
|
||||
} else {
|
||||
// 'ok' was true, so properties should contain the results from redis.
|
||||
// Do basic error checking on the returned JSON
|
||||
if !gjson.Valid(profile.Properties) {
|
||||
newMO.Error = "retreived properties json was malformed"
|
||||
}
|
||||
newMO.Error = "channel closed: backoff deadline exceeded"
|
||||
}
|
||||
return &newMO, errors.New("Error retrieving matchmaking results from state storage: " + newMO.Error)
|
||||
}
|
||||
|
||||
// TODO test that this is the correct condition for an empty error.
|
||||
if newMO.Error != "" {
|
||||
stats.Record(fnCtx, BeGrpcErrors.M(1))
|
||||
return &newMO, errors.New(newMO.Error)
|
||||
}
|
||||
// 'ok' was true, so properties should contain the results from redis.
|
||||
// Do basic error checking on the returned JSON
|
||||
if !gjson.Valid(profile.Properties) {
|
||||
newMO.Error = "retreived properties json was malformed"
|
||||
}
|
||||
|
||||
// Got results; close the channel so the Watcher() function stops querying redis.
|
||||
// TODO test that this is the correct condition for an empty error.
|
||||
if newMO.Error != "" {
|
||||
stats.Record(fnCtx, BeGrpcErrors.M(1))
|
||||
return &newMO, errors.New(newMO.Error)
|
||||
}
|
||||
|
||||
beLog.Info("Matchmaking results received, returning to backend client")
|
||||
|
||||
stats.Record(fnCtx, BeGrpcRequests.M(1))
|
||||
return &newMO, err
|
||||
}
|
||||
|
@ -23,13 +23,15 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/expbo"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/metrics"
|
||||
frontend "github.com/GoogleCloudPlatform/open-match/internal/pb"
|
||||
redisHelpers "github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis/playerindices"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis/redispb"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/tag"
|
||||
@ -190,39 +192,47 @@ func (s *frontendAPI) GetUpdates(p *frontend.Player, assignmentStream frontend.F
|
||||
funcName := "GetAssignment"
|
||||
fnCtx, _ := tag.New(ctx, tag.Insert(KeyMethod, funcName))
|
||||
|
||||
watcherBO := backoff.NewExponentialBackOff()
|
||||
if err := expbo.UnmarshalExponentialBackOff(s.cfg.GetString("api.frontend.backoff"), watcherBO); err != nil {
|
||||
feLog.WithError(err).Warn("Could not parse backoff string, using default backoff parameters for Player watcher")
|
||||
}
|
||||
|
||||
// We have to stop Watcher manually because in a normal case client closes channel before the timeout
|
||||
watcherCtx, stopWatcher := context.WithCancel(context.Background())
|
||||
defer stopWatcher()
|
||||
watcherBOCtx := backoff.WithContext(watcherBO, watcherCtx)
|
||||
|
||||
// get and return connection string
|
||||
watchChan := redispb.PlayerWatcher(ctx, s.pool, *p) // watcher() runs the appropriate Redis commands.
|
||||
timeoutChan := time.After(time.Duration(s.cfg.GetInt("api.frontend.timeout")) * time.Second)
|
||||
watchChan := redispb.PlayerWatcher(watcherBOCtx, s.pool, *p) // watcher() runs the appropriate Redis commands.
|
||||
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Context cancelled
|
||||
feLog.WithFields(log.Fields{
|
||||
"playerid": p.Id,
|
||||
}).Info("client closed connection successfully")
|
||||
feLog.WithField("playerid", p.Id).Info("client closed connection successfully")
|
||||
stats.Record(fnCtx, FeGrpcRequests.M(1))
|
||||
return nil
|
||||
case <-timeoutChan: // Timeout reached without client closing connection
|
||||
// TODO:deal with the fallout
|
||||
err := errors.New("server timeout reached without client closing connection")
|
||||
feLog.WithFields(log.Fields{
|
||||
"error": err.Error(),
|
||||
"component": "statestorage",
|
||||
"playerid": p.Id,
|
||||
}).Error("State storage error")
|
||||
|
||||
// Count errors for metrics
|
||||
errTag, _ := tag.NewKey("errtype")
|
||||
fnCtx, _ := tag.New(ctx, tag.Insert(errTag, "watch_timeout"))
|
||||
stats.Record(fnCtx, FeGrpcErrors.M(1))
|
||||
//TODO: we could generate a frontend.player message with an error
|
||||
//field and stream it to the client before throwing the error here
|
||||
//if we wanted to send more useful client retry information
|
||||
return err
|
||||
case a, ok := <-watchChan:
|
||||
if !ok {
|
||||
// Timeout reached without client closing connection
|
||||
err := errors.New("server timeout reached without client closing connection")
|
||||
feLog.WithFields(log.Fields{
|
||||
"error": err.Error(),
|
||||
"component": "statestorage",
|
||||
"playerid": p.Id,
|
||||
}).Error("State storage error")
|
||||
|
||||
// Count errors for metrics
|
||||
errTag, _ := tag.NewKey("errtype")
|
||||
fnCtx, _ := tag.New(ctx, tag.Insert(errTag, "watch_timeout"))
|
||||
stats.Record(fnCtx, FeGrpcErrors.M(1))
|
||||
//TODO: we could generate a frontend.player message with an error
|
||||
//field and stream it to the client before throwing the error here
|
||||
//if we wanted to send more useful client retry information
|
||||
return err
|
||||
}
|
||||
|
||||
case a := <-watchChan:
|
||||
feLog.WithFields(log.Fields{
|
||||
"assignment": a.Assignment,
|
||||
"playerid": a.Id,
|
||||
@ -231,8 +241,6 @@ func (s *frontendAPI) GetUpdates(p *frontend.Player, assignmentStream frontend.F
|
||||
}).Info("updating client")
|
||||
assignmentStream.Send(&a)
|
||||
stats.Record(fnCtx, FeGrpcStreamedResponses.M(1))
|
||||
// Reset timeout.
|
||||
timeoutChan = time.After(time.Duration(s.cfg.GetInt("api.frontend.timeout")) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,11 +9,11 @@ api:
|
||||
backend:
|
||||
hostname: om-backendapi
|
||||
port: 50505
|
||||
timeout: 30
|
||||
backoff: "[2 32] *2 ~0.33 <30"
|
||||
frontend:
|
||||
hostname: om-frontendapi
|
||||
port: 50504
|
||||
timeout: 300
|
||||
backoff: "[2 32] *2 ~0.33 <300"
|
||||
mmlogic:
|
||||
hostname: om-mmlogicapi
|
||||
port: 50503
|
||||
|
63
internal/expbo/unmarshal.go
Normal file
63
internal/expbo/unmarshal.go
Normal file
@ -0,0 +1,63 @@
|
||||
package expbo
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
)
|
||||
|
||||
// UnmarshalExponentialBackOff populates ExponentialBackOff structure parsing strings of format:
|
||||
// "[InitInterval MaxInterval] *Multiplier ~RandomizationFactor <MaxElapsedTime"
|
||||
//
|
||||
// Example: "[0.250 30] *1.5 ~0.33 <7200"
|
||||
func UnmarshalExponentialBackOff(s string, b *backoff.ExponentialBackOff) error {
|
||||
var (
|
||||
min, max, mult, rand, limit float64
|
||||
err error
|
||||
)
|
||||
|
||||
for _, word := range strings.Split(strings.TrimSpace(s), " ") {
|
||||
switch {
|
||||
case word == "":
|
||||
continue
|
||||
case strings.HasPrefix(word, "["):
|
||||
min, err = strconv.ParseFloat(strings.TrimPrefix(word, "["), 64)
|
||||
if err != nil {
|
||||
return errors.New("cannot parse InitInterval value: " + err.Error())
|
||||
}
|
||||
case strings.HasSuffix(word, "]"):
|
||||
max, err = strconv.ParseFloat(strings.TrimSuffix(word, "]"), 64)
|
||||
if err != nil {
|
||||
return errors.New("cannot parse MaxInterval value: " + err.Error())
|
||||
}
|
||||
case strings.HasPrefix(word, "*"):
|
||||
mult, err = strconv.ParseFloat(strings.TrimPrefix(word, "*"), 64)
|
||||
if err != nil {
|
||||
return errors.New("cannot parse Multiplier value: " + err.Error())
|
||||
}
|
||||
case strings.HasPrefix(word, "~"):
|
||||
rand, err = strconv.ParseFloat(strings.TrimPrefix(word, "~"), 64)
|
||||
if err != nil {
|
||||
return errors.New("cannot parse RandomizationFactor value: " + err.Error())
|
||||
}
|
||||
case strings.HasPrefix(word, "<"):
|
||||
limit, err = strconv.ParseFloat(strings.TrimPrefix(word, "<"), 64)
|
||||
if err != nil {
|
||||
return errors.New("cannot parse MaxElapsedTime value: " + err.Error())
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf(`unexpected word "%s"`, word)
|
||||
}
|
||||
}
|
||||
|
||||
b.InitialInterval = time.Duration(min * float64(time.Second))
|
||||
b.MaxInterval = time.Duration(max * float64(time.Second))
|
||||
b.Multiplier = mult
|
||||
b.RandomizationFactor = rand
|
||||
b.MaxElapsedTime = time.Duration(limit * float64(time.Second))
|
||||
return nil
|
||||
}
|
35
internal/expbo/unmarshal_test.go
Normal file
35
internal/expbo/unmarshal_test.go
Normal file
@ -0,0 +1,35 @@
|
||||
package expbo
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
)
|
||||
|
||||
func TestUnmarshalExponentialBackOff(t *testing.T) {
|
||||
s := "[0.25 30] *1.5 ~0.33 <300"
|
||||
|
||||
b := backoff.NewExponentialBackOff()
|
||||
err := UnmarshalExponentialBackOff(s, b)
|
||||
if err != nil {
|
||||
t.Fatalf(`error umarshaling "%s": %+v`, s, err)
|
||||
}
|
||||
|
||||
if b.InitialInterval != 250*time.Millisecond {
|
||||
t.Error("unexpected InitialInterval value:", b.InitialInterval)
|
||||
}
|
||||
if b.MaxInterval != 30*time.Second {
|
||||
t.Error("unexpected MaxInterval value:", b.MaxInterval)
|
||||
}
|
||||
if math.Abs(b.Multiplier-1.5) > 1e-8 {
|
||||
t.Error("unexpected Multiplier value:", b.Multiplier)
|
||||
}
|
||||
if math.Abs(b.RandomizationFactor-0.33) > 1e-8 {
|
||||
t.Error("unexpected RandomizationFactor value:", b.RandomizationFactor)
|
||||
}
|
||||
if b.MaxElapsedTime != 5*time.Minute {
|
||||
t.Error("unexpected MaxElapsedTime value:", b.MaxElapsedTime)
|
||||
}
|
||||
}
|
@ -29,6 +29,7 @@ import (
|
||||
"time"
|
||||
|
||||
om_messages "github.com/GoogleCloudPlatform/open-match/internal/pb"
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/gogo/protobuf/jsonpb"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -108,35 +109,38 @@ func UnmarshalFromRedis(ctx context.Context, pool *redis.Pool, pb *om_messages.M
|
||||
// The pattern for this function is from 'Go Concurrency Patterns', it is a function
|
||||
// that wraps a closure goroutine, and returns a channel.
|
||||
// reference: https://talks.golang.org/2012/concurrency.slide#25
|
||||
func Watcher(ctx context.Context, pool *redis.Pool, pb om_messages.MatchObject) <-chan om_messages.MatchObject {
|
||||
//
|
||||
// NOTE: runs until cancelled, timed out or result is found in Redis.
|
||||
func Watcher(bo backoff.BackOffContext, pool *redis.Pool, pb om_messages.MatchObject) <-chan om_messages.MatchObject {
|
||||
|
||||
watchChan := make(chan om_messages.MatchObject)
|
||||
results := om_messages.MatchObject{Id: pb.Id}
|
||||
|
||||
go func() {
|
||||
defer close(watchChan)
|
||||
|
||||
// var declaration
|
||||
var err = errors.New("haven't queried Redis yet")
|
||||
|
||||
// Loop, querying redis until this key has a value
|
||||
for err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Cleanup
|
||||
close(watchChan)
|
||||
for {
|
||||
results = om_messages.MatchObject{Id: pb.Id}
|
||||
err = UnmarshalFromRedis(bo.Context(), pool, &results)
|
||||
if err == nil {
|
||||
// Return value retreived from Redis asynchonously and tell calling function we're done
|
||||
moLog.Debug("state storage watched record update detected")
|
||||
watchChan <- results
|
||||
return
|
||||
}
|
||||
|
||||
if d := bo.NextBackOff(); d != backoff.Stop {
|
||||
moLog.Debug("No new results, backing off")
|
||||
time.Sleep(d)
|
||||
} else {
|
||||
moLog.Debug("No new results after all backoff attempts")
|
||||
return
|
||||
default:
|
||||
//results, err = Retrieve(ctx, pool, key)
|
||||
results = om_messages.MatchObject{Id: pb.Id}
|
||||
err = UnmarshalFromRedis(ctx, pool, &results)
|
||||
if err != nil {
|
||||
moLog.Debug("No new results")
|
||||
time.Sleep(2 * time.Second) // TODO: exp bo + jitter
|
||||
}
|
||||
}
|
||||
}
|
||||
// Return value retreived from Redis asynchonously and tell calling function we're done
|
||||
moLog.Debug("state storage watched record update detected")
|
||||
watchChan <- results
|
||||
}()
|
||||
|
||||
return watchChan
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
om_messages "github.com/GoogleCloudPlatform/open-match/internal/pb"
|
||||
"github.com/GoogleCloudPlatform/open-match/internal/statestorage/redis/playerindices"
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/gogo/protobuf/jsonpb"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -104,73 +105,72 @@ func UnmarshalPlayerFromRedis(ctx context.Context, pool *redis.Pool, player *om_
|
||||
// NOTE: this function will never stop querying Redis during normal operation! You need to
|
||||
// disconnect the client from the frontend API (which closes the context) once
|
||||
// you've received the results you were waiting for to stop doing work!
|
||||
func PlayerWatcher(ctx context.Context, pool *redis.Pool, pb om_messages.Player) <-chan om_messages.Player {
|
||||
func PlayerWatcher(bo backoff.BackOffContext, pool *redis.Pool, pb om_messages.Player) <-chan om_messages.Player {
|
||||
|
||||
pwLog := pLog.WithFields(log.Fields{"playerId": pb.Id})
|
||||
|
||||
// Establish channel to return results on.
|
||||
watchChan := make(chan om_messages.Player)
|
||||
watchChan := make(chan om_messages.Player, 1)
|
||||
|
||||
go func() {
|
||||
defer close(watchChan)
|
||||
|
||||
// var declaration
|
||||
var prevResults = ""
|
||||
|
||||
// Loop, querying redis until this key has a value or the Redis query fails.
|
||||
// Loop, querying redis until this key has a value or the Redis query fails
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Player stopped asking for updates; clean up
|
||||
close(watchChan)
|
||||
// Update the player's 'accessed' timestamp to denote they haven't disappeared
|
||||
err := playerindices.Touch(bo.Context(), pool, pb.Id)
|
||||
if err != nil {
|
||||
// Not fatal, but this error should be addressed. This could
|
||||
// cause the player to expire while still actively connected!
|
||||
pwLog.WithFields(log.Fields{"error": err.Error()}).Error("Unable to update accessed metadata timestamp")
|
||||
}
|
||||
|
||||
// Get player from redis.
|
||||
results := om_messages.Player{Id: pb.Id}
|
||||
err = UnmarshalPlayerFromRedis(bo.Context(), pool, &results)
|
||||
if err != nil {
|
||||
// Return error and quit.
|
||||
pwLog.Debug("State storage error:", err.Error())
|
||||
results.Error = err.Error()
|
||||
watchChan <- results
|
||||
return
|
||||
default:
|
||||
// Update the player's 'accessed' timestamp to denote they haven't disappeared
|
||||
err := playerindices.Touch(ctx, pool, pb.Id)
|
||||
if err != nil {
|
||||
// Not fatal, but this error should be addressed. This could
|
||||
// cause the player to expire while still actively connected!
|
||||
pwLog.WithFields(log.Fields{"error": err.Error()}).Error("Unable to update accessed metadata timestamp")
|
||||
}
|
||||
}
|
||||
|
||||
// Get player from redis.
|
||||
results := om_messages.Player{Id: pb.Id}
|
||||
err = UnmarshalPlayerFromRedis(ctx, pool, &results)
|
||||
if err != nil {
|
||||
// Return error and quit.
|
||||
pwLog.Debug("State storage error:", err.Error())
|
||||
results.Error = err.Error()
|
||||
watchChan <- results
|
||||
close(watchChan)
|
||||
return
|
||||
// Check for new results and send them. Store a copy of the
|
||||
// latest version in string form so we can compare it easily in
|
||||
// future loops.
|
||||
//
|
||||
// If we decide to watch other message fields for updates,
|
||||
// they will need to be added here.
|
||||
//
|
||||
// This can be made much cleaner if protobuffer reflection improves.
|
||||
curResults := fmt.Sprintf("%v%v%v", results.Assignment, results.Status, results.Error)
|
||||
if prevResults == curResults {
|
||||
pwLog.Debug("No new results, backing off")
|
||||
} else {
|
||||
// Return value retreived from Redis
|
||||
pwLog.Debug("state storage watched player record changed")
|
||||
watchedFields := om_messages.Player{
|
||||
// Return only the watched fields to minimize traffic
|
||||
Id: results.Id,
|
||||
Assignment: results.Assignment,
|
||||
Status: results.Status,
|
||||
Error: results.Error,
|
||||
}
|
||||
watchChan <- watchedFields
|
||||
prevResults = curResults
|
||||
|
||||
// Check for new results and send them. Store a copy of the
|
||||
// latest version in string form so we can compare it easily in
|
||||
// future loops.
|
||||
//
|
||||
// If we decide to watch other message fields for updates,
|
||||
// they will need to be added here.
|
||||
//
|
||||
// This can be made much cleaner if protobuffer reflection improves.
|
||||
curResults := fmt.Sprintf("%v%v%v", results.Assignment, results.Status, results.Error)
|
||||
if prevResults == curResults {
|
||||
pwLog.Debug("No new watcher results")
|
||||
// TODO: change the debug message once exp bo + jitter is implemented
|
||||
//pwLog.Debug("No new results, backing off")
|
||||
time.Sleep(2 * time.Second) // TODO: exp bo + jitter
|
||||
} else {
|
||||
// Return value retreived from Redis
|
||||
pwLog.Debug("state storage watched player record changed")
|
||||
watchedFields := om_messages.Player{
|
||||
// Return only the watched fields to minimize traffic
|
||||
Id: results.Id,
|
||||
Assignment: results.Assignment,
|
||||
Status: results.Status,
|
||||
Error: results.Error,
|
||||
}
|
||||
watchChan <- watchedFields
|
||||
prevResults = curResults
|
||||
time.Sleep(2 * time.Second) // TODO: reset exp bo + jitter
|
||||
}
|
||||
// Reset backoff & timeout counters
|
||||
bo.Reset()
|
||||
}
|
||||
|
||||
if d := bo.NextBackOff(); d != backoff.Stop {
|
||||
time.Sleep(d)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
Reference in New Issue
Block a user