Fix metrics flushing, add OC agent, and refactor multi-closing. (#648)

This commit is contained in:
Jeremy Edwards
2019-07-15 11:39:04 -07:00
committed by GitHub
parent 880e340859
commit e28fe42f3b
19 changed files with 263 additions and 118 deletions

1
go.mod
View File

@ -19,6 +19,7 @@ go 1.12
require (
cloud.google.com/go v0.40.0
contrib.go.opencensus.io/exporter/jaeger v0.1.0
contrib.go.opencensus.io/exporter/ocagent v0.5.0
contrib.go.opencensus.io/exporter/prometheus v0.1.0
contrib.go.opencensus.io/exporter/stackdriver v0.12.2
contrib.go.opencensus.io/exporter/zipkin v0.1.1

3
go.sum
View File

@ -5,6 +5,8 @@ cloud.google.com/go v0.40.0 h1:FjSY7bOj+WzJe6TZRVtXI2b9kAYvtNg4lMbcH2+MUkk=
cloud.google.com/go v0.40.0/go.mod h1:Tk58MuI9rbLMKlAjeO/bDnteAx7tX2gJIXw4T5Jwlro=
contrib.go.opencensus.io/exporter/jaeger v0.1.0 h1:WNc9HbA38xEQmsI40Tjd/MNU/g8byN2Of7lwIjv0Jdc=
contrib.go.opencensus.io/exporter/jaeger v0.1.0/go.mod h1:VYianECmuFPwU37O699Vc1GOcy+y8kOsfaxHRImmjbA=
contrib.go.opencensus.io/exporter/ocagent v0.5.0 h1:TKXjQSRS0/cCDrP7KvkgU6SmILtF/yV2TOs/02K/WZQ=
contrib.go.opencensus.io/exporter/ocagent v0.5.0/go.mod h1:ImxhfLRpxoYiSq891pBrLVhN+qmP8BTVvdH2YLs7Gl0=
contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE5H/ukPWBRo314xiDvg=
contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A=
contrib.go.opencensus.io/exporter/stackdriver v0.12.2 h1:jU1p9F07ASK11wYgSTPKtFlTvTtCDj6R1d3nRt0ZHDE=
@ -114,6 +116,7 @@ github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:Fecb
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.2 h1:S+ef0492XaIknb8LMjcwgW2i3cNTzDYMmDrOThOJNWc=
github.com/grpc-ecosystem/grpc-gateway v1.9.2/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=

View File

@ -30,6 +30,9 @@ openmatch:
enabled: false
endpoint: "/zipkin"
reporterEndpoint: "zipkin"
opencensusAgent:
enabled: false
agentEndpoint: "om-opencensus-agent:55678"
reportingPeriod: "5s"
synchronizer:
install: true
@ -192,6 +195,9 @@ openmatch:
proposalCollectionIntervalMs: 2000ms
monitoring:
opencensusAgent:
enable: "{{.Values.openmatch.monitoring.opencensusAgent.enabled}}"
agentEndpoint: "{{.Values.openmatch.monitoring.opencensusAgent.agentEndpoint}}"
jaeger:
enable: "{{.Values.openmatch.monitoring.jaeger.enabled}}"
agentEndpoint: "{{.Values.openmatch.monitoring.jaeger.agentEndpoint}}"

View File

@ -51,7 +51,8 @@ func RunApplication() {
func serve(cfg config.View) {
mux := &http.ServeMux{}
monitoring.Setup(mux, cfg)
closer := monitoring.Setup(mux, cfg)
defer closer()
port := cfg.GetInt("api.swaggerui.httpport")
baseDir, err := os.Getwd()
if err != nil {

View File

@ -21,16 +21,9 @@ import (
"open-match.dev/open-match/internal/config"
)
var (
jaegerLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "monitoring.jaeger",
})
)
func bindJaeger(cfg config.View) {
if !cfg.GetBool("monitoring.jaeger.enable") {
jaegerLogger.Info("Jaeger Tracing: Disabled")
logger.Info("Jaeger Tracing: Disabled")
return
}
@ -43,7 +36,7 @@ func bindJaeger(cfg config.View) {
ServiceName: "open_match",
})
if err != nil {
jaegerLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"error": err,
"agentEndpoint": agentEndpointURI,
"collectorEndpoint": collectorEndpointURI,
@ -51,10 +44,11 @@ func bindJaeger(cfg config.View) {
"Failed to create the Jaeger exporter: %v", err)
}
jaegerLogger.WithFields(logrus.Fields{
// And now finally register it as a Trace Exporter
trace.RegisterExporter(je)
logger.WithFields(logrus.Fields{
"agentEndpoint": agentEndpointURI,
"collectorEndpoint": collectorEndpointURI,
}).Info("Jaeger Tracing: ENABLED")
// And now finally register it as a Trace Exporter
trace.RegisterExporter(je)
}

View File

@ -0,0 +1,46 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package monitoring
import (
"contrib.go.opencensus.io/exporter/ocagent"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
"open-match.dev/open-match/internal/config"
)
func bindOpenCensusAgent(cfg config.View) func() error {
if !cfg.GetBool("monitoring.opencensusAgent.enable") {
logger.Info("OpenCensus Agent: Disabled")
return func() error { return nil }
}
agentEndpoint := cfg.GetString("monitoring.opencensusAgent.agentEndpoint")
oce, err := ocagent.NewExporter(ocagent.WithAddress(agentEndpoint), ocagent.WithInsecure(), ocagent.WithServiceName("open-match"))
if err != nil {
logger.WithError(err).Fatalf("Failed to create a new ocagent exporter")
}
trace.RegisterExporter(oce)
view.RegisterExporter(oce)
logger.WithFields(logrus.Fields{
"agentEndpoint": agentEndpoint,
}).Info("OpenCensus Agent: ENABLED")
// Before the program stops, please remember to stop the exporter.
return oce.Stop
}

View File

@ -17,7 +17,6 @@ package monitoring
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"net/http"
"sync/atomic"
)
@ -31,13 +30,6 @@ const (
healthStateUnhealthy = int32(2)
)
var (
probeLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "monitoring.probe",
})
)
type statefulProbe struct {
healthState *int32
probes []func(context.Context) error
@ -54,9 +46,9 @@ func (sp *statefulProbe) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err != nil {
old := atomic.SwapInt32(sp.healthState, healthStateUnhealthy)
if old == healthStateUnhealthy {
probeLogger.WithError(err).Warningf("%s health check continues to fail. The server is at risk of termination.", HealthCheckEndpoint)
logger.WithError(err).Warningf("%s health check continues to fail. The server is at risk of termination.", HealthCheckEndpoint)
} else {
probeLogger.WithError(err).Warningf("%s health check failed. The server will terminate if this continues to happen.", HealthCheckEndpoint)
logger.WithError(err).Warningf("%s health check failed. The server will terminate if this continues to happen.", HealthCheckEndpoint)
}
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
@ -64,9 +56,9 @@ func (sp *statefulProbe) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
old := atomic.SwapInt32(sp.healthState, healthStateHealthy)
if old == healthStateUnhealthy {
probeLogger.Infof("%s is healthy again.", HealthCheckEndpoint)
logger.Infof("%s is healthy again.", HealthCheckEndpoint)
} else if old == healthStateFirstProbe {
probeLogger.Infof("%s is reporting healthy, .", HealthCheckEndpoint)
logger.Infof("%s is reporting healthy.", HealthCheckEndpoint)
}
}
w.WriteHeader(http.StatusOK)

View File

@ -30,16 +30,9 @@ const (
ConfigNameEnableMetrics = "monitoring.prometheus.enable"
)
var (
prometheusLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "monitoring.prometheus",
})
)
func bindPrometheus(mux *http.ServeMux, cfg config.View) {
if !cfg.GetBool("monitoring.prometheus.enable") {
prometheusLogger.Info("Prometheus Metrics: Disabled")
logger.Info("Prometheus Metrics: Disabled")
return
}
@ -54,7 +47,7 @@ func bindPrometheus(mux *http.ServeMux, cfg config.View) {
Registry: registry,
})
if err != nil {
prometheusLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"error": err,
"endpoint": endpoint,
}).Fatal(
@ -64,8 +57,9 @@ func bindPrometheus(mux *http.ServeMux, cfg config.View) {
// Register the Prometheus exporters as a stats exporter.
view.RegisterExporter(promExporter)
prometheusLogger.WithFields(logrus.Fields{
mux.Handle(endpoint, promExporter)
logger.WithFields(logrus.Fields{
"endpoint": endpoint,
}).Info("Prometheus Metrics: ENABLED")
mux.Handle(endpoint, promExporter)
}

View File

@ -23,21 +23,23 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"open-match.dev/open-match/internal/config"
"open-match.dev/open-match/internal/util"
)
var (
publicLogger = logrus.WithFields(logrus.Fields{
logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "monitoring",
})
)
// Setup configures the monitoring for the server.
func Setup(mux *http.ServeMux, cfg config.View) {
func Setup(mux *http.ServeMux, cfg config.View) func() {
mc := util.NewMultiClose()
periodString := cfg.GetString("monitoring.reportingPeriod")
reportingPeriod, err := time.ParseDuration(periodString)
if err != nil {
publicLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"error": err,
"reportingPeriod": periodString,
}).Info("Failed to parse monitoring.reportingPeriod, defaulting to 10s")
@ -46,16 +48,18 @@ func Setup(mux *http.ServeMux, cfg config.View) {
bindJaeger(cfg)
bindPrometheus(mux, cfg)
bindStackDriver(cfg)
mc.AddCloseFunc(bindStackDriver(cfg))
mc.AddCloseWithErrorFunc(bindOpenCensusAgent(cfg))
bindZipkin(cfg)
bindZpages(mux, cfg)
// Change the frequency of updates to the metrics endpoint
view.SetReportingPeriod(reportingPeriod)
publicLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"reportingPeriod": reportingPeriod,
}).Info("Monitoring has been configured.")
return mc.Close
}
// Counter creates a counter metric.
@ -85,7 +89,7 @@ func counterView(s *stats.Int64Measure) *view.View {
}
err := view.Register(v)
if err != nil {
publicLogger.WithError(err).Infof("cannot register view for metric: %s, it will not be reported", s.Name())
logger.WithError(err).Infof("cannot register view for metric: %s, it will not be reported", s.Name())
}
return v
}

View File

@ -22,17 +22,10 @@ import (
"open-match.dev/open-match/internal/config"
)
var (
stackdriverLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "monitoring.stackdriver",
})
)
func bindStackDriver(cfg config.View) {
func bindStackDriver(cfg config.View) func() {
if !cfg.GetBool("monitoring.stackdriver.enable") {
stackdriverLogger.Info("StackDriver Metrics: Disabled")
return
logger.Info("StackDriver Metrics: Disabled")
return func() {}
}
gcpProjectID := cfg.GetString("monitoring.stackdriver.gcpProjectId")
metricPrefix := cfg.GetString("monitoring.stackdriver.metricPrefix")
@ -42,14 +35,12 @@ func bindStackDriver(cfg config.View) {
MetricPrefix: metricPrefix,
})
if err != nil {
stackdriverLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"error": err,
"gcpProjectID": gcpProjectID,
"metricPrefix": metricPrefix,
}).Fatal("Failed to initialize OpenCensus exporter to Stack Driver")
}
// It is imperative to invoke flush before your main function exits
defer sd.Flush()
// Register it as a metrics exporter
view.RegisterExporter(sd)
@ -57,8 +48,11 @@ func bindStackDriver(cfg config.View) {
// Register it as a trace exporter
trace.RegisterExporter(sd)
stackdriverLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"gcpProjectID": gcpProjectID,
"metricPrefix": metricPrefix,
}).Info("StackDriver Metrics: ENABLED")
// It is imperative to invoke flush before your main function exits
return sd.Flush
}

View File

@ -26,16 +26,9 @@ import (
"open-match.dev/open-match/internal/config"
)
var (
zipkinLogger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "monitoring.zipkin",
})
)
func bindZipkin(cfg config.View) {
if !cfg.GetBool("monitoring.zipkin.enable") {
zipkinLogger.Info("Zipkin Tracing: Disabled")
logger.Info("Zipkin Tracing: Disabled")
return
}
zipkinEndpoint := cfg.GetString("monitoring.zipkin.endpoint")
@ -43,7 +36,7 @@ func bindZipkin(cfg config.View) {
// 1. Configure exporter to export traces to Zipkin.
localEndpoint, err := openzipkin.NewEndpoint("open_match", zipkinEndpoint)
if err != nil {
zipkinLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"error": err,
"zipkinEndpoint": zipkinEndpoint,
"zipkinReporterEndpoint": zipkinReporterEndpoint,
@ -57,7 +50,7 @@ func bindZipkin(cfg config.View) {
// TODO: Provide a basic configuration for Zipkin trace samples.
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
zipkinLogger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"zipkinEndpoint": zipkinEndpoint,
"zipkinReporterEndpoint": zipkinReporterEndpoint,
}).Info("Zipkin Tracing: ENABLED")

View File

@ -15,14 +15,22 @@
package monitoring
import (
"go.opencensus.io/zpages"
"net/http"
"github.com/sirupsen/logrus"
"go.opencensus.io/zpages"
"open-match.dev/open-match/internal/config"
)
func bindZpages(mux *http.ServeMux, cfg config.View) {
if !cfg.GetBool("monitoring.zpages.enable") {
logger.Info("zPages: Disabled")
return
}
zpages.Handle(mux, "/debug")
endpoint := "/debug"
zpages.Handle(mux, endpoint)
logger.WithFields(logrus.Fields{
"endpoint": endpoint,
}).Info("zPages: ENABLED")
}

View File

@ -76,6 +76,7 @@ type ServerParams struct {
enableRPCLogging bool
enableMetrics bool
closer func()
}
// NewServerParamsFromConfig returns server Params initialized from the configuration file.
@ -131,7 +132,7 @@ func NewServerParamsFromConfig(cfg config.View, prefix string) (*ServerParams, e
p.enableRPCLogging = cfg.GetBool(configNameEnableRPCLogging)
// TODO: This isn't ideal since monitoring requires config for it to be initialized.
// This forces us to initialize readiness probes earlier than necessary.
monitoring.Setup(p.ServeMux, cfg)
p.closer = monitoring.Setup(p.ServeMux, cfg)
return p, nil
}
@ -193,6 +194,7 @@ func (p *ServerParams) invalidate() {
// All HTTP traffic is served from a common http.ServeMux.
type Server struct {
serverWithProxy grpcServerWithProxy
closer func()
}
// grpcServerWithProxy this will go away when insecure.go and tls.go are merged into the same server.
@ -208,12 +210,16 @@ func (s *Server) Start(p *ServerParams) (func(), error) {
} else {
s.serverWithProxy = newInsecureServer(p.grpcListener, p.grpcProxyListener)
}
s.closer = p.closer
return s.serverWithProxy.start(p)
}
// Stop the gRPC+HTTP(s) REST server.
func (s *Server) Stop() {
s.serverWithProxy.stop()
if s.closer != nil {
s.closer()
}
}
// startServingIndefinitely creates a server based on the params and begins serving the gRPC and HTTP proxy.

View File

@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"
"open-match.dev/open-match/internal/rpc"
"open-match.dev/open-match/internal/util"
netlistenerTesting "open-match.dev/open-match/internal/util/netlistener/testing"
certgenTesting "open-match.dev/open-match/tools/certgen/testing"
)
@ -56,6 +57,7 @@ func MustServeInsecure(t *testing.T, binder func(*rpc.ServerParams)) *TestContex
s: s,
grpcAddress: grpcAddress,
proxyAddress: proxyAddress,
mc: util.NewMultiClose(),
}
}
@ -81,6 +83,7 @@ func MustServeTLS(t *testing.T, binder func(*rpc.ServerParams)) *TestContext {
grpcAddress: grpcAddress,
proxyAddress: proxyAddress,
trustedCertificate: pub,
mc: util.NewMultiClose(),
}
}
@ -102,20 +105,17 @@ type TestContext struct {
grpcAddress string
proxyAddress string
trustedCertificate []byte
closers []func()
mc *util.MultiClose
}
// AddCloseFunc adds a close function.
func (tc *TestContext) AddCloseFunc(closer func()) {
tc.closers = append(tc.closers, closer)
tc.mc.AddCloseFunc(closer)
}
// Close shutsdown the server and frees the TCP port.
func (tc *TestContext) Close() {
for _, closer := range tc.closers {
closer()
}
tc.mc.Close()
tc.s.Stop()
}

View File

@ -33,6 +33,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"open-match.dev/open-match/internal/rpc"
"open-match.dev/open-match/internal/util"
pb "open-match.dev/open-match/pkg/pb"
)
@ -40,7 +41,7 @@ type clusterOM struct {
kubeClient kubernetes.Interface
namespace string
t *testing.T
mc *multicloser
mc *util.MultiClose
}
func (com *clusterOM) withT(t *testing.T) OM {
@ -48,7 +49,7 @@ func (com *clusterOM) withT(t *testing.T) OM {
kubeClient: com.kubeClient,
namespace: com.namespace,
t: t,
mc: newMulticloser(),
mc: util.NewMultiClose(),
}
}
@ -57,7 +58,7 @@ func (com *clusterOM) MustFrontendGRPC() pb.FrontendClient {
if err != nil {
com.t.Fatalf("cannot create gRPC client, %s", err)
}
com.mc.addSilent(conn.Close)
com.mc.AddCloseWithErrorFunc(conn.Close)
return pb.NewFrontendClient(conn)
}
@ -66,7 +67,7 @@ func (com *clusterOM) MustBackendGRPC() pb.BackendClient {
if err != nil {
com.t.Fatalf("cannot create gRPC client, %s", err)
}
com.mc.addSilent(conn.Close)
com.mc.AddCloseWithErrorFunc(conn.Close)
return pb.NewBackendClient(conn)
}
@ -75,7 +76,7 @@ func (com *clusterOM) MustMmLogicGRPC() pb.MmLogicClient {
if err != nil {
com.t.Fatalf("cannot create gRPC client, %s", err)
}
com.mc.addSilent(conn.Close)
com.mc.AddCloseWithErrorFunc(conn.Close)
return pb.NewMmLogicClient(conn)
}
@ -155,7 +156,7 @@ func (com *clusterOM) Context() context.Context {
}
func (com *clusterOM) cleanup() {
com.mc.close()
com.mc.Close()
}
func (com *clusterOM) cleanupMain() error {

View File

@ -18,7 +18,6 @@ import (
"context"
"log"
"os"
"sync"
"testing"
pb "open-match.dev/open-match/pkg/pb"
@ -69,33 +68,3 @@ func RunMain(m *testing.M) {
zygote = z
exitCode = m.Run()
}
type multicloser struct {
closers []func()
m sync.Mutex
}
func newMulticloser() *multicloser {
return &multicloser{
closers: []func(){},
}
}
func (mc *multicloser) addSilent(f func() error) {
mc.m.Lock()
defer mc.m.Unlock()
mc.closers = append(mc.closers, func() {
if err := f(); err != nil {
log.Printf("failed to close, %s", err)
}
})
}
func (mc *multicloser) close() {
mc.m.Lock()
defer mc.m.Unlock()
for _, c := range mc.closers {
c()
}
mc.closers = []func(){}
}

View File

@ -27,6 +27,7 @@ import (
"open-match.dev/open-match/internal/rpc"
rpcTesting "open-match.dev/open-match/internal/rpc/testing"
statestoreTesting "open-match.dev/open-match/internal/statestore/testing"
"open-match.dev/open-match/internal/util"
evalHarness "open-match.dev/open-match/pkg/harness/evaluator/golang"
mmfHarness "open-match.dev/open-match/pkg/harness/function/golang"
pb "open-match.dev/open-match/pkg/pb"
@ -37,7 +38,7 @@ type inmemoryOM struct {
mmfTc *rpcTesting.TestContext
evalTc *rpcTesting.TestContext
t *testing.T
mc *multicloser
mc *util.MultiClose
}
func (iom *inmemoryOM) withT(t *testing.T) OM {
@ -50,7 +51,7 @@ func (iom *inmemoryOM) withT(t *testing.T) OM {
mmfTc: mmfTc,
evalTc: evalTc,
t: t,
mc: newMulticloser(),
mc: util.NewMultiClose(),
}
return om
}
@ -61,19 +62,19 @@ func createZygote(m *testing.M) (OM, error) {
func (iom *inmemoryOM) MustFrontendGRPC() pb.FrontendClient {
conn := iom.mainTc.MustGRPC()
iom.mc.addSilent(conn.Close)
iom.mc.AddCloseWithErrorFunc(conn.Close)
return pb.NewFrontendClient(conn)
}
func (iom *inmemoryOM) MustBackendGRPC() pb.BackendClient {
conn := iom.mainTc.MustGRPC()
iom.mc.addSilent(conn.Close)
iom.mc.AddCloseWithErrorFunc(conn.Close)
return pb.NewBackendClient(conn)
}
func (iom *inmemoryOM) MustMmLogicGRPC() pb.MmLogicClient {
conn := iom.mainTc.MustGRPC()
iom.mc.addSilent(conn.Close)
iom.mc.AddCloseWithErrorFunc(conn.Close)
return pb.NewMmLogicClient(conn)
}
@ -102,7 +103,7 @@ func (iom *inmemoryOM) Context() context.Context {
}
func (iom *inmemoryOM) cleanup() {
iom.mc.close()
iom.mc.Close()
iom.mainTc.Close()
iom.mmfTc.Close()
iom.evalTc.Close()

View File

@ -0,0 +1,71 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package util provides utilities for net.Listener.
package util
import (
"sync"
"github.com/sirupsen/logrus"
)
var (
logger = logrus.WithFields(logrus.Fields{
"app": "openmatch",
"component": "util",
})
)
// MultiClose is a helper for closing multiple close functions at the end of a program.
type MultiClose struct {
closers []func()
m sync.Mutex
}
// NewMultiClose creates a new multi-closer.
func NewMultiClose() *MultiClose {
return &MultiClose{
closers: []func(){},
}
}
// AddCloseFunc adds a close function.
func (mc *MultiClose) AddCloseFunc(closer func()) {
mc.m.Lock()
defer mc.m.Unlock()
mc.closers = append(mc.closers, closer)
}
// AddCloseWithErrorFunc adds a close function.
func (mc *MultiClose) AddCloseWithErrorFunc(closer func() error) {
mc.m.Lock()
defer mc.m.Unlock()
mc.closers = append(mc.closers, func() {
err := closer()
if err != nil {
logger.WithError(err).Warning("close function failed")
}
})
}
// Close shutsdown the server and frees the TCP port.
func (mc *MultiClose) Close() {
mc.m.Lock()
defer mc.m.Unlock()
for _, closer := range mc.closers {
closer()
}
mc.closers = []func(){}
}

View File

@ -0,0 +1,61 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMultiCloseNothing(t *testing.T) {
mc := NewMultiClose()
mc.Close()
}
func TestMultiCloseSingle(t *testing.T) {
assert := assert.New(t)
a := 5
closeA := func() {
a++
}
mc := NewMultiClose()
mc.AddCloseFunc(closeA)
mc.Close()
assert.Equal(6, a)
}
func TestMultiCloseMulti(t *testing.T) {
assert := assert.New(t)
a := 5
closeA := func() {
a++
}
closeAWithError := func() error {
a -= 2
return nil
}
b := 5
closeB := func() {
b += 10
}
mc := NewMultiClose()
mc.AddCloseFunc(closeA)
mc.AddCloseWithErrorFunc(closeAWithError)
mc.AddCloseFunc(closeB)
mc.Close()
assert.Equal(4, a)
assert.Equal(15, b)
}