Have redis filter query return full tickets (#429)

* Have redis filter query and return full tickets

* break out paging logic from redis filter

* Per code review, return grpc statuses
This commit is contained in:
Scott Redig
2019-05-17 15:21:23 -07:00
committed by GitHub
parent 4e2b64722f
commit 9000ae8de4
4 changed files with 71 additions and 170 deletions

View File

@ -15,8 +15,6 @@
package mmlogic
import (
"encoding/json"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -61,45 +59,23 @@ func (s *mmlogicService) QueryTickets(req *pb.QueryTicketsRequest, responseServe
ctx := responseServer.Context()
poolFilters := req.Pool.Filter
callback := func(tickets []*pb.Ticket) error {
err := responseServer.Send(&pb.QueryTicketsResponse{Ticket: tickets})
if err != nil {
logger.WithError(err).Error("Failed to send Redis response to grpc server")
return status.Errorf(codes.Aborted, err.Error())
}
return nil
}
pSize := s.cfg.GetInt("storage.page.size")
// Send requests to the storage service
idsToProperties, err := s.store.FilterTickets(ctx, poolFilters)
err := s.store.FilterTickets(ctx, poolFilters, pSize, callback)
if err != nil {
logger.WithError(err).Error("Failed to retrieve result from storage service.")
return status.Error(codes.Internal, err.Error())
}
page := []*pb.Ticket{}
// The ith entry when iterating through the idsToProperties map
mapIdx := 0
// The number of tickets in a paging response
pSize := s.cfg.GetInt("storage.page.size")
for id, property := range idsToProperties {
select {
case <-ctx.Done():
return nil
default:
propertyByte, err := json.Marshal(property)
if err != nil {
logger.WithError(err).Error("Failed to convert property map to JSON")
return status.Errorf(codes.Internal, err.Error())
}
page = append(page, &pb.Ticket{Id: id, Properties: string(propertyByte)})
mapIdx++
endPage := mapIdx%pSize == 0 || mapIdx == len(idsToProperties)-1
if endPage {
// Reaches page limit; Send a stream response then reset the page
err := responseServer.Send(&pb.QueryTicketsResponse{Ticket: page})
if err != nil {
logger.WithError(err).Error("Failed to send Redis response to grpc server")
return status.Errorf(codes.Aborted, err.Error())
}
page = []*pb.Ticket{}
}
}
}
return nil
}

View File

@ -47,7 +47,7 @@ type Service interface {
DeindexTicket(ctx context.Context, id string) error
// FilterTickets returns the Ticket ids for the Tickets meeting the specified filtering criteria.
FilterTickets(ctx context.Context, filters []*pb.Filter) (map[string]map[string]int64, error)
FilterTickets(ctx context.Context, filters []*pb.Filter, pageSize int, callback func([]*pb.Ticket) error) error
// Closes the connection to the underlying storage.
Close() error

View File

@ -367,15 +367,13 @@ func (rb *redisBackend) DeindexTicket(ctx context.Context, id string) error {
// "testplayer1": {"ranking" : 56, "loyalty_level": 4},
// "testplayer2": {"ranking" : 50, "loyalty_level": 3},
// }
func (rb *redisBackend) FilterTickets(ctx context.Context, filters []*pb.Filter) (map[string]map[string]int64, error) {
func (rb *redisBackend) FilterTickets(ctx context.Context, filters []*pb.Filter, pageSize int, callback func([]*pb.Ticket) error) error {
redisConn, err := rb.redisPool.GetContext(ctx)
if err != nil {
redisLogger.WithError(err).Error("Failed to get redis connection with context.")
}
defer handleConnectionClose(&redisConn)
// A map[attribute]map[playerID]value
attributeToTickets := make(map[string]map[string]int64)
// A set of playerIds that satisfies all filters
idSet := make([]string, 0)
@ -384,42 +382,75 @@ func (rb *redisBackend) FilterTickets(ctx context.Context, filters []*pb.Filter)
// Time Complexity O(logN + M), where N is the number of elements in the attribute set
// and M is the number of entries being returned.
// TODO: discuss if we need a LIMIT for # of queries being returned
idToValue, err := redis.Int64Map(redisConn.Do("ZRANGEBYSCORE", filter.Attribute, filter.Min, filter.Max, "WITHSCORES"))
idsInFilter, err := redis.Strings(redisConn.Do("ZRANGEBYSCORE", filter.Attribute, filter.Min, filter.Max))
if err != nil {
redisLogger.WithFields(logrus.Fields{
"Command": fmt.Sprintf("ZRANGEBYSCORE %s %f %f WITHSCORES", filter.Attribute, filter.Min, filter.Max),
}).WithError(err)
return nil, err
}
attributeToTickets[filter.Attribute] = idToValue
idsPerFilter := make([]string, 0)
for id := range idToValue {
idsPerFilter = append(idsPerFilter, id)
"Command": fmt.Sprintf("ZRANGEBYSCORE %s %f %f", filter.Attribute, filter.Min, filter.Max),
}).WithError(err).Error("Failed to lookup index.")
return status.Errorf(codes.Internal, "%v", err)
}
if i == 0 {
idSet = idsPerFilter
idSet = idsInFilter
} else {
idSet = set.Intersection(idSet, idsPerFilter)
idSet = set.Intersection(idSet, idsInFilter)
}
}
// Result is a mapping from ticket ids to attribute key-value pairs
results := make(map[string]map[string]int64)
for _, id := range idSet {
// A map from attribute names to attribute values per ticket
propertyPerTicket := make(map[string]int64)
for attr, tickets := range attributeToTickets {
attrVal := tickets[id]
propertyPerTicket[attr] = attrVal
// TODO: finish reworking this after the proto changes.
for _, page := range idsToPages(idSet, pageSize) {
ticketBytes, err := redis.ByteSlices(redisConn.Do("MGET", page...))
if err != nil {
redisLogger.WithFields(logrus.Fields{
"Command": fmt.Sprintf("MGET %v", page),
}).WithError(err).Error("Failed to lookup tickets.")
return status.Errorf(codes.Internal, "%v", err)
}
tickets := make([]*pb.Ticket, 0, len(page))
for i, b := range ticketBytes {
// Tickets may be deleted by the time we read it from redis.
if b != nil {
t := &pb.Ticket{}
err = proto.Unmarshal(b, t)
if err != nil {
redisLogger.WithFields(logrus.Fields{
"key": page[i],
}).WithError(err).Error("Failed to unmarshal ticket from redis.")
return status.Errorf(codes.Internal, "%v", err)
}
tickets = append(tickets, t)
}
}
err = callback(tickets)
if err != nil {
return status.Errorf(codes.Internal, "%v", err)
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
results[id] = propertyPerTicket
}
return results, nil
return nil
}
func idsToPages(ids []string, pageSize int) [][]interface{} {
result := make([][]interface{}, 0, len(ids)/pageSize+1)
for i := 0; i < len(ids); i += pageSize {
end := i + pageSize
if end > len(ids) {
end = len(ids)
}
page := make([]interface{}, end-i)
for i, id := range ids[i:end] {
page[i] = id
}
result = append(result, page)
}
return result
}
func handleConnectionClose(conn *redis.Conn) {

View File

@ -1,106 +0,0 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statestore
import (
"context"
"testing"
"time"
"github.com/alicebob/miniredis"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"open-match.dev/open-match/internal/pb"
testUtil "open-match.dev/open-match/internal/testing"
)
func TestRedisConnection(t *testing.T) {
assert := assert.New(t)
rb, closer := createRedis(t)
defer closer()
assert.NotNil(rb)
assert.Nil(closer())
}
func createRedis(t *testing.T) (Service, func() error) {
cfg := viper.New()
mredis, err := miniredis.Run()
if err != nil {
t.Fatalf("cannot create redis %s", err)
}
cfg.Set("redis.hostname", mredis.Host())
cfg.Set("redis.port", mredis.Port())
cfg.Set("redis.pool.maxIdle", 1000)
cfg.Set("redis.pool.idleTimeout", time.Second)
cfg.Set("redis.pool.maxActive", 1000)
rs, err := newRedis(cfg)
if err != nil {
t.Fatalf("cannot connect to fake redis %s", err)
}
return rs, func() error {
rbCloseErr := rs.Close()
mredis.Close()
return rbCloseErr
}
}
// TODO: Test paging logic
func TestFilterTickets(t *testing.T) {
assert := assert.New(t)
cfg := viper.New()
cfg.Set("storage.page.size", 1000)
rs, rsCloser := createRedis(t)
defer rsCloser()
// Inject test data into the fake redis server
ctx := context.Background()
rb, ok := rs.(*redisBackend)
assert.True(ok)
redisConn, err := rb.redisPool.GetContext(ctx)
assert.Nil(err)
redisConn.Do("ZADD", "level",
1, "alice",
10, "bob",
20, "charlie",
30, "donald",
40, "eddy",
)
redisConn.Do("ZADD", "attack",
1, "alice",
10, "bob",
20, "charlie",
30, "donald",
40, "eddy",
)
redisConn.Do("ZADD", "defense",
1, "alice",
10, "bob",
20, "charlie",
30, "donald",
40, "eddy",
)
ticketsData, err := rs.FilterTickets(ctx, []*pb.Filter{
testUtil.NewPbFilter("level", 0, 15),
testUtil.NewPbFilter("attack", 5, 25),
})
assert.Nil(err)
assert.Equal(map[string]map[string]int64{"bob": {"attack": 10, "level": 10}}, ticketsData)
}