Stream proposals from mmf to synchronizer (#1077)

This reduces overall system latency, and sets the stage for better mmf error handling.

The overall structure of the FetchMatches call has been reworked. The different goroutines now each set an explicit err variable, so once we have FetchSummary we can simply set the mmf err variable on it. Synchronizer calls that return an error will always cause the FetchMatches call itself to return an error (as such failures are relatively fatal), while mmf and evaluator errors will be passed gently to the client.

One thing this code isn't doing anymore is checking if an mmf returns a match with no tickets. This seems fine to me, but willing to discuss if anyone disagrees.

Deleted the tests for the following reasons:

TestDoFetchMatchesInChannel didn't actually test fetching matches; it only tested creating a client. Since callMmf now both creates the client and makes the call, this test code would now block while actually trying to make a connection. I'm not worried about having full branch test coverage on err statements...
TestDoFetchMatchesFilterChannel tested merging of mmf runs. Since there's only one mmf run now, it's no longer necessary.
This commit is contained in:
Scott Redig
2020-01-29 14:44:15 -08:00
committed by GitHub
parent 338a03cce5
commit 0b8425184b
3 changed files with 155 additions and 329 deletions

View File

@ -27,7 +27,7 @@ func BindService(p *rpc.ServerParams, cfg config.View) error {
service := &backendService{
synchronizer: newSynchronizerClient(cfg),
store: statestore.New(cfg),
mmfClients: rpc.NewClientCache(cfg),
cc: rpc.NewClientCache(cfg),
}
p.AddHealthCheckFunc(service.store.HealthCheck)

View File

@ -40,12 +40,7 @@ import (
// backendService holds the dependencies used by the backend's RPC handlers:
// a synchronizer client, the state store, and an rpc client cache.
type backendService struct {
synchronizer *synchronizerClient
store statestore.Service
// Client cache used to reach match functions (named mmfClients here).
mmfClients *rpc.ClientCache
}
// mmfResult pairs the matches produced by a match function run with the
// error, if any, from that run.
type mmfResult struct {
matches []*pb.Match
err error
// NOTE(review): in this diff rendering cc appears under mmfResult, but
// BindService populates a cc field on backendService — confirm which
// struct actually owns this field.
cc *rpc.ClientCache
}
var (
@ -75,198 +70,194 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
return err
}
// Send errors from the running go routines back to the FetchMatches go
// routine. Must have size equal to number of senders so that if FetchMatches
// returns an error, additional errors don't block the go routine from
// finishing.
errors := make(chan error, 2)
mmfCtx, cancelMmfs := context.WithCancel(stream.Context())
startMmfs := func() error {
resultChan := make(chan mmfResult, 1)
// Closed when mmfs should start.
startMmfs := make(chan struct{})
proposals := make(chan *pb.Match)
wg := sync.WaitGroup{}
wg.Add(3)
err := doFetchMatchesReceiveMmfResult(mmfCtx, s.mmfClients, req, resultChan)
if err != nil {
// TODO: Log but continue case where mmfs were canceled once fully
// streaming.
return err
var synchronizeSendErr error
go func() {
defer wg.Done()
synchronizeSendErr = synchronizeSend(mmfCtx, proposals, syncStream)
}()
var synchronizeRecvErr error
go func() {
defer wg.Done()
synchronizeRecvErr = synchronizeRecv(syncStream, stream, startMmfs, cancelMmfs)
cancelMmfs()
}()
var mmfErr error
go func() {
defer wg.Done()
select {
case <-mmfCtx.Done():
mmfErr = fmt.Errorf("Mmf was never started")
return
case <-startMmfs:
}
proposals, err := doFetchMatchesValidateProposals(mmfCtx, resultChan, 1)
if err != nil {
// TODO: Log but continue case where mmfs were canceled once fully
// streaming.
return err
}
mmfErr = callMmf(mmfCtx, s.cc, req, proposals)
}()
sendProposals:
for _, p := range proposals {
select {
case <-mmfCtx.Done():
logger.Warning("proposals from mmfs received too late to be sent to synchronizer")
wg.Wait()
// TODO: Send mmf error in FetchSummary instead of erroring call.
if synchronizeSendErr != nil || synchronizeRecvErr != nil || mmfErr != nil {
logger.WithFields(logrus.Fields{
"synchronizeSendErr": synchronizeSendErr,
"synchronizeRecvErr": synchronizeRecvErr,
"mmfErr": mmfErr,
}).Error("error(s) in FetchMatches call.")
return fmt.Errorf(
"Error(s) in FetchMatches call. synchronizeSendErr=[%s], synchronizeRecvErr=[%s], mmfErr=[%s]",
synchronizeSendErr,
synchronizeRecvErr,
mmfErr,
)
}
return nil
}
// synchronizeSend forwards each proposal received on the proposals channel to
// the synchronizer via syncStream.Send. It stops when ctx is done or when the
// proposals channel is closed, and then closes the send side of syncStream.
//
// It returns a wrapped error if sending a proposal or closing the send stream
// fails, and nil otherwise.
//
// NOTE(review): this span comes from a diff rendering in which lines of the
// removed inline implementation (the errors channel, the anonymous receive
// goroutine, the stream.Context() telemetry call) are interleaved with the
// new function body — confirm against the actual file before editing code.
func synchronizeSend(ctx context.Context, proposals <-chan *pb.Match, syncStream synchronizerStream) error {
sendProposals:
for {
select {
case <-ctx.Done():
break sendProposals
case p, ok := <-proposals:
// ok is false once the proposals channel has been closed by its sender.
if !ok {
break sendProposals
default:
}
telemetry.RecordUnitMeasurement(stream.Context(), mMatchesSentToEvaluation)
err = syncStream.Send(&ipb.SynchronizeRequest{Proposal: p})
telemetry.RecordUnitMeasurement(ctx, mMatchesSentToEvaluation)
err := syncStream.Send(&ipb.SynchronizeRequest{Proposal: p})
if err != nil {
return fmt.Errorf("error sending proposal to synchronizer: %w", err)
}
}
err = syncStream.CloseSend()
if err != nil {
return fmt.Errorf("error closing send stream of proposals to synchronizer: %w", err)
}
return nil
}
go func() {
var startMmfsOnce sync.Once
// If the receive loop exits without StartMmfs ever being seen, report that
// through the errors channel.
defer func() {
startMmfsOnce.Do(func() {
errors <- fmt.Errorf("MMFS were never started")
})
}()
for {
resp, err := syncStream.Recv()
if err == io.EOF {
errors <- nil
return
}
if err != nil {
errors <- fmt.Errorf("error receiving match from synchronizer: %w", err)
return
}
if resp.StartMmfs {
go startMmfsOnce.Do(func() {
errors <- startMmfs()
})
}
if resp.CancelMmfs {
cancelMmfs()
}
if resp.Match != nil {
telemetry.RecordUnitMeasurement(stream.Context(), mMatchesFetched)
err = stream.Send(&pb.FetchMatchesResponse{Match: resp.Match})
if err != nil {
errors <- fmt.Errorf("error sending match to caller of backend: %w", err)
return
}
}
}
}()
// Wait for one result from each of the two senders on the errors channel.
for i := 0; i < 2; i++ {
err := <-errors
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
}).Error("error in FetchMatches call.")
return err
}
// Signal the synchronizer that no more proposals will be sent.
err := syncStream.CloseSend()
if err != nil {
return fmt.Errorf("error closing send stream of proposals to synchronizer: %w", err)
}
return nil
}
func doFetchMatchesReceiveMmfResult(ctx context.Context, mmfClients *rpc.ClientCache, req *pb.FetchMatchesRequest, resultChan chan<- mmfResult) error {
var grpcClient pb.MatchFunctionClient
var httpClient *http.Client
var baseURL string
var err error
// synchronizeRecv reads responses from syncStream in a loop and reacts to each:
//   - StartMmfs: closes the startMmfs channel (at most once).
//   - CancelMmfs: calls cancelMmfs.
//   - a non-nil Match: records a telemetry measurement and forwards the match
//     to the caller on stream.
//
// It returns nil when syncStream.Recv reports io.EOF, and a wrapped error if
// receiving from the synchronizer or sending to the caller fails.
func synchronizeRecv(syncStream synchronizerStream, stream pb.BackendService_FetchMatchesServer, startMmfs chan<- struct{}, cancelMmfs context.CancelFunc) error {
// Guards close(startMmfs) so a repeated StartMmfs response cannot close the
// channel twice.
var startMmfsOnce sync.Once
// NOTE(review): req is not a parameter of this function; this line looks like
// a leftover from the removed doFetchMatchesReceiveMmfResult in this diff
// rendering — confirm.
configType := req.GetConfig().GetType()
for {
resp, err := syncStream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return fmt.Errorf("error receiving match from synchronizer: %w", err)
}
if resp.StartMmfs {
go startMmfsOnce.Do(func() {
close(startMmfs)
})
}
if resp.CancelMmfs {
cancelMmfs()
}
if resp.Match != nil {
telemetry.RecordUnitMeasurement(stream.Context(), mMatchesFetched)
err = stream.Send(&pb.FetchMatchesResponse{Match: resp.Match})
if err != nil {
return fmt.Errorf("error sending match to caller of backend: %w", err)
}
}
}
}
// callMmf triggers execution of MMFs to fetch match proposals.
//
// It builds the match function address from the request's config host and
// port, then dispatches on the config type: GRPC configs go to callGrpcMmf and
// REST configs go to callHTTPMmf. Any other type yields an InvalidArgument
// status error. The proposals channel is closed when callMmf returns.
//
// NOTE(review): this span comes from a diff rendering; the client-setup lines
// that use mmfClients, grpcClient, httpClient and baseURL appear to belong to
// the removed doFetchMatchesReceiveMmfResult — confirm against the real file.
func callMmf(ctx context.Context, cc *rpc.ClientCache, req *pb.FetchMatchesRequest, proposals chan<- *pb.Match) error {
// Closing proposals on every return path lets the receiver observe the end
// of the proposal stream.
defer close(proposals)
address := fmt.Sprintf("%s:%d", req.GetConfig().GetHost(), req.GetConfig().GetPort())
switch configType {
// MatchFunction Hosted as a GRPC service
switch req.GetConfig().GetType() {
case pb.FunctionConfig_GRPC:
var conn *grpc.ClientConn
conn, err = mmfClients.GetGRPC(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": req.GetConfig(),
}).Error("failed to establish grpc client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
}
grpcClient = pb.NewMatchFunctionClient(conn)
// MatchFunction Hosted as a REST service
return callGrpcMmf(ctx, cc, req.GetProfile(), address, proposals)
case pb.FunctionConfig_REST:
httpClient, baseURL, err = mmfClients.GetHTTP(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": req.GetConfig(),
}).Error("failed to establish rest client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
}
return callHTTPMmf(ctx, cc, req.GetProfile(), address, proposals)
default:
return status.Error(codes.InvalidArgument, "provided match function type is not supported")
}
}
go func(profile *pb.MatchProfile) {
// Get the match results that will be sent.
// TODO: The matches returned by the MatchFunction will be sent to the
// Evaluator to select results. Until the evaluator is implemented,
// we channel all matches as accepted results.
switch configType {
case pb.FunctionConfig_GRPC:
matches, err := matchesFromGRPCMMF(ctx, profile, grpcClient)
resultChan <- mmfResult{matches, err}
case pb.FunctionConfig_REST:
matches, err := matchesFromHTTPMMF(ctx, profile, httpClient, baseURL)
resultChan <- mmfResult{matches, err}
// callGrpcMmf runs a match function hosted as a gRPC service at address and
// streams every proposal it returns onto the proposals channel.
//
// It obtains a connection from cc, calls Run with the given profile, and then
// receives responses until the stream reports io.EOF. It returns:
//   - an InvalidArgument status error if the connection cannot be obtained,
//   - the error from Run or from stream.Recv if either fails,
//   - ctx.Err() if ctx is done while a proposal is waiting to be sent,
//   - nil once the stream has been fully drained.
func callGrpcMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProfile, address string, proposals chan<- *pb.Match) error {
var conn *grpc.ClientConn
conn, err := cc.GetGRPC(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": address,
}).Error("failed to establish grpc client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
}
client := pb.NewMatchFunctionClient(conn)
stream, err := client.Run(ctx, &pb.RunRequest{Profile: profile})
if err != nil {
logger.WithError(err).Error("failed to run match function for profile")
return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
// NOTE(review): the next line looks like a leftover from a removed goroutine
// in this diff rendering (req is not in scope here) — confirm.
}(req.GetProfile())
if err != nil {
logger.Errorf("%v.Run() error, %v\n", client, err)
return err
}
// Hand the proposal to the receiver, but give up if ctx is done first so
// this function cannot block forever on a receiver that has gone away.
select {
case proposals <- resp.GetProposal():
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
func doFetchMatchesValidateProposals(ctx context.Context, resultChan <-chan mmfResult, channelSize int) ([]*pb.Match, error) {
proposals := []*pb.Match{}
for i := 0; i < channelSize; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultChan:
// Check if mmf responds with any errors
if result.err != nil {
return nil, result.err
}
// Check if mmf returns a match with no tickets in it
for _, match := range result.matches {
if len(match.GetTickets()) == 0 {
return nil, status.Errorf(codes.FailedPrecondition, "match %s does not have associated tickets.", match.GetMatchId())
}
}
proposals = append(proposals, result.matches...)
}
func callHTTPMmf(ctx context.Context, cc *rpc.ClientCache, profile *pb.MatchProfile, address string, proposals chan<- *pb.Match) error {
client, baseURL, err := cc.GetHTTP(address)
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"function": address,
}).Error("failed to establish rest client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
}
return proposals, nil
}
func matchesFromHTTPMMF(ctx context.Context, profile *pb.MatchProfile, client *http.Client, baseURL string) ([]*pb.Match, error) {
var m jsonpb.Marshaler
strReq, err := m.MarshalToString(&pb.RunRequest{Profile: profile})
if err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "failed to marshal profile pb to string for profile %s: %s", profile.GetName(), err.Error())
return status.Errorf(codes.FailedPrecondition, "failed to marshal profile pb to string for profile %s: %s", profile.GetName(), err.Error())
}
req, err := http.NewRequest("POST", baseURL+"/v1/matchfunction:run", strings.NewReader(strReq))
if err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "failed to create mmf http request for profile %s: %s", profile.GetName(), err.Error())
return status.Errorf(codes.FailedPrecondition, "failed to create mmf http request for profile %s: %s", profile.GetName(), err.Error())
}
resp, err := client.Do(req.WithContext(ctx))
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get response from mmf run for proile %s: %s", profile.Name, err.Error())
return status.Errorf(codes.Internal, "failed to get response from mmf run for proile %s: %s", profile.Name, err.Error())
}
defer func() {
err = resp.Body.Close()
@ -276,7 +267,6 @@ func matchesFromHTTPMMF(ctx context.Context, profile *pb.MatchProfile, client *h
}()
dec := json.NewDecoder(resp.Body)
proposals := make([]*pb.Match, 0)
for {
var item struct {
Result json.RawMessage `json:"result"`
@ -288,47 +278,23 @@ func matchesFromHTTPMMF(ctx context.Context, profile *pb.MatchProfile, client *h
break
}
if err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to read response from HTTP JSON stream: %s", err.Error())
return status.Errorf(codes.Unavailable, "failed to read response from HTTP JSON stream: %s", err.Error())
}
if len(item.Error) != 0 {
return nil, status.Errorf(codes.Unavailable, "failed to execute matchfunction.Run: %v", item.Error)
return status.Errorf(codes.Unavailable, "failed to execute matchfunction.Run: %v", item.Error)
}
resp := &pb.RunResponse{}
if err := jsonpb.UnmarshalString(string(item.Result), resp); err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to execute json.Unmarshal(%s, &resp): %v", item.Result, err)
return status.Errorf(codes.Unavailable, "failed to execute json.Unmarshal(%s, &resp): %v", item.Result, err)
}
select {
case proposals <- resp.GetProposal():
case <-ctx.Done():
return ctx.Err()
}
proposals = append(proposals, resp.GetProposal())
}
return proposals, nil
}
// matchesFromGRPCMMF triggers execution of MMFs to fetch match results for each profile.
// These proposals are then sent to evaluator and the results are streamed back on the channel
// that this function returns to the caller.
func matchesFromGRPCMMF(ctx context.Context, profile *pb.MatchProfile, client pb.MatchFunctionClient) ([]*pb.Match, error) {
// TODO: This code calls user code and could hang. We need to add a deadline here
// and timeout gracefully to ensure that the ListMatches completes.
stream, err := client.Run(ctx, &pb.RunRequest{Profile: profile})
if err != nil {
logger.WithError(err).Error("failed to run match function for profile")
return nil, err
}
proposals := make([]*pb.Match, 0)
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
logger.Errorf("%v.Run() error, %v\n", client, err)
return nil, err
}
proposals = append(proposals, resp.GetProposal())
}
return proposals, nil
return nil
}
// AssignTickets overwrites the Assignment field of the input TicketIds.

View File

@ -17,157 +17,17 @@ package backend
import (
"context"
"testing"
"time"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"open-match.dev/open-match/internal/config"
"open-match.dev/open-match/internal/rpc"
"open-match.dev/open-match/internal/statestore"
statestoreTesting "open-match.dev/open-match/internal/statestore/testing"
utilTesting "open-match.dev/open-match/internal/util/testing"
"open-match.dev/open-match/pkg/pb"
certgenTesting "open-match.dev/open-match/tools/certgen/testing"
)
// TestDoFetchMatchesInChannel checks the error returned by
// doFetchMatchesReceiveMmfResult for REST, gRPC and unsupported function
// configs, using client caches built from a config with and without a TLS
// root certificate file set.
func TestDoFetchMatchesInChannel(t *testing.T) {
	plainCfg := viper.New()
	tlsCfg := viper.New()

	// The generated public certificate is only used to populate the TLS config.
	rootCert, _, certErr := certgenTesting.CreateCertificateAndPrivateKeyForTesting([]string{})
	if certErr != nil {
		t.Fatalf("cannot create TLS keys: %s", certErr)
	}
	tlsCfg.Set("api.tls.rootCertificateFile", rootCert)

	restReq := &pb.FetchMatchesRequest{
		Config:  &pb.FunctionConfig{Host: "om-test", Port: 54321, Type: pb.FunctionConfig_REST},
		Profile: &pb.MatchProfile{Name: "1"},
	}
	grpcReq := &pb.FetchMatchesRequest{
		Config:  &pb.FunctionConfig{Host: "om-test", Port: 54321, Type: pb.FunctionConfig_GRPC},
		Profile: &pb.MatchProfile{Name: "1"},
	}
	// Type 3 is the value this test uses for an unsupported function type.
	unsupportedReq := &pb.FetchMatchesRequest{
		Config:  &pb.FunctionConfig{Host: "om-test", Port: 54321, Type: 3},
		Profile: &pb.MatchProfile{Name: "1"},
	}

	connectErr := status.Error(codes.InvalidArgument, "failed to connect to match function")

	cases := []struct {
		description string
		req         *pb.FetchMatchesRequest
		wantErr     error
		cfg         config.View
	}{
		{
			description: "trusted certificate is required when requesting a secure http client",
			req:         restReq,
			wantErr:     connectErr,
			cfg:         tlsCfg,
		},
		{
			description: "trusted certificate is required when requesting a secure grpc client",
			req:         grpcReq,
			wantErr:     connectErr,
			cfg:         tlsCfg,
		},
		{
			description: "the mmfResult channel received data successfully under the insecure mode with rest config",
			req:         restReq,
			wantErr:     nil,
			cfg:         plainCfg,
		},
		{
			description: "the mmfResult channel received data successfully under the insecure mode with grpc config",
			req:         grpcReq,
			wantErr:     nil,
			cfg:         plainCfg,
		},
		{
			description: "one of the rest/grpc config is required to process the request",
			req:         unsupportedReq,
			wantErr:     status.Error(codes.InvalidArgument, "provided match function type is not supported"),
			cfg:         plainCfg,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.description, func(t *testing.T) {
			ctx := utilTesting.NewContext(t)
			clients := rpc.NewClientCache(tc.cfg)
			results := make(chan mmfResult, 1)

			got := doFetchMatchesReceiveMmfResult(ctx, clients, tc.req, results)
			assert.Equal(t, tc.wantErr, got)
		})
	}
}
func TestDoFetchMatchesFilterChannel(t *testing.T) {
tests := []struct {
description string
preAction func(chan mmfResult, context.CancelFunc)
wantMatches []*pb.Match
wantCode codes.Code
}{
{
description: "test the filter can exit the for loop when context was canceled",
preAction: func(mmfChan chan mmfResult, cancel context.CancelFunc) {
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
},
wantMatches: nil,
wantCode: codes.Unknown,
},
{
description: "test the filter can return an error when one of the mmfResult contains an error",
preAction: func(mmfChan chan mmfResult, cancel context.CancelFunc) {
mmfChan <- mmfResult{matches: []*pb.Match{{MatchId: "1", Tickets: []*pb.Ticket{{Id: "123"}}}}, err: nil}
mmfChan <- mmfResult{matches: nil, err: status.Error(codes.Unknown, "some error")}
},
wantMatches: nil,
wantCode: codes.Unknown,
},
{
description: "test the filter can return an error when one of the mmf calls return match with empty tickets",
preAction: func(mmfChan chan mmfResult, cancel context.CancelFunc) {
mmfChan <- mmfResult{matches: []*pb.Match{{MatchId: "1"}}, err: nil}
mmfChan <- mmfResult{matches: []*pb.Match{{MatchId: "2"}}, err: nil}
},
wantMatches: []*pb.Match{{MatchId: "1"}, {MatchId: "2"}},
wantCode: codes.FailedPrecondition,
},
{
description: "test the filter can return proposals when all mmfResults are valid",
preAction: func(mmfChan chan mmfResult, cancel context.CancelFunc) {
mmfChan <- mmfResult{matches: []*pb.Match{{MatchId: "1", Tickets: []*pb.Ticket{{Id: "123"}}}}, err: nil}
mmfChan <- mmfResult{matches: []*pb.Match{{MatchId: "2", Tickets: []*pb.Ticket{{Id: "321"}}}}, err: nil}
},
wantMatches: []*pb.Match{{MatchId: "1", Tickets: []*pb.Ticket{{Id: "123"}}}, {MatchId: "2", Tickets: []*pb.Ticket{{Id: "321"}}}},
wantCode: codes.OK,
},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ctx := utilTesting.NewContext(t)
resultChan := make(chan mmfResult, 2)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
test.preAction(resultChan, cancel)
matches, err := doFetchMatchesValidateProposals(ctx, resultChan, 2)
for _, match := range matches {
assert.Contains(t, test.wantMatches, match)
}
assert.Equal(t, test.wantCode, status.Convert(err).Code())
})
}
}
func TestDoAssignTickets(t *testing.T) {
fakeProperty := "test-property"
fakeTickets := []*pb.Ticket{