diff --git a/README.md b/README.md index 6e87a7a..8b7b587 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The project has been split into 3 parts (as they have different source code lice - This repository - [Discovery Client](https://github.com/siderolabs/discovery-client), contains the client code to interact with the server -- [Discovery Service API](https://github.com/talos-systems/discovery-api/), provides gRPC API definition and cluster data protobuf data structures +- [Discovery Service API](https://github.com/siderolabs/discovery-api/), provides gRPC API definition and cluster data protobuf data structures ## Setup @@ -27,7 +27,7 @@ All of the details to get started are present [in the Makefile](Makefile), start Once the application is running you can test the grpc functionality on the port 3000 and the http pages in browser on port 3001. -To test the grpc calls, [install grpcurl](https://github.com/fullstorydev/grpcurl#installation) and clone the [Discovery Service API](https://github.com/talos-systems/discovery-api/) repository. +To test the grpc calls, [install grpcurl](https://github.com/fullstorydev/grpcurl#installation) and clone the [Discovery Service API](https://github.com/siderolabs/discovery-api/) repository. - Sample code-block to test the grpc `Hello` call (change path accordingly) diff --git a/cmd/discovery-service/main.go b/cmd/discovery-service/main.go index 316e6b8..bafeb76 100644 --- a/cmd/discovery-service/main.go +++ b/cmd/discovery-service/main.go @@ -26,7 +26,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/talos-systems/discovery-api/api/v1alpha1/server/pb" + "github.com/siderolabs/discovery-api/api/v1alpha1/server/pb" "github.com/talos-systems/go-debug" "go.uber.org/zap" "golang.org/x/sync/errgroup" diff --git a/go.mod b/go.mod index ad24633..2c3e210 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,13 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/prometheus/client_golang v1.13.0 - github.com/siderolabs/discovery-client v0.1.1 + github.com/siderolabs/discovery-api v0.1.1 + github.com/siderolabs/discovery-client v0.1.2 github.com/stretchr/testify v1.8.0 - github.com/talos-systems/discovery-api v0.1.0 github.com/talos-systems/go-debug v0.2.1 go.uber.org/zap v1.23.0 go4.org/netipx v0.0.0-20220812043211-3cc044ffd68d - golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde + golang.org/x/sync v0.0.0-20220907140024-f12130a52804 golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.1 @@ -21,7 +21,7 @@ require ( require ( github.com/benbjohnson/clock v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.1.2 // indirect + github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.2 // indirect diff --git a/go.sum b/go.sum index 2303ca8..5f705cd 100644 --- a/go.sum +++ b/go.sum @@ -45,8 +45,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= -github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -210,8 +210,10 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/siderolabs/discovery-client v0.1.1 h1:1lhXTepW4V1IJPf91ApvF77cBl086d1OXuC4WdVp8hM= -github.com/siderolabs/discovery-client v0.1.1/go.mod h1:gBE5qxdB4BcY2nXOBJ14Qh8u7cBadCofjbxBLohcnbQ= +github.com/siderolabs/discovery-api v0.1.1 h1:DI+CjD/Nl0nIk8qkaNKz1sEquWTEKHKH4+ERsv+yBWg= +github.com/siderolabs/discovery-api v0.1.1/go.mod h1:JnJg4h1HbAhOazQl0lYHEjrg63rg/cf9r2te6/DqUxo= +github.com/siderolabs/discovery-client v0.1.2 h1:QsNwt/boSPM+f8Ww+GlAMv2+9o5KPhLdPUfjvVuNIm8= +github.com/siderolabs/discovery-client v0.1.2/go.mod h1:4ahEk2dMPKAGCLK5sRUxHfVryROxflwDPL+2c5MrLMI= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -226,8 +228,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/talos-systems/discovery-api v0.1.0 h1:aKod6uqakH6VfeQ6HaxPF7obqFAL1QTJe4HHTb2mVKk= -github.com/talos-systems/discovery-api v0.1.0/go.mod h1:ZsbzzOC5bzToaF3+YvUXDf9paeWV5bedpDu5RPXrglM= github.com/talos-systems/go-debug v0.2.1 h1:VSN8P1zXWeHWgUBZn4cVT3keBcecCAJBG9Up+F6N2KM= github.com/talos-systems/go-debug v0.2.1/go.mod h1:pR4NjsZQNFqGx3n4qkD4MIj1F2CxyIF8DCiO1+05JO0= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -341,8 +341,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= -golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220907140024-f12130a52804 h1:0SH2R3f1b1VmIMG7BXbEZCBUu2dKmHschSmjqGUrW8A= +golang.org/x/sync v0.0.0-20220907140024-f12130a52804/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/server/client_test.go b/pkg/server/client_test.go index d573683..63133dd 100644 --- a/pkg/server/client_test.go +++ b/pkg/server/client_test.go @@ -19,10 +19,10 @@ import ( "testing" "time" + clientpb "github.com/siderolabs/discovery-api/api/v1alpha1/client/pb" "github.com/siderolabs/discovery-client/pkg/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - clientpb "github.com/talos-systems/discovery-api/api/v1alpha1/client/pb" "go.uber.org/zap" "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" @@ -32,7 +32,7 @@ import ( func TestClient(t *testing.T) { t.Parallel() - endpoint := setupServer(t, 5000, "") + endpoint := setupServer(t, 5000, "").address logger := zaptest.NewLogger(t) @@ -526,3 +526,226 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi assert.NoError(t, err) } } + +//nolint:gocognit,gocyclo,cyclop,maintidx +func TestClientRedirect(t *testing.T) { + t.Parallel() + + srv1 := setupServer(t, 5000, "") + srv2 := setupServer(t, 5000, "") + + endpoint := srv1.address + + logger := zaptest.NewLogger(t) + + clusterID := "cluster_redirect" + + key := make([]byte, 32) + _, err := io.ReadFull(rand.Reader, key) + require.NoError(t, err) + + cipher, err := aes.NewCipher(key) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + affiliate1 := "affiliate_one" + affiliate2 := "affiliate_two" + + client1, err := client.NewClient(client.Options{ + Cipher: cipher, + Endpoint: endpoint, + ClusterID: clusterID, + AffiliateID: affiliate1, + TTL: time.Minute, + Insecure: true, + }) + require.NoError(t, err) + + client2, err := client.NewClient(client.Options{ + Cipher: cipher, + Endpoint: endpoint, + ClusterID: clusterID, + AffiliateID: affiliate2, + TTL: time.Minute, + Insecure: true, + }) + require.NoError(t, err) + + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { + return client1.Run(ctx, logger, notify1) + }) + + eg.Go(func() error { + return client2.Run(ctx, logger, notify2) + }) + + select { + case <-notify1: + case <-time.After(2 * time.Second): + require.Fail(t, "no initial snapshot update") + } + + assert.Empty(t, client1.GetAffiliates()) + + select { + case <-notify2: + case <-time.After(2 * time.Second): + require.Fail(t, "no initial snapshot update") + } + + assert.Empty(t, client2.GetAffiliates()) + + affiliate1PB := &client.Affiliate{ + Affiliate: &clientpb.Affiliate{ + NodeId: affiliate1, + Addresses: [][]byte{{1, 2, 3}}, + Hostname: "host1", + Nodename: "node1", + MachineType: "controlplane", + }, + } + + require.NoError(t, client1.SetLocalData(affiliate1PB, nil)) + + affiliate2PB := &client.Affiliate{ + Affiliate: &clientpb.Affiliate{ + NodeId: affiliate2, + Addresses: [][]byte{{2, 3, 4}}, + Hostname: "host2", + Nodename: "node2", + MachineType: "worker", + }, + } + + require.NoError(t, client2.SetLocalData(affiliate2PB, nil)) + + // both clients should eventually discover each other + for { + t.Logf("client1 affiliates = %d", len(client1.GetAffiliates())) + + if len(client1.GetAffiliates()) == 1 { + break + } + + select { + case <-notify1: + case <-time.After(2 * time.Second): + t.Logf("client1 affiliates on timeout = %d", len(client1.GetAffiliates())) + + require.Fail(t, "no incremental update") + } + } + + require.Len(t, client1.GetAffiliates(), 1) + + assert.Equal(t, []*client.Affiliate{affiliate2PB}, client1.GetAffiliates()) + + for { + t.Logf("client2 affiliates = %d", len(client1.GetAffiliates())) + + if len(client2.GetAffiliates()) == 1 { + break + } + + select { + case <-notify2: + case <-time.After(2 * time.Second): + require.Fail(t, "no incremental update") + } + } + + require.Len(t, client2.GetAffiliates(), 1) + + assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates()) + + // drain notify channels +drainLoop: + for { + select { + case <-notify1: + case <-notify2: + case <-time.After(time.Second): + break drainLoop + } + } + + // make srv1 redirect all clients to srv2 + srv1.restartWithRedirect(t, srv2.address) + + // both clients should get updates about each other after a reconnect +client1Loop: + for { + select { + case <-notify1: + t.Logf("reconnect: client1 affiliates = %d", len(client1.GetAffiliates())) + + if len(client1.GetAffiliates()) == 1 { + break client1Loop + } + case <-time.After(2 * time.Second): + require.Fail(t, "no incremental update") + } + } + + require.Len(t, client1.GetAffiliates(), 1) + + assert.Equal(t, []*client.Affiliate{affiliate2PB}, client1.GetAffiliates()) + +client2Loop: + for { + select { + case <-notify2: + t.Logf("reconnect: client2 affiliates = %d", len(client2.GetAffiliates())) + + if len(client2.GetAffiliates()) == 1 { + break client2Loop + } + case <-time.After(2 * time.Second): + require.Fail(t, "no incremental update") + } + } + + require.Len(t, client2.GetAffiliates(), 1) + + assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates()) + + // stop old srv1, graceful stop should work as all clients should have disconnected + srv1.s.GracefulStop() + + // update affiliate1, client2 should see the update + affiliate1PB.Endpoints = []*clientpb.Endpoint{ + { + Ip: []byte{1, 2, 3, 4}, + Port: 5678, + }, + } + require.NoError(t, client1.SetLocalData(affiliate1PB, nil)) + + for { + select { + case <-notify2: + case <-time.After(time.Second): + require.Fail(t, "no incremental update") + } + + if len(client2.GetAffiliates()[0].Endpoints) == 1 { + break + } + } + + assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates()) + + cancel() + + err = eg.Wait() + if err != nil && !errors.Is(err, context.Canceled) { + assert.NoError(t, err) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 2b16f17..b94fecc 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -12,7 +12,7 @@ import ( "time" prom "github.com/prometheus/client_golang/prometheus" - "github.com/talos-systems/discovery-api/api/v1alpha1/server/pb" + "github.com/siderolabs/discovery-api/api/v1alpha1/server/pb" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 84af8a2..7d882d0 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -8,6 +8,7 @@ package server_test import ( "bytes" "context" + "errors" "fmt" "net" "strings" @@ -18,9 +19,9 @@ import ( grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" prom "github.com/prometheus/client_golang/prometheus" promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/siderolabs/discovery-api/api/v1alpha1/server/pb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/talos-systems/discovery-api/api/v1alpha1/server/pb" "go.uber.org/zap/zaptest" "golang.org/x/time/rate" "google.golang.org/grpc" @@ -45,33 +46,51 @@ func checkMetrics(t *testing.T, c prom.Collector) { assert.NotZero(t, promtestutil.CollectAndCount(c), "collector should not be unchecked") } -func setupServer(t *testing.T, rateLimit rate.Limit, redirectEndpoint string) (address string) { +type testServer struct { //nolint:govet + lis net.Listener + s *grpc.Server + state *state.State + stopCh <-chan struct{} + serverOptions []grpc.ServerOption + + address string +} + +func setupServer(t *testing.T, rateLimit rate.Limit, redirectEndpoint string) *testServer { t.Helper() logger := zaptest.NewLogger(t) - state := state.NewState(logger) + testServer := &testServer{} + + testServer.state = state.NewState(logger) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) + testServer.stopCh = ctx.Done() + go func() { - state.RunGC(ctx, logger, time.Second) + testServer.state.RunGC(ctx, logger, time.Second) }() - srv := server.NewClusterServer(state, ctx.Done(), redirectEndpoint) + srv := server.NewClusterServer(testServer.state, testServer.stopCh, redirectEndpoint) // Check metrics before and after the test // to ensure that collector does not switch from being unchecked to checked and invalid. checkMetrics(t, srv) t.Cleanup(func() { checkMetrics(t, srv) }) - lis, err := net.Listen("tcp", "localhost:0") + var err error + + testServer.lis, err = net.Listen("tcp", "localhost:0") require.NoError(t, err) + testServer.address = testServer.lis.Addr().String() + limiter := limits.NewIPRateLimiter(rateLimit, limits.BurstSizeMax) - serverOptions := []grpc.ServerOption{ + testServer.serverOptions = []grpc.ServerOption{ grpc_middleware.WithUnaryServerChain( grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(server.FieldExtractor)), server.AddPeerAddressUnaryServerInterceptor(), @@ -84,22 +103,46 @@ func setupServer(t *testing.T, rateLimit rate.Limit, redirectEndpoint string) (a ), } - s := grpc.NewServer(serverOptions...) - pb.RegisterClusterServer(s, srv) + testServer.s = grpc.NewServer(testServer.serverOptions...) + pb.RegisterClusterServer(testServer.s, srv) go func() { - require.NoError(t, s.Serve(lis)) + if stopErr := testServer.s.Serve(testServer.lis); stopErr != nil && !errors.Is(stopErr, grpc.ErrServerStopped) { + require.NoError(t, err) + } }() - t.Cleanup(s.Stop) + t.Cleanup(testServer.s.Stop) - return lis.Addr().String() + return testServer +} + +func (testServer *testServer) restartWithRedirect(t *testing.T, redirectEndpoint string) { + testServer.s.Stop() + + srv := server.NewClusterServer(testServer.state, testServer.stopCh, redirectEndpoint) + + testServer.s = grpc.NewServer(testServer.serverOptions...) + pb.RegisterClusterServer(testServer.s, srv) + + var err error + + testServer.lis, err = net.Listen("tcp", testServer.address) + require.NoError(t, err) + + go func() { + if stopErr := testServer.s.Serve(testServer.lis); stopErr != nil && !errors.Is(stopErr, grpc.ErrServerStopped) { + require.NoError(t, err) + } + }() + + t.Cleanup(testServer.s.Stop) } func TestServerAPI(t *testing.T) { t.Parallel() - addr := setupServer(t, 5000, "") + addr := setupServer(t, 5000, "").address conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, e) @@ -299,7 +342,7 @@ func TestServerAPI(t *testing.T) { func TestValidation(t *testing.T) { t.Parallel() - addr := setupServer(t, 5000, "") + addr := setupServer(t, 5000, "").address conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, e) @@ -494,7 +537,7 @@ func testHitRateLimit(client pb.ClusterClient, ip string) func(t *testing.T) { func TestServerRateLimit(t *testing.T) { t.Parallel() - addr := setupServer(t, 1, "") + addr := setupServer(t, 1, "").address conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, e) @@ -508,7 +551,7 @@ func TestServerRateLimit(t *testing.T) { func TestServerRedirect(t *testing.T) { t.Parallel() - addr := setupServer(t, 1, "new.example.com:443") + addr := setupServer(t, 1, "new.example.com:443").address conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, e)