test: add test on client redirect

This matches PR https://github.com/siderolabs/discovery-client/pull/4.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
Andrey Smirnov
2022-09-15 17:10:48 +04:00
parent 8db8ef361e
commit 912943a343
7 changed files with 300 additions and 34 deletions

View File

@ -17,7 +17,7 @@ The project has been split into 3 parts (as they have different source code lice
- This repository
- [Discovery Client](https://github.com/siderolabs/discovery-client), contains the client code to interact with the server
- [Discovery Service API](https://github.com/talos-systems/discovery-api/), provides gRPC API definition and cluster data protobuf data structures
- [Discovery Service API](https://github.com/siderolabs/discovery-api/), provides gRPC API definition and cluster data protobuf data structures
## Setup
@ -27,7 +27,7 @@ All of the details to get started are present [in the Makefile](Makefile), start
Once the application is running you can test the grpc functionality on the port 3000 and the http pages in browser on port 3001.
To test the grpc calls, [install grpcurl](https://github.com/fullstorydev/grpcurl#installation) and clone the [Discovery Service API](https://github.com/talos-systems/discovery-api/) repository.
To test the grpc calls, [install grpcurl](https://github.com/fullstorydev/grpcurl#installation) and clone the [Discovery Service API](https://github.com/siderolabs/discovery-api/) repository.
- Sample code-block to test the grpc `Hello` call (change path accordingly)

View File

@ -26,7 +26,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/talos-systems/discovery-api/api/v1alpha1/server/pb"
"github.com/siderolabs/discovery-api/api/v1alpha1/server/pb"
"github.com/talos-systems/go-debug"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

8
go.mod
View File

@ -6,13 +6,13 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.13.0
github.com/siderolabs/discovery-client v0.1.1
github.com/siderolabs/discovery-api v0.1.1
github.com/siderolabs/discovery-client v0.1.2
github.com/stretchr/testify v1.8.0
github.com/talos-systems/discovery-api v0.1.0
github.com/talos-systems/go-debug v0.2.1
go.uber.org/zap v1.23.0
go4.org/netipx v0.0.0-20220812043211-3cc044ffd68d
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
golang.org/x/sync v0.0.0-20220907140024-f12130a52804
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
@ -21,7 +21,7 @@ require (
require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect

16
go.sum
View File

@ -45,8 +45,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@ -210,8 +210,10 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/siderolabs/discovery-client v0.1.1 h1:1lhXTepW4V1IJPf91ApvF77cBl086d1OXuC4WdVp8hM=
github.com/siderolabs/discovery-client v0.1.1/go.mod h1:gBE5qxdB4BcY2nXOBJ14Qh8u7cBadCofjbxBLohcnbQ=
github.com/siderolabs/discovery-api v0.1.1 h1:DI+CjD/Nl0nIk8qkaNKz1sEquWTEKHKH4+ERsv+yBWg=
github.com/siderolabs/discovery-api v0.1.1/go.mod h1:JnJg4h1HbAhOazQl0lYHEjrg63rg/cf9r2te6/DqUxo=
github.com/siderolabs/discovery-client v0.1.2 h1:QsNwt/boSPM+f8Ww+GlAMv2+9o5KPhLdPUfjvVuNIm8=
github.com/siderolabs/discovery-client v0.1.2/go.mod h1:4ahEk2dMPKAGCLK5sRUxHfVryROxflwDPL+2c5MrLMI=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@ -226,8 +228,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/talos-systems/discovery-api v0.1.0 h1:aKod6uqakH6VfeQ6HaxPF7obqFAL1QTJe4HHTb2mVKk=
github.com/talos-systems/discovery-api v0.1.0/go.mod h1:ZsbzzOC5bzToaF3+YvUXDf9paeWV5bedpDu5RPXrglM=
github.com/talos-systems/go-debug v0.2.1 h1:VSN8P1zXWeHWgUBZn4cVT3keBcecCAJBG9Up+F6N2KM=
github.com/talos-systems/go-debug v0.2.1/go.mod h1:pR4NjsZQNFqGx3n4qkD4MIj1F2CxyIF8DCiO1+05JO0=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -341,8 +341,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220907140024-f12130a52804 h1:0SH2R3f1b1VmIMG7BXbEZCBUu2dKmHschSmjqGUrW8A=
golang.org/x/sync v0.0.0-20220907140024-f12130a52804/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -19,10 +19,10 @@ import (
"testing"
"time"
clientpb "github.com/siderolabs/discovery-api/api/v1alpha1/client/pb"
"github.com/siderolabs/discovery-client/pkg/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientpb "github.com/talos-systems/discovery-api/api/v1alpha1/client/pb"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
@ -32,7 +32,7 @@ import (
func TestClient(t *testing.T) {
t.Parallel()
endpoint := setupServer(t, 5000, "")
endpoint := setupServer(t, 5000, "").address
logger := zaptest.NewLogger(t)
@ -526,3 +526,226 @@ func clusterSimulator(t *testing.T, endpoint string, logger *zap.Logger, numAffi
assert.NoError(t, err)
}
}
// TestClientRedirect verifies that discovery clients follow a server-initiated
// redirect: two clients discover each other via srv1, then srv1 is restarted
// with a redirect pointing to srv2, and both clients should transparently
// reconnect to srv2 and keep exchanging affiliate updates.
//
//nolint:gocognit,gocyclo,cyclop,maintidx
func TestClientRedirect(t *testing.T) {
	t.Parallel()

	srv1 := setupServer(t, 5000, "")
	srv2 := setupServer(t, 5000, "")

	// Both clients initially connect to srv1 only.
	endpoint := srv1.address

	logger := zaptest.NewLogger(t)

	clusterID := "cluster_redirect"

	// Shared AES key so the two clients can decrypt each other's affiliate data.
	key := make([]byte, 32)
	_, err := io.ReadFull(rand.Reader, key)
	require.NoError(t, err)

	cipher, err := aes.NewCipher(key)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	affiliate1 := "affiliate_one"
	affiliate2 := "affiliate_two"

	client1, err := client.NewClient(client.Options{
		Cipher:      cipher,
		Endpoint:    endpoint,
		ClusterID:   clusterID,
		AffiliateID: affiliate1,
		TTL:         time.Minute,
		Insecure:    true,
	})
	require.NoError(t, err)

	client2, err := client.NewClient(client.Options{
		Cipher:      cipher,
		Endpoint:    endpoint,
		ClusterID:   clusterID,
		AffiliateID: affiliate2,
		TTL:         time.Minute,
		Insecure:    true,
	})
	require.NoError(t, err)

	notify1 := make(chan struct{}, 1)
	notify2 := make(chan struct{}, 1)

	eg, ctx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		return client1.Run(ctx, logger, notify1)
	})

	eg.Go(func() error {
		return client2.Run(ctx, logger, notify2)
	})

	// Each client should receive an initial (empty) snapshot.
	select {
	case <-notify1:
	case <-time.After(2 * time.Second):
		require.Fail(t, "no initial snapshot update")
	}

	assert.Empty(t, client1.GetAffiliates())

	select {
	case <-notify2:
	case <-time.After(2 * time.Second):
		require.Fail(t, "no initial snapshot update")
	}

	assert.Empty(t, client2.GetAffiliates())

	affiliate1PB := &client.Affiliate{
		Affiliate: &clientpb.Affiliate{
			NodeId:      affiliate1,
			Addresses:   [][]byte{{1, 2, 3}},
			Hostname:    "host1",
			Nodename:    "node1",
			MachineType: "controlplane",
		},
	}

	require.NoError(t, client1.SetLocalData(affiliate1PB, nil))

	affiliate2PB := &client.Affiliate{
		Affiliate: &clientpb.Affiliate{
			NodeId:      affiliate2,
			Addresses:   [][]byte{{2, 3, 4}},
			Hostname:    "host2",
			Nodename:    "node2",
			MachineType: "worker",
		},
	}

	require.NoError(t, client2.SetLocalData(affiliate2PB, nil))

	// Both clients should eventually discover each other.
	for {
		t.Logf("client1 affiliates = %d", len(client1.GetAffiliates()))

		if len(client1.GetAffiliates()) == 1 {
			break
		}

		select {
		case <-notify1:
		case <-time.After(2 * time.Second):
			t.Logf("client1 affiliates on timeout = %d", len(client1.GetAffiliates()))
			require.Fail(t, "no incremental update")
		}
	}

	require.Len(t, client1.GetAffiliates(), 1)
	assert.Equal(t, []*client.Affiliate{affiliate2PB}, client1.GetAffiliates())

	for {
		// bug fix: this loop polls client2, but the log line was reporting client1's count
		t.Logf("client2 affiliates = %d", len(client2.GetAffiliates()))

		if len(client2.GetAffiliates()) == 1 {
			break
		}

		select {
		case <-notify2:
		case <-time.After(2 * time.Second):
			require.Fail(t, "no incremental update")
		}
	}

	require.Len(t, client2.GetAffiliates(), 1)
	assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates())

	// Drain pending notifications so the waits below only observe post-redirect updates.
drainLoop:
	for {
		select {
		case <-notify1:
		case <-notify2:
		case <-time.After(time.Second):
			break drainLoop
		}
	}

	// make srv1 redirect all clients to srv2
	srv1.restartWithRedirect(t, srv2.address)

	// both clients should get updates about each other after a reconnect
client1Loop:
	for {
		select {
		case <-notify1:
			t.Logf("reconnect: client1 affiliates = %d", len(client1.GetAffiliates()))

			if len(client1.GetAffiliates()) == 1 {
				break client1Loop
			}
		case <-time.After(2 * time.Second):
			require.Fail(t, "no incremental update")
		}
	}

	require.Len(t, client1.GetAffiliates(), 1)
	assert.Equal(t, []*client.Affiliate{affiliate2PB}, client1.GetAffiliates())

client2Loop:
	for {
		select {
		case <-notify2:
			t.Logf("reconnect: client2 affiliates = %d", len(client2.GetAffiliates()))

			if len(client2.GetAffiliates()) == 1 {
				break client2Loop
			}
		case <-time.After(2 * time.Second):
			require.Fail(t, "no incremental update")
		}
	}

	require.Len(t, client2.GetAffiliates(), 1)
	assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates())

	// stop old srv1, graceful stop should work as all clients should have disconnected
	srv1.s.GracefulStop()

	// update affiliate1, client2 should see the update
	affiliate1PB.Endpoints = []*clientpb.Endpoint{
		{
			Ip:   []byte{1, 2, 3, 4},
			Port: 5678,
		},
	}

	require.NoError(t, client1.SetLocalData(affiliate1PB, nil))

	for {
		select {
		case <-notify2:
		case <-time.After(time.Second):
			require.Fail(t, "no incremental update")
		}

		// robustness: guard the [0] index — the affiliate list could be momentarily
		// empty right after the reconnect, and indexing it unchecked would panic.
		if affiliates := client2.GetAffiliates(); len(affiliates) == 1 && len(affiliates[0].Endpoints) == 1 {
			break
		}
	}

	assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates())

	cancel()

	// context.Canceled is the expected shutdown result; anything else is a failure.
	err = eg.Wait()
	if err != nil && !errors.Is(err, context.Canceled) {
		assert.NoError(t, err)
	}
}

View File

@ -12,7 +12,7 @@ import (
"time"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/talos-systems/discovery-api/api/v1alpha1/server/pb"
"github.com/siderolabs/discovery-api/api/v1alpha1/server/pb"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

View File

@ -8,6 +8,7 @@ package server_test
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"strings"
@ -18,9 +19,9 @@ import (
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
prom "github.com/prometheus/client_golang/prometheus"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/siderolabs/discovery-api/api/v1alpha1/server/pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/talos-systems/discovery-api/api/v1alpha1/server/pb"
"go.uber.org/zap/zaptest"
"golang.org/x/time/rate"
"google.golang.org/grpc"
@ -45,33 +46,51 @@ func checkMetrics(t *testing.T, c prom.Collector) {
assert.NotZero(t, promtestutil.CollectAndCount(c), "collector should not be unchecked")
}
func setupServer(t *testing.T, rateLimit rate.Limit, redirectEndpoint string) (address string) {
// testServer bundles an in-process gRPC discovery server with everything
// needed to restart it on the same address (see restartWithRedirect).
type testServer struct { //nolint:govet
lis net.Listener // active TCP listener the gRPC server serves on
s *grpc.Server // running gRPC server instance; replaced on restart
state *state.State // cluster state shared across server restarts
stopCh <-chan struct{} // done channel of the test context, passed to NewClusterServer
serverOptions []grpc.ServerOption // interceptor chain, reused when the server is restarted
address string // host:port the server listens on; stable across restarts
}
func setupServer(t *testing.T, rateLimit rate.Limit, redirectEndpoint string) *testServer {
t.Helper()
logger := zaptest.NewLogger(t)
state := state.NewState(logger)
testServer := &testServer{}
testServer.state = state.NewState(logger)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
testServer.stopCh = ctx.Done()
go func() {
state.RunGC(ctx, logger, time.Second)
testServer.state.RunGC(ctx, logger, time.Second)
}()
srv := server.NewClusterServer(state, ctx.Done(), redirectEndpoint)
srv := server.NewClusterServer(testServer.state, testServer.stopCh, redirectEndpoint)
// Check metrics before and after the test
// to ensure that collector does not switch from being unchecked to checked and invalid.
checkMetrics(t, srv)
t.Cleanup(func() { checkMetrics(t, srv) })
lis, err := net.Listen("tcp", "localhost:0")
var err error
testServer.lis, err = net.Listen("tcp", "localhost:0")
require.NoError(t, err)
testServer.address = testServer.lis.Addr().String()
limiter := limits.NewIPRateLimiter(rateLimit, limits.BurstSizeMax)
serverOptions := []grpc.ServerOption{
testServer.serverOptions = []grpc.ServerOption{
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(server.FieldExtractor)),
server.AddPeerAddressUnaryServerInterceptor(),
@ -84,22 +103,46 @@ func setupServer(t *testing.T, rateLimit rate.Limit, redirectEndpoint string) (a
),
}
s := grpc.NewServer(serverOptions...)
pb.RegisterClusterServer(s, srv)
testServer.s = grpc.NewServer(testServer.serverOptions...)
pb.RegisterClusterServer(testServer.s, srv)
go func() {
require.NoError(t, s.Serve(lis))
if stopErr := testServer.s.Serve(testServer.lis); stopErr != nil && !errors.Is(stopErr, grpc.ErrServerStopped) {
require.NoError(t, err)
}
}()
t.Cleanup(s.Stop)
t.Cleanup(testServer.s.Stop)
return lis.Addr().String()
return testServer
}
// restartWithRedirect stops the running gRPC server and brings it back up on
// the same address, this time configured to redirect all clients to
// redirectEndpoint. The cluster state and server options from the original
// setupServer call are reused, so affiliates survive the restart.
func (testServer *testServer) restartWithRedirect(t *testing.T, redirectEndpoint string) {
	testServer.s.Stop()

	srv := server.NewClusterServer(testServer.state, testServer.stopCh, redirectEndpoint)

	testServer.s = grpc.NewServer(testServer.serverOptions...)
	pb.RegisterClusterServer(testServer.s, srv)

	var err error

	// Re-listen on the exact same address so clients can reconnect transparently.
	testServer.lis, err = net.Listen("tcp", testServer.address)
	require.NoError(t, err)

	go func() {
		// bug fix: the assertion previously used the outer `err` (already known
		// to be nil at this point), silently swallowing any real Serve failure;
		// assert on stopErr, the actual error returned by Serve.
		if stopErr := testServer.s.Serve(testServer.lis); stopErr != nil && !errors.Is(stopErr, grpc.ErrServerStopped) {
			require.NoError(t, stopErr)
		}
	}()

	t.Cleanup(testServer.s.Stop)
}
func TestServerAPI(t *testing.T) {
t.Parallel()
addr := setupServer(t, 5000, "")
addr := setupServer(t, 5000, "").address
conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, e)
@ -299,7 +342,7 @@ func TestServerAPI(t *testing.T) {
func TestValidation(t *testing.T) {
t.Parallel()
addr := setupServer(t, 5000, "")
addr := setupServer(t, 5000, "").address
conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, e)
@ -494,7 +537,7 @@ func testHitRateLimit(client pb.ClusterClient, ip string) func(t *testing.T) {
func TestServerRateLimit(t *testing.T) {
t.Parallel()
addr := setupServer(t, 1, "")
addr := setupServer(t, 1, "").address
conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, e)
@ -508,7 +551,7 @@ func TestServerRateLimit(t *testing.T) {
func TestServerRedirect(t *testing.T) {
t.Parallel()
addr := setupServer(t, 1, "new.example.com:443")
addr := setupServer(t, 1, "new.example.com:443").address
conn, e := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, e)