Reorganize backend service and e2e test for incoming rest config logic (#458)

* Reorganize backend service and e2e test for incoming rest config logic
This commit is contained in:
yfei1
2019-05-31 12:57:50 -07:00
committed by GitHub
parent 80bcd9487f
commit f3f80a70bd
3 changed files with 100 additions and 92 deletions

View File

@ -77,13 +77,16 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
return status.Errorf(codes.InvalidArgument, "match function configuration needs to be provided")
}
ctx := stream.Context()
var grpcClient pb.MatchFunctionClient
// TODO: Uncomment when rest config is implemented
// var httpClient *http.Client
// var baseURL string
var err error
var c <-chan *pb.Match
switch (req.Config.Type).(type) {
// MatchFunction Hosted as a GRPC service
case *pb.FunctionConfig_Grpc:
client, err := s.getGRPCClient((req.Config.Type).(*pb.FunctionConfig_Grpc))
grpcClient, err = s.getGRPCClient((req.Config.Type).(*pb.FunctionConfig_Grpc))
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
@ -91,10 +94,6 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
}).Error("failed to establish grpc client connection to match function")
return status.Error(codes.InvalidArgument, "failed to connect to match function")
}
// Get the channel over which the generated match results will be sent.
c = s.matchesFromGrpcMMF(ctx, req.Profile, client)
// MatchFunction Hosted as a REST service
case *pb.FunctionConfig_Rest:
_, _, err := s.getHTTPClient((req.Config.Type).(*pb.FunctionConfig_Rest))
@ -106,18 +105,49 @@ func (s *backendService) FetchMatches(req *pb.FetchMatchesRequest, stream pb.Bac
return status.Error(codes.InvalidArgument, "failed to connect to match function")
}
return status.Error(codes.Unimplemented, "not implemented")
default:
logger.Error("unsupported function type provided")
return status.Error(codes.InvalidArgument, "provided match function type is not supported")
}
ctx := stream.Context()
matchChan := make(chan *pb.Match)
errChan := make(chan error)
go func(matchChan chan<- *pb.Match, errChan chan<- error) {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(matchChan)
close(errChan)
}()
for _, profile := range req.Profile {
wg.Add(1)
go func(profile *pb.MatchProfile) {
defer wg.Done()
switch (req.Config.Type).(type) {
case *pb.FunctionConfig_Grpc:
// Get the channel over which the generated match results will be sent.
matchesFromGrpcMMF(ctx, profile, grpcClient, matchChan, errChan)
case *pb.FunctionConfig_Rest:
// TODO: implement matchesFromHttpMMF function
// s.matchesFromHttpMMF(ctx, profile, httpClient, baseURL, matchChan, errChan)
errChan <- status.Error(codes.Unimplemented, "rest function config is unimplemented")
}
}(profile)
}
}(matchChan, errChan)
for {
select {
case <-ctx.Done():
return ctx.Err()
case match, ok := <-c:
case err := <-errChan:
return err
case match, ok := <-matchChan:
if !ok {
// Channel closed indicating that we are done processing matches.
return nil
@ -168,63 +198,32 @@ func (s *backendService) getHTTPClient(config *pb.FunctionConfig_Rest) (*http.Cl
// matchesFromGrpcMMF triggers execution of MMFs to fetch match results for each profile.
// These proposals are then sent to evaluator and the results are streamed back on the channel
// that this function returns to the caller.
func (s *backendService) matchesFromGrpcMMF(ctx context.Context, profiles []*pb.MatchProfile, client pb.MatchFunctionClient) <-chan *pb.Match {
c := make(chan *pb.Match)
func matchesFromGrpcMMF(ctx context.Context, profile *pb.MatchProfile, client pb.MatchFunctionClient, matchChan chan<- *pb.Match, errChan chan<- error) {
// TODO: This code calls user code and could hang. We need to add a deadline here
// and timeout gracefully to ensure that the ListMatches completes.
// TODO: Currently, a failure in running the MMF is silently ignored and does not
// fail the FetchMatches. This needs to be revisited to investigate whether the
// FetchMatches should fail or atleast hint that a part of MMF executions failed.
resp, err := client.Run(ctx, &pb.RunRequest{Profile: profile})
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"profile": profile,
}).Error("failed to run match function for profile")
errChan <- err
return
}
// Create a goroutine that will wait for completion of individual goroutines processing each
// match profile and then close the channel signaling completion of match results generation.
go func(c chan<- *pb.Match) {
var wg sync.WaitGroup
for _, profile := range profiles {
// Trigger a goroutine for each profile. This will call the MatchFunction to get
// proposed matches and will then pass these to evaluator to get the accepted
// match results that it will send on the output channel.
wg.Add(1)
go func(profile *pb.MatchProfile) {
defer wg.Done()
// TODO: This code calls user code and could hang. We need to add a deadline here
// and timeout gracefully to ensure that the ListMatches completes.
// TODO: Currently, a failure in running the MMF is silently ignored and does not
// fail the FetchMatches. This needs to be revisited to investigate whether the
// FetchMatches should fail or atleast hint that a part of MMF executions failed.
resp, err := client.Run(ctx, &pb.RunRequest{Profile: profile})
if err != nil {
logger.WithFields(logrus.Fields{
"error": err.Error(),
"profile": profile,
}).Error("failed to run match function for profile")
return
}
logger.WithFields(logrus.Fields{
"profile": profile,
"proposals": resp.Proposal,
}).Trace("proposals generated for match profile")
// TODO: The matches returned by the MatchFunction will be sent to the
// Evaluator to select results. Until the evaluator is implemented,
// we channel all matches as accepted results.
// evaluatorResp, err := s.evalClient.Evaluate(ctx, &pb.EvaluateRequest{Match: resp.Proposal})
// if err != nil {
// logger.WithFields(logrus.Fields{
// "error": err.Error(),
// "match": resp.Proposal,
// }).Error("failed to evaluate results given candidate matches")
// return
// }
for _, match := range resp.Proposal {
c <- match
}
}(profile)
}
// Wait for all the goroutines handling each profile to return and then close the channel.
wg.Wait()
close(c)
}(c)
return c
logger.WithFields(logrus.Fields{
"profile": profile,
"proposals": resp.Proposal,
}).Trace("proposals generated for match profile")
// TODO: The matches returned by the MatchFunction will be sent to the
// Evaluator to select results. Until the evaluator is implemented,
// we channel all matches as accepted results.
for _, match := range resp.Proposal {
matchChan <- match
}
}
// AssignTickets sets the specified Assignment on the Tickets for the Ticket

View File

@ -31,27 +31,6 @@ const (
mmfGRPCPortInt = 50511
)
// matchFunctionConfig returns a function config for a basic match function that
// can be used for testing E2E scenarios for Open Match.
func matchFunctionConfig() (*pb.FunctionConfig, func(), error) {
mfclose, err := serveMatchFunction()
if err != nil {
return nil, nil, err
}
mf := &pb.FunctionConfig{
Name: mmfName,
Type: &pb.FunctionConfig_Grpc{
Grpc: &pb.GrpcFunctionConfig{
Host: mmfHost,
Port: mmfGRPCPortInt,
},
},
}
return mf, mfclose, nil
}
// serveMatchFunction creates a GRPC server and starts it to server the match function forever.
func serveMatchFunction() (func(), error) {
cfg := viper.New()

View File

@ -38,6 +38,11 @@ const (
map2attribute = "map2"
)
type testProfile struct {
name string
pools []*pb.Pool
}
func TestMinimatchStartup(t *testing.T) {
assert := assert.New(t)
@ -105,10 +110,7 @@ func TestMinimatchStartup(t *testing.T) {
// Test profiles being tested for. Note that each profile embeds two pools - and
// the current MMF returns a match per pool in the profile - so each profile should
// output two matches that are comprised of tickets belonging to that pool.
testProfiles := []struct {
name string
pools []*pb.Pool
}{
testProfiles := []testProfile{
{name: "", pools: []*pb.Pool{testPools[map1BeginnerPool], testPools[map1AdvancedPool]}},
{name: "", pools: []*pb.Pool{testPools[map2BeginnerPool], testPools[map2AdvancedPool]}},
}
@ -176,11 +178,39 @@ func TestMinimatchStartup(t *testing.T) {
assert.Equal(poolTickets[pool.Name], want)
}
cfgs := []*pb.FunctionConfig{
{
Name: mmfName,
Type: &pb.FunctionConfig_Grpc{
Grpc: &pb.GrpcFunctionConfig{
Host: mmfHost,
Port: mmfGRPCPortInt,
},
},
},
// TODO: Uncomment when rest config logic is implemented
// &pb.FunctionConfig{
// Name: mmfName,
// Type: &pb.FunctionConfig_Rest{
// Rest: &pb.RestFunctionConfig{
// Host: mmfHost,
// Port: mmfGRPCPortInt,
// },
// },
// },
}
for _, cfg := range cfgs {
validateFetchMatchesResult(assert, poolTickets, testProfiles, mm, cfg)
}
}
func validateFetchMatchesResult(assert *assert.Assertions, poolTickets map[string][]string, testProfiles []testProfile, mm *Server, mf *pb.FunctionConfig) {
be, err := mm.GetBackendClient()
assert.Nil(err)
assert.NotNil(be)
mf, mfclose, err := matchFunctionConfig()
mfclose, err := serveMatchFunction()
defer mfclose()
assert.Nil(err)