From 619546696afe99c1b58b46ca819bb32e74560f5b Mon Sep 17 00:00:00 2001
From: Andrey Smirnov <andrey.smirnov@talos-systems.com>
Date: Thu, 23 Sep 2021 17:47:12 +0300
Subject: [PATCH] feat: enable vtprotobuf, watch batching, more limits

Fixes #5

Batch watch responses in a single batch so that client can quickly know
that initial snapshot got delivered.

Bump go.mod deps.

Implement more limits.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
---
 .codecov.yml                          |   4 +-
 .kres.yaml                            |   2 +-
 api/v1alpha1/cluster.proto            |   5 +-
 api/v1alpha1/pb/cluster.pb.go         | 120 +++++++++++++-------------
 api/v1alpha1/pb/cluster_grpc.pb.go    |  11 +--
 api/v1alpha1/pb/cluster_vtproto.pb.go |  54 +++++-------
 cmd/discovery-service/main.go         |   1 +
 go.mod                                |  18 ++--
 go.sum                                |  58 +++++++------
 internal/proto/proto.go               |  83 ++++++++++++++++++
 internal/state/affiliate.go           |  10 ++-
 internal/state/affiliate_test.go      |  30 +++++--
 internal/state/cluster.go             |  16 +++-
 internal/state/cluster_test.go        |  57 +++++++++---
 internal/state/errors.go              |  14 +++
 internal/state/state_test.go          |   6 +-
 internal/state/subscribe.go           |   4 +-
 pkg/limits/limits.go                  |  19 ++++
 pkg/server/server.go                  |  63 ++++++++++----
 pkg/server/server_test.go             |  98 +++++++++++++++++----
 pkg/server/validate.go                |  37 ++++++--
 21 files changed, 504 insertions(+), 206 deletions(-)
 create mode 100644 internal/proto/proto.go
 create mode 100644 internal/state/errors.go
 create mode 100644 pkg/limits/limits.go

diff --git a/.codecov.yml b/.codecov.yml
index 6122870..de550cd 100644
--- a/.codecov.yml
+++ b/.codecov.yml
@@ -1,6 +1,6 @@
 # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
 #
-# Generated on 2021-09-22T21:32:59Z by kres 2a27963-dirty.
+# Generated on 2021-09-23T14:58:27Z by kres 2a27963-dirty.
 
 codecov:
   require_ci_to_pass: false
@@ -9,7 +9,7 @@ coverage:
   status:
     project:
       default:
-        target: 15%
+        target: 30%
         threshold: 0.5%
         base: auto
         if_ci_failed: success
diff --git a/.kres.yaml b/.kres.yaml
index 1a2f087..aa6088d 100644
--- a/.kres.yaml
+++ b/.kres.yaml
@@ -11,4 +11,4 @@ spec:
 ---
 kind: service.CodeCov
 spec:
-  targetThreshold: 15
+  targetThreshold: 30
diff --git a/api/v1alpha1/cluster.proto b/api/v1alpha1/cluster.proto
index b74e197..71ec92e 100644
--- a/api/v1alpha1/cluster.proto
+++ b/api/v1alpha1/cluster.proto
@@ -18,6 +18,9 @@ service Cluster {
     // List affiliates in the cluster.
     rpc List(ListRequest) returns (ListResponse);
     // Watch affiliate updates in the cluster.
+    //
+    // The first WatchResponse contains the snapshot of the current affiliate state (even if empty).
+    // Subsequent response messages are sent as the affiliates get changed.
     rpc Watch(WatchRequest) returns (stream WatchResponse);
 }
 
@@ -108,7 +111,7 @@ message WatchRequest {
 
 message WatchResponse {
     // List of cluster affiliates.
-    Affiliate affiliate = 1;
+    repeated Affiliate affiliates = 1;
     // Flag that affiliate was deleted, only ID field is valid.
     bool deleted = 2;
 }
diff --git a/api/v1alpha1/pb/cluster.pb.go b/api/v1alpha1/pb/cluster.pb.go
index 4698676..5d38172 100644
--- a/api/v1alpha1/pb/cluster.pb.go
+++ b/api/v1alpha1/pb/cluster.pb.go
@@ -7,12 +7,11 @@
 package pb
 
 import (
-	reflect "reflect"
-	sync "sync"
-
 	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 	durationpb "google.golang.org/protobuf/types/known/durationpb"
+	reflect "reflect"
+	sync "sync"
 )
 
 const (
@@ -621,7 +620,7 @@ type WatchResponse struct {
 	unknownFields protoimpl.UnknownFields
 
 	// List of cluster affiliates.
-	Affiliate *Affiliate `protobuf:"bytes,1,opt,name=affiliate,proto3" json:"affiliate,omitempty"`
+	Affiliates []*Affiliate `protobuf:"bytes,1,rep,name=affiliates,proto3" json:"affiliates,omitempty"`
 	// Flag that affiliate was deleted, only ID field is valid.
 	Deleted bool `protobuf:"varint,2,opt,name=deleted,proto3" json:"deleted,omitempty"`
 }
@@ -658,9 +657,9 @@ func (*WatchResponse) Descriptor() ([]byte, []int) {
 	return file_v1alpha1_pb_cluster_proto_rawDescGZIP(), []int{11}
 }
 
-func (x *WatchResponse) GetAffiliate() *Affiliate {
+func (x *WatchResponse) GetAffiliates() []*Affiliate {
 	if x != nil {
-		return x.Affiliate
+		return x.Affiliates
 	}
 	return nil
 }
@@ -736,44 +735,44 @@ var file_v1alpha1_pb_cluster_proto_rawDesc = []byte{
 	0x52, 0x0a, 0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x22, 0x2d, 0x0a, 0x0c,
 	0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a,
 	0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
-	0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x22, 0x64, 0x0a, 0x0d, 0x57,
-	0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x09,
-	0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x1b, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65,
-	0x72, 0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x52, 0x09, 0x61, 0x66,
-	0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74,
-	0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65,
-	0x64, 0x32, 0xb6, 0x03, 0x0a, 0x07, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x48, 0x0a,
-	0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x1e, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e,
-	0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52,
-	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e,
-	0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52,
-	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x0f, 0x41, 0x66, 0x66, 0x69, 0x6c,
-	0x69, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x2e, 0x73, 0x69, 0x64,
-	0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x41, 0x66,
-	0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71,
-	0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69,
-	0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74,
-	0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
-	0x66, 0x0a, 0x0f, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x44, 0x65, 0x6c, 0x65,
-	0x74, 0x65, 0x12, 0x28, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63,
-	0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x44,
-	0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x73,
+	0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x22, 0x66, 0x0a, 0x0d, 0x57,
+	0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x0a,
+	0x61, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b,
+	0x32, 0x1b, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76,
+	0x65, 0x72, 0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x61,
+	0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c,
+	0x65, 0x74, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x64, 0x65, 0x6c, 0x65,
+	0x74, 0x65, 0x64, 0x32, 0xb6, 0x03, 0x0a, 0x07, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12,
+	0x48, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x1e, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72,
+	0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x48, 0x65, 0x6c, 0x6c,
+	0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72,
+	0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x48, 0x65, 0x6c, 0x6c,
+	0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x0f, 0x41, 0x66, 0x66,
+	0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x2e, 0x73,
 	0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e,
-	0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52,
-	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12,
-	0x1d, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65,
-	0x72, 0x79, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e,
+	0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52,
+	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e,
+	0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69,
+	0x61, 0x74, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+	0x65, 0x12, 0x66, 0x0a, 0x0f, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x44, 0x65,
+	0x6c, 0x65, 0x74, 0x65, 0x12, 0x28, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69,
+	0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74,
+	0x65, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29,
 	0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72,
-	0x79, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4a,
-	0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x1e, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f,
-	0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68,
-	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f,
-	0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x57, 0x61, 0x74, 0x63, 0x68,
-	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69,
-	0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2d, 0x73,
-	0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70,
-	0x68, 0x61, 0x31, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x79, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6c, 0x69, 0x61, 0x74, 0x65, 0x44, 0x65, 0x6c, 0x65, 0x74,
+	0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x04, 0x4c, 0x69, 0x73,
+	0x74, 0x12, 0x1d, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f,
+	0x76, 0x65, 0x72, 0x79, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+	0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x64, 0x65, 0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76,
+	0x65, 0x72, 0x79, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+	0x12, 0x4a, 0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x1e, 0x2e, 0x73, 0x69, 0x64, 0x65,
+	0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x57, 0x61, 0x74,
+	0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x64, 0x65,
+	0x72, 0x6f, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x57, 0x61, 0x74,
+	0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28,
+	0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73,
+	0x2d, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x61,
+	0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -788,30 +787,27 @@ func file_v1alpha1_pb_cluster_proto_rawDescGZIP() []byte {
 	return file_v1alpha1_pb_cluster_proto_rawDescData
 }
 
-var (
-	file_v1alpha1_pb_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
-	file_v1alpha1_pb_cluster_proto_goTypes  = []interface{}{
-		(*Affiliate)(nil),               // 0: sidero.discovery.Affiliate
-		(*HelloRequest)(nil),            // 1: sidero.discovery.HelloRequest
-		(*RedirectMessage)(nil),         // 2: sidero.discovery.RedirectMessage
-		(*HelloResponse)(nil),           // 3: sidero.discovery.HelloResponse
-		(*AffiliateUpdateRequest)(nil),  // 4: sidero.discovery.AffiliateUpdateRequest
-		(*AffiliateUpdateResponse)(nil), // 5: sidero.discovery.AffiliateUpdateResponse
-		(*AffiliateDeleteRequest)(nil),  // 6: sidero.discovery.AffiliateDeleteRequest
-		(*AffiliateDeleteResponse)(nil), // 7: sidero.discovery.AffiliateDeleteResponse
-		(*ListRequest)(nil),             // 8: sidero.discovery.ListRequest
-		(*ListResponse)(nil),            // 9: sidero.discovery.ListResponse
-		(*WatchRequest)(nil),            // 10: sidero.discovery.WatchRequest
-		(*WatchResponse)(nil),           // 11: sidero.discovery.WatchResponse
-		(*durationpb.Duration)(nil),     // 12: google.protobuf.Duration
-	}
-)
-
+var file_v1alpha1_pb_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
+var file_v1alpha1_pb_cluster_proto_goTypes = []interface{}{
+	(*Affiliate)(nil),               // 0: sidero.discovery.Affiliate
+	(*HelloRequest)(nil),            // 1: sidero.discovery.HelloRequest
+	(*RedirectMessage)(nil),         // 2: sidero.discovery.RedirectMessage
+	(*HelloResponse)(nil),           // 3: sidero.discovery.HelloResponse
+	(*AffiliateUpdateRequest)(nil),  // 4: sidero.discovery.AffiliateUpdateRequest
+	(*AffiliateUpdateResponse)(nil), // 5: sidero.discovery.AffiliateUpdateResponse
+	(*AffiliateDeleteRequest)(nil),  // 6: sidero.discovery.AffiliateDeleteRequest
+	(*AffiliateDeleteResponse)(nil), // 7: sidero.discovery.AffiliateDeleteResponse
+	(*ListRequest)(nil),             // 8: sidero.discovery.ListRequest
+	(*ListResponse)(nil),            // 9: sidero.discovery.ListResponse
+	(*WatchRequest)(nil),            // 10: sidero.discovery.WatchRequest
+	(*WatchResponse)(nil),           // 11: sidero.discovery.WatchResponse
+	(*durationpb.Duration)(nil),     // 12: google.protobuf.Duration
+}
 var file_v1alpha1_pb_cluster_proto_depIdxs = []int32{
 	2,  // 0: sidero.discovery.HelloResponse.redirect:type_name -> sidero.discovery.RedirectMessage
 	12, // 1: sidero.discovery.AffiliateUpdateRequest.ttl:type_name -> google.protobuf.Duration
 	0,  // 2: sidero.discovery.ListResponse.affiliates:type_name -> sidero.discovery.Affiliate
-	0,  // 3: sidero.discovery.WatchResponse.affiliate:type_name -> sidero.discovery.Affiliate
+	0,  // 3: sidero.discovery.WatchResponse.affiliates:type_name -> sidero.discovery.Affiliate
 	1,  // 4: sidero.discovery.Cluster.Hello:input_type -> sidero.discovery.HelloRequest
 	4,  // 5: sidero.discovery.Cluster.AffiliateUpdate:input_type -> sidero.discovery.AffiliateUpdateRequest
 	6,  // 6: sidero.discovery.Cluster.AffiliateDelete:input_type -> sidero.discovery.AffiliateDeleteRequest
diff --git a/api/v1alpha1/pb/cluster_grpc.pb.go b/api/v1alpha1/pb/cluster_grpc.pb.go
index 6d0a46c..d0a3095 100644
--- a/api/v1alpha1/pb/cluster_grpc.pb.go
+++ b/api/v1alpha1/pb/cluster_grpc.pb.go
@@ -4,7 +4,6 @@ package pb
 
 import (
 	context "context"
-
 	grpc "google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
 	status "google.golang.org/grpc/status"
@@ -30,6 +29,9 @@ type ClusterClient interface {
 	// List affiliates in the cluster.
 	List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
 	// Watch affiliate updates in the cluster.
+	//
+	// The first WatchResponse contains the snapshot of the current affiliate state (even if empty).
+	// Subsequent response messages are sent as the affiliates get changed.
 	Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Cluster_WatchClient, error)
 }
 
@@ -124,6 +126,9 @@ type ClusterServer interface {
 	// List affiliates in the cluster.
 	List(context.Context, *ListRequest) (*ListResponse, error)
 	// Watch affiliate updates in the cluster.
+	//
+	// The first WatchResponse contains the snapshot of the current affiliate state (even if empty).
+	// Subsequent response messages are sent as the affiliates get changed.
 	Watch(*WatchRequest, Cluster_WatchServer) error
 	mustEmbedUnimplementedClusterServer()
 }
@@ -135,19 +140,15 @@ type UnimplementedClusterServer struct {
 func (UnimplementedClusterServer) Hello(context.Context, *HelloRequest) (*HelloResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
 }
-
 func (UnimplementedClusterServer) AffiliateUpdate(context.Context, *AffiliateUpdateRequest) (*AffiliateUpdateResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method AffiliateUpdate not implemented")
 }
-
 func (UnimplementedClusterServer) AffiliateDelete(context.Context, *AffiliateDeleteRequest) (*AffiliateDeleteResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method AffiliateDelete not implemented")
 }
-
 func (UnimplementedClusterServer) List(context.Context, *ListRequest) (*ListResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
 }
-
 func (UnimplementedClusterServer) Watch(*WatchRequest, Cluster_WatchServer) error {
 	return status.Errorf(codes.Unimplemented, "method Watch not implemented")
 }
diff --git a/api/v1alpha1/pb/cluster_vtproto.pb.go b/api/v1alpha1/pb/cluster_vtproto.pb.go
index 01447dd..7719e8d 100644
--- a/api/v1alpha1/pb/cluster_vtproto.pb.go
+++ b/api/v1alpha1/pb/cluster_vtproto.pb.go
@@ -6,12 +6,11 @@ package pb
 
 import (
 	fmt "fmt"
-	io "io"
-	bits "math/bits"
-
 	proto "google.golang.org/protobuf/proto"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 	durationpb "google.golang.org/protobuf/types/known/durationpb"
+	io "io"
+	bits "math/bits"
 )
 
 const (
@@ -579,15 +578,17 @@ func (m *WatchResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
 		i--
 		dAtA[i] = 0x10
 	}
-	if m.Affiliate != nil {
-		size, err := m.Affiliate.MarshalToSizedBufferVT(dAtA[:i])
-		if err != nil {
-			return 0, err
+	if len(m.Affiliates) > 0 {
+		for iNdEx := len(m.Affiliates) - 1; iNdEx >= 0; iNdEx-- {
+			size, err := m.Affiliates[iNdEx].MarshalToSizedBufferVT(dAtA[:i])
+			if err != nil {
+				return 0, err
+			}
+			i -= size
+			i = encodeVarint(dAtA, i, uint64(size))
+			i--
+			dAtA[i] = 0xa
 		}
-		i -= size
-		i = encodeVarint(dAtA, i, uint64(size))
-		i--
-		dAtA[i] = 0xa
 	}
 	return len(dAtA) - i, nil
 }
@@ -603,7 +604,6 @@ func encodeVarint(dAtA []byte, offset int, v uint64) int {
 	dAtA[offset] = uint8(v)
 	return base
 }
-
 func (m *Affiliate) SizeVT() (n int) {
 	if m == nil {
 		return 0
@@ -828,9 +828,11 @@ func (m *WatchResponse) SizeVT() (n int) {
 	}
 	var l int
 	_ = l
-	if m.Affiliate != nil {
-		l = m.Affiliate.SizeVT()
-		n += 1 + l + sov(uint64(l))
+	if len(m.Affiliates) > 0 {
+		for _, e := range m.Affiliates {
+			l = e.SizeVT()
+			n += 1 + l + sov(uint64(l))
+		}
 	}
 	if m.Deleted {
 		n += 2
@@ -844,11 +846,9 @@ func (m *WatchResponse) SizeVT() (n int) {
 func sov(x uint64) (n int) {
 	return (bits.Len64(x|1) + 6) / 7
 }
-
 func soz(x uint64) (n int) {
 	return sov(uint64((x << 1) ^ uint64((int64(x) >> 63))))
 }
-
 func (m *Affiliate) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -998,7 +998,6 @@ func (m *Affiliate) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *HelloRequest) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1114,7 +1113,6 @@ func (m *HelloRequest) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *RedirectMessage) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1198,7 +1196,6 @@ func (m *RedirectMessage) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *HelloResponse) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1320,7 +1317,6 @@ func (m *HelloResponse) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *AffiliateUpdateRequest) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1546,7 +1542,6 @@ func (m *AffiliateUpdateRequest) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *AffiliateUpdateResponse) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1598,7 +1593,6 @@ func (m *AffiliateUpdateResponse) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *AffiliateDeleteRequest) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1714,7 +1708,6 @@ func (m *AffiliateDeleteRequest) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *AffiliateDeleteResponse) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1766,7 +1759,6 @@ func (m *AffiliateDeleteResponse) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *ListRequest) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1850,7 +1842,6 @@ func (m *ListRequest) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *ListResponse) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -1936,7 +1927,6 @@ func (m *ListResponse) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *WatchRequest) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -2020,7 +2010,6 @@ func (m *WatchRequest) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func (m *WatchResponse) UnmarshalVT(dAtA []byte) error {
 	l := len(dAtA)
 	iNdEx := 0
@@ -2052,7 +2041,7 @@ func (m *WatchResponse) UnmarshalVT(dAtA []byte) error {
 		switch fieldNum {
 		case 1:
 			if wireType != 2 {
-				return fmt.Errorf("proto: wrong wireType = %d for field Affiliate", wireType)
+				return fmt.Errorf("proto: wrong wireType = %d for field Affiliates", wireType)
 			}
 			var msglen int
 			for shift := uint(0); ; shift += 7 {
@@ -2079,10 +2068,8 @@ func (m *WatchResponse) UnmarshalVT(dAtA []byte) error {
 			if postIndex > l {
 				return io.ErrUnexpectedEOF
 			}
-			if m.Affiliate == nil {
-				m.Affiliate = &Affiliate{}
-			}
-			if err := m.Affiliate.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
+			m.Affiliates = append(m.Affiliates, &Affiliate{})
+			if err := m.Affiliates[len(m.Affiliates)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil {
 				return err
 			}
 			iNdEx = postIndex
@@ -2128,7 +2115,6 @@ func (m *WatchResponse) UnmarshalVT(dAtA []byte) error {
 	}
 	return nil
 }
-
 func skip(dAtA []byte) (n int, err error) {
 	l := len(dAtA)
 	iNdEx := 0
diff --git a/cmd/discovery-service/main.go b/cmd/discovery-service/main.go
index b27ff00..82bcf6f 100644
--- a/cmd/discovery-service/main.go
+++ b/cmd/discovery-service/main.go
@@ -30,6 +30,7 @@ import (
 	"google.golang.org/grpc/status"
 
 	"github.com/talos-systems/discovery-service/api/v1alpha1/pb"
+	_ "github.com/talos-systems/discovery-service/internal/proto"
 	"github.com/talos-systems/discovery-service/internal/state"
 	"github.com/talos-systems/discovery-service/pkg/server"
 )
diff --git a/go.mod b/go.mod
index fb12eb5..6e16d9e 100644
--- a/go.mod
+++ b/go.mod
@@ -6,11 +6,11 @@ require (
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/prometheus/client_golang v1.11.0
-	github.com/stretchr/testify v1.5.1
-	go.uber.org/zap v1.16.0
-	golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
+	github.com/stretchr/testify v1.7.0
+	go.uber.org/zap v1.19.1
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	google.golang.org/grpc v1.40.0
-	google.golang.org/protobuf v1.26.0-rc.1
+	google.golang.org/protobuf v1.27.1
 	inet.af/netaddr v0.0.0-20210903134321-85fa6c94624e
 )
 
@@ -18,19 +18,19 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/golang/protobuf v1.4.3 // indirect
+	github.com/golang/protobuf v1.5.0 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/common v0.26.0 // indirect
 	github.com/prometheus/procfs v0.6.0 // indirect
-	go.uber.org/atomic v1.6.0 // indirect
-	go.uber.org/multierr v1.5.0 // indirect
+	go.uber.org/atomic v1.7.0 // indirect
+	go.uber.org/multierr v1.6.0 // indirect
 	go4.org/intern v0.0.0-20210108033219-3eb7198706b2 // indirect
 	go4.org/unsafe/assume-no-moving-gc v0.0.0-20201222180813-1025295fd063 // indirect
-	golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
+	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
 	golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect
 	golang.org/x/text v0.3.6 // indirect
 	google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
-	gopkg.in/yaml.v2 v2.3.0 // indirect
+	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
diff --git a/go.sum b/go.sum
index 0325572..ff40a20 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,5 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -9,6 +8,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -56,8 +57,9 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
 github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -67,7 +69,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
-github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
@@ -125,7 +126,6 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
 github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
-github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -135,22 +135,24 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
-go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
-go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
+go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
-go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
-go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
-go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
-go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
-go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
-go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
+go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
+go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
 go4.org/intern v0.0.0-20210108033219-3eb7198706b2 h1:VFTf+jjIgsldaz/Mr00VaCSswHJrI2hIjQygE/W4IMg=
 go4.org/intern v0.0.0-20210108033219-3eb7198706b2/go.mod h1:vLqJ+12kCw61iCWsPto0EOHhBS+o4rO5VIucbc9g2Cc=
 go4.org/unsafe/assume-no-moving-gc v0.0.0-20201222175341-b30ae309168e/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
@@ -158,19 +160,16 @@ go4.org/unsafe/assume-no-moving-gc v0.0.0-20201222180813-1025295fd063 h1:1tk03FU
 go4.org/unsafe/assume-no-moving-gc v0.0.0-20201222180813-1025295fd063/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
 golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
-golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
-golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -183,8 +182,9 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
-golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -194,8 +194,9 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -207,10 +208,14 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -221,14 +226,11 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
-golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
-golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
 golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -260,24 +262,26 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
-google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
-honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
 inet.af/netaddr v0.0.0-20210903134321-85fa6c94624e h1:tvgqez5ZQoBBiBAGNU/fmJy247yB/7++kcLOEoMYup0=
 inet.af/netaddr v0.0.0-20210903134321-85fa6c94624e/go.mod h1:z0nx+Dh+7N7CC8V5ayHtHGpZpxLQZZxkIaaz6HN65Ls=
diff --git a/internal/proto/proto.go b/internal/proto/proto.go
new file mode 100644
index 0000000..5b02140
--- /dev/null
+++ b/internal/proto/proto.go
@@ -0,0 +1,83 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+// Package proto contains wrappers to inject vtprotobuf marshaling.
+package proto
+
+import (
+	"fmt"
+
+	"google.golang.org/grpc/encoding"
+	"google.golang.org/grpc/encoding/proto"
+	protov2 "google.golang.org/protobuf/proto" //nolint:gci
+)
+
+// Codec provides protobuf encoding.Codec.
+type Codec struct{}
+
+// Marshal implements encoding.Codec.
+func (Codec) Marshal(v interface{}) ([]byte, error) {
+	// our types implement Message (with or without vtproto additions depending on build configuration)
+	if m, ok := v.(Message); ok {
+		return Marshal(m)
+	}
+
+	// no types implement protobuf API v1 only, so don't check for it
+
+	return nil, fmt.Errorf("failed to marshal %T", v)
+}
+
+// Unmarshal implements encoding.Codec.
+func (Codec) Unmarshal(data []byte, v interface{}) error {
+	// our types implement Message (with or without vtproto additions depending on build configuration)
+	if m, ok := v.(Message); ok {
+		return Unmarshal(data, m)
+	}
+
+	// no types implement protobuf API v1 only, so don't check for it
+
+	return fmt.Errorf("failed to unmarshal %T", v)
+}
+
+// Name implements encoding.Codec.
+func (Codec) Name() string {
+	return proto.Name // overrides google.golang.org/grpc/encoding/proto codec
+}
+
+// Message is the main interface for protobuf API v2 messages.
+type Message = protov2.Message
+
+// vtprotoMessage is the interface for vtproto additions.
+//
+// We use only a subset of that interface but include additional methods
+// to prevent accidental successful type assertion for unrelated types.
+type vtprotoMessage interface {
+	MarshalVT() ([]byte, error)
+	MarshalToVT([]byte) (int, error)
+	MarshalToSizedBufferVT([]byte) (int, error)
+	UnmarshalVT([]byte) error
+}
+
+// Marshal returns the wire-format encoding of m.
+func Marshal(m Message) ([]byte, error) {
+	if vm, ok := m.(vtprotoMessage); ok {
+		return vm.MarshalVT()
+	}
+
+	return protov2.Marshal(m)
+}
+
+// Unmarshal parses the wire-format message in b and places the result in m.
+// The provided message must be mutable (e.g., a non-nil pointer to a message).
+func Unmarshal(b []byte, m Message) error {
+	if vm, ok := m.(vtprotoMessage); ok {
+		return vm.UnmarshalVT(b)
+	}
+
+	return protov2.Unmarshal(b, m)
+}
+
+func init() {
+	encoding.RegisterCodec(Codec{})
+}
diff --git a/internal/state/affiliate.go b/internal/state/affiliate.go
index bbb89fb..900090a 100644
--- a/internal/state/affiliate.go
+++ b/internal/state/affiliate.go
@@ -7,6 +7,8 @@ package state
 import (
 	"bytes"
 	"time"
+
+	"github.com/talos-systems/discovery-service/pkg/limits"
 )
 
 // Affiliate represents cluster affiliate state.
@@ -50,7 +52,7 @@ func (affiliate *Affiliate) Update(data []byte, expiration time.Time) {
 }
 
 // MergeEndpoints and potentially update expiration for endpoints.
-func (affiliate *Affiliate) MergeEndpoints(endpoints [][]byte, expiration time.Time) {
+func (affiliate *Affiliate) MergeEndpoints(endpoints [][]byte, expiration time.Time) error {
 	for _, endpoint := range endpoints {
 		found := false
 
@@ -67,6 +69,10 @@ func (affiliate *Affiliate) MergeEndpoints(endpoints [][]byte, expiration time.T
 		}
 
 		if !found {
+			if len(affiliate.endpoints) >= limits.AffiliateEndpointsMax {
+				return ErrTooManyEndpoints
+			}
+
 			affiliate.endpoints = append(affiliate.endpoints, Endpoint{
 				expiration: expiration,
 				data:       endpoint,
@@ -79,6 +85,8 @@ func (affiliate *Affiliate) MergeEndpoints(endpoints [][]byte, expiration time.T
 	if affiliate.expiration.Before(expiration) {
 		affiliate.expiration = expiration
 	}
+
+	return nil
 }
 
 // GarbageCollect affiliate data.
diff --git a/internal/state/affiliate_test.go b/internal/state/affiliate_test.go
index 540500c..60c4ea1 100644
--- a/internal/state/affiliate_test.go
+++ b/internal/state/affiliate_test.go
@@ -5,12 +5,15 @@
 package state_test
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"github.com/talos-systems/discovery-service/internal/state"
+	"github.com/talos-systems/discovery-service/pkg/limits"
 )
 
 func TestAffiliateMutations(t *testing.T) {
@@ -45,10 +48,10 @@ func TestAffiliateMutations(t *testing.T) {
 
 	affiliate.ClearChanged()
 
-	affiliate.MergeEndpoints([][]byte{
+	assert.NoError(t, affiliate.MergeEndpoints([][]byte{
 		[]byte("e1"),
 		[]byte("e2"),
-	}, now.Add(time.Minute))
+	}, now.Add(time.Minute)))
 
 	assert.Equal(t, &state.AffiliateExport{
 		ID:        "id1",
@@ -59,16 +62,16 @@ func TestAffiliateMutations(t *testing.T) {
 	assert.True(t, affiliate.IsChanged())
 	affiliate.ClearChanged()
 
-	affiliate.MergeEndpoints([][]byte{
+	assert.NoError(t, affiliate.MergeEndpoints([][]byte{
 		[]byte("e1"),
-	}, now.Add(time.Minute))
+	}, now.Add(time.Minute)))
 
 	assert.False(t, affiliate.IsChanged())
 
-	affiliate.MergeEndpoints([][]byte{
+	assert.NoError(t, affiliate.MergeEndpoints([][]byte{
 		[]byte("e1"),
 		[]byte("e3"),
-	}, now.Add(3*time.Minute))
+	}, now.Add(3*time.Minute)))
 
 	assert.Equal(t, &state.AffiliateExport{
 		ID:        "id1",
@@ -96,3 +99,18 @@ func TestAffiliateMutations(t *testing.T) {
 	assert.True(t, remove)
 	assert.True(t, changed)
 }
+
+func TestAffiliateTooManyEndpoints(t *testing.T) {
+	now := time.Now()
+
+	affiliate := state.NewAffiliate("id1")
+
+	for i := 0; i < limits.AffiliateEndpointsMax; i++ {
+		assert.NoError(t, affiliate.MergeEndpoints([][]byte{[]byte(fmt.Sprintf("endpoint%d", i))}, now))
+	}
+
+	err := affiliate.MergeEndpoints([][]byte{[]byte("endpoint")}, now)
+	require.Error(t, err)
+
+	assert.ErrorIs(t, err, state.ErrTooManyEndpoints)
+}
diff --git a/internal/state/cluster.go b/internal/state/cluster.go
index 5fb73f8..5cee5ac 100644
--- a/internal/state/cluster.go
+++ b/internal/state/cluster.go
@@ -7,6 +7,8 @@ package state
 import (
 	"sync"
 	"time"
+
+	"github.com/talos-systems/discovery-service/pkg/limits"
 )
 
 // Cluster is a collection of affiliates.
@@ -32,14 +34,14 @@ func NewCluster(id string) *Cluster {
 // WithAffiliate runs a function against the affiliate.
 //
 // Cluster state is locked while the function is running.
-func (cluster *Cluster) WithAffiliate(id string, f func(affiliate *Affiliate)) {
+func (cluster *Cluster) WithAffiliate(id string, f func(affiliate *Affiliate) error) error {
 	cluster.affiliatesMu.Lock()
 	defer cluster.affiliatesMu.Unlock()
 
 	if affiliate, ok := cluster.affiliates[id]; ok {
 		affiliate.ClearChanged()
 
-		f(affiliate)
+		err := f(affiliate)
 
 		if affiliate.IsChanged() {
 			cluster.notify(&Notification{
@@ -48,17 +50,23 @@ func (cluster *Cluster) WithAffiliate(id string, f func(affiliate *Affiliate)) {
 			})
 		}
 
-		return
+		return err
+	}
+
+	if len(cluster.affiliates) >= limits.ClusterAffiliatesMax {
+		return ErrTooManyAffiliates
 	}
 
 	affiliate := NewAffiliate(id)
-	f(affiliate)
+	err := f(affiliate)
 
 	cluster.affiliates[id] = affiliate
 	cluster.notify(&Notification{
 		AffiliateID: id,
 		Affiliate:   affiliate.Export(),
 	})
+
+	return err
 }
 
 // DeleteAffiliate deletes affiliate from the cluster.
diff --git a/internal/state/cluster_test.go b/internal/state/cluster_test.go
index 3c8871d..e926891 100644
--- a/internal/state/cluster_test.go
+++ b/internal/state/cluster_test.go
@@ -5,13 +5,16 @@
 package state_test
 
 import (
+	"fmt"
 	"sort"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"github.com/talos-systems/discovery-service/internal/state"
+	"github.com/talos-systems/discovery-service/pkg/limits"
 )
 
 func TestClusterMutations(t *testing.T) {
@@ -24,9 +27,11 @@ func TestClusterMutations(t *testing.T) {
 
 	assert.Len(t, cluster.List(), 0)
 
-	cluster.WithAffiliate("af1", func(affiliate *state.Affiliate) {
+	assert.NoError(t, cluster.WithAffiliate("af1", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data"), now.Add(time.Minute))
-	})
+
+		return nil
+	}))
 
 	assert.Len(t, cluster.List(), 1)
 
@@ -37,9 +42,11 @@ func TestClusterMutations(t *testing.T) {
 
 	assert.Len(t, snapshot, 1)
 
-	cluster.WithAffiliate("af1", func(affiliate *state.Affiliate) {
+	assert.NoError(t, cluster.WithAffiliate("af1", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data1"), now.Add(time.Minute))
-	})
+
+		return nil
+	}))
 
 	assert.Len(t, cluster.List(), 1)
 	assert.Equal(t, []*state.AffiliateExport{
@@ -62,9 +69,11 @@ func TestClusterMutations(t *testing.T) {
 		assert.Fail(t, "no notification")
 	}
 
-	cluster.WithAffiliate("af2", func(affiliate *state.Affiliate) {
+	assert.NoError(t, cluster.WithAffiliate("af2", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data2"), now.Add(time.Minute))
-	})
+
+		return nil
+	}))
 
 	assert.Len(t, cluster.List(), 2)
 
@@ -166,17 +175,23 @@ func TestClusterSubscriptions(t *testing.T) {
 		assert.Empty(t, snapshot)
 	}
 
-	cluster.WithAffiliate("af1", func(affiliate *state.Affiliate) {
+	assert.NoError(t, cluster.WithAffiliate("af1", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data1"), now.Add(time.Minute))
-	})
 
-	cluster.WithAffiliate("af2", func(affiliate *state.Affiliate) {
+		return nil
+	}))
+
+	assert.NoError(t, cluster.WithAffiliate("af2", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data2"), now.Add(time.Minute))
-	})
 
-	cluster.WithAffiliate("af2", func(affiliate *state.Affiliate) {
+		return nil
+	}))
+
+	assert.NoError(t, cluster.WithAffiliate("af2", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data2_1"), now.Add(time.Minute))
-	})
+
+		return nil
+	}))
 
 	cluster.DeleteAffiliate("af2")
 
@@ -241,3 +256,21 @@ func TestClusterSubscriptions(t *testing.T) {
 		})
 	}
 }
+
+func TestClusterTooManyAffiliates(t *testing.T) {
+	t.Parallel()
+
+	cluster := state.NewCluster("cluster3")
+
+	for i := 0; i < limits.ClusterAffiliatesMax; i++ {
+		assert.NoError(t, cluster.WithAffiliate(fmt.Sprintf("af%d", i), func(affiliate *state.Affiliate) error {
+			return nil
+		}))
+	}
+
+	err := cluster.WithAffiliate("af", func(affiliate *state.Affiliate) error {
+		return nil
+	})
+	require.Error(t, err)
+	require.ErrorIs(t, err, state.ErrTooManyAffiliates)
+}
diff --git a/internal/state/errors.go b/internal/state/errors.go
new file mode 100644
index 0000000..2ebf0be
--- /dev/null
+++ b/internal/state/errors.go
@@ -0,0 +1,14 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package state
+
+import "fmt"
+
+var (
+	// ErrTooManyAffiliates is raised when there are too many affiliates in the cluster.
+	ErrTooManyAffiliates = fmt.Errorf("too many affiliates in the cluster")
+	// ErrTooManyEndpoints is raised when there are too many endpoints in the affiliate.
+	ErrTooManyEndpoints = fmt.Errorf("too many endpoints in the affiliate")
+)
diff --git a/internal/state/state_test.go b/internal/state/state_test.go
index 24f09ae..4fd093e 100644
--- a/internal/state/state_test.go
+++ b/internal/state/state_test.go
@@ -22,9 +22,11 @@ func TestState(t *testing.T) {
 	assert.Equal(t, 0, deletedClusters)
 
 	st.GetCluster("id1")
-	st.GetCluster("id2").WithAffiliate("af1", func(affiliate *state.Affiliate) {
+	assert.NoError(t, st.GetCluster("id2").WithAffiliate("af1", func(affiliate *state.Affiliate) error {
 		affiliate.Update([]byte("data1"), now.Add(time.Minute))
-	})
+
+		return nil
+	}))
 
 	deletedClusters = st.GarbageCollect(now)
 	assert.Equal(t, 1, deletedClusters)
diff --git a/internal/state/subscribe.go b/internal/state/subscribe.go
index 0dad61d..b162ed4 100644
--- a/internal/state/subscribe.go
+++ b/internal/state/subscribe.go
@@ -4,7 +4,9 @@
 
 package state
 
-import "fmt"
+import (
+	"fmt"
+)
 
 // Notification about affiliate update.
 //
diff --git a/pkg/limits/limits.go b/pkg/limits/limits.go
new file mode 100644
index 0000000..8c47686
--- /dev/null
+++ b/pkg/limits/limits.go
@@ -0,0 +1,19 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+// Package limits provides various service limits.
+package limits
+
+import "time"
+
+// Service limits.
+const (
+	ClusterIDMax          = 256
+	AffiliateIDMax        = 256
+	AffiliateDataMax      = 2048
+	AffiliateEndpointMax  = 32
+	TTLMax                = 30 * time.Minute
+	ClusterAffiliatesMax  = 1024
+	AffiliateEndpointsMax = 64
+)
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 95f9799..2ca05b3 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -7,6 +7,7 @@ package server
 
 import (
 	"context"
+	"errors"
 	"net"
 	"time"
 
@@ -71,15 +72,36 @@ func (srv *ClusterServer) AffiliateUpdate(ctx context.Context, req *pb.Affiliate
 		return nil, err
 	}
 
-	srv.state.GetCluster(req.ClusterId).WithAffiliate(req.AffiliateId, func(affiliate *state.Affiliate) {
+	if err := validateAffiliateData(req.AffiliateData); err != nil {
+		return nil, err
+	}
+
+	if err := validateAffiliateEndpoints(req.AffiliateEndpoints); err != nil {
+		return nil, err
+	}
+
+	if err := validateTTL(req.Ttl.AsDuration()); err != nil {
+		return nil, err
+	}
+
+	if err := srv.state.GetCluster(req.ClusterId).WithAffiliate(req.AffiliateId, func(affiliate *state.Affiliate) error {
 		expiration := time.Now().Add(req.Ttl.AsDuration())
 
 		if len(req.AffiliateData) > 0 {
 			affiliate.Update(req.AffiliateData, expiration)
 		}
 
-		affiliate.MergeEndpoints(req.AffiliateEndpoints, expiration)
-	})
+		return affiliate.MergeEndpoints(req.AffiliateEndpoints, expiration)
+	}); err != nil {
+		switch {
+		case errors.Is(err, state.ErrTooManyEndpoints):
+			return nil, status.Error(codes.ResourceExhausted, err.Error())
+		case errors.Is(err, state.ErrTooManyAffiliates):
+			return nil, status.Error(codes.ResourceExhausted, err.Error())
+		default:
+			return nil, err
+		}
+	}
 
 	return &pb.AffiliateUpdateResponse{}, nil
 }
@@ -133,20 +155,23 @@ func (srv *ClusterServer) Watch(req *pb.WatchRequest, server pb.Cluster_WatchSer
 	snapshot, subscription := srv.state.GetCluster(req.ClusterId).Subscribe(updates)
 	defer subscription.Close()
 
+	snapshotResp := &pb.WatchResponse{}
+
 	for _, affiliate := range snapshot {
-		if err := server.Send(&pb.WatchResponse{
-			Affiliate: &pb.Affiliate{
+		snapshotResp.Affiliates = append(snapshotResp.Affiliates,
+			&pb.Affiliate{
 				Id:        affiliate.ID,
 				Data:      affiliate.Data,
 				Endpoints: affiliate.Endpoints,
-			},
-		}); err != nil {
-			if status.Code(err) == codes.Canceled {
-				return nil
-			}
+			})
+	}
 
-			return err
+	if err := server.Send(snapshotResp); err != nil {
+		if status.Code(err) == codes.Canceled {
+			return nil
 		}
+
+		return err
 	}
 
 	for {
@@ -160,14 +185,18 @@ func (srv *ClusterServer) Watch(req *pb.WatchRequest, server pb.Cluster_WatchSer
 
 			if notification.Affiliate == nil {
 				resp.Deleted = true
-				resp.Affiliate = &pb.Affiliate{
-					Id: notification.AffiliateID,
+				resp.Affiliates = []*pb.Affiliate{
+					{
+						Id: notification.AffiliateID,
+					},
 				}
 			} else {
-				resp.Affiliate = &pb.Affiliate{
-					Id:        notification.Affiliate.ID,
-					Data:      notification.Affiliate.Data,
-					Endpoints: notification.Affiliate.Endpoints,
+				resp.Affiliates = []*pb.Affiliate{
+					{
+						Id:        notification.Affiliate.ID,
+						Data:      notification.Affiliate.Data,
+						Endpoints: notification.Affiliate.Endpoints,
+					},
 				}
 			}
 
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index 50dd45d..3c2d669 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -5,7 +5,9 @@
 package server_test
 
 import (
+	"bytes"
 	"context"
+	"fmt"
 	"net"
 	"strings"
 	"testing"
@@ -20,7 +22,9 @@ import (
 	"google.golang.org/protobuf/types/known/durationpb"
 
 	"github.com/talos-systems/discovery-service/api/v1alpha1/pb"
+	_ "github.com/talos-systems/discovery-service/internal/proto"
 	"github.com/talos-systems/discovery-service/internal/state"
+	"github.com/talos-systems/discovery-service/pkg/limits"
 	"github.com/talos-systems/discovery-service/pkg/server"
 )
 
@@ -47,8 +51,8 @@ func TestServerAPI(t *testing.T) {
 
 	addr := setupServer(t)
 
-	conn, err := grpc.Dial(addr, grpc.WithInsecure())
-	require.NoError(t, err)
+	conn, e := grpc.Dial(addr, grpc.WithInsecure())
+	require.NoError(t, e)
 
 	client := pb.NewClusterClient(conn)
 
@@ -174,10 +178,12 @@ func TestServerAPI(t *testing.T) {
 
 		assert.True(t, proto.Equal(&pb.WatchResponse{
 			Deleted: false,
-			Affiliate: &pb.Affiliate{
-				Id:        "af1",
-				Data:      []byte("data1"),
-				Endpoints: [][]byte{},
+			Affiliates: []*pb.Affiliate{
+				{
+					Id:        "af1",
+					Data:      []byte("data1"),
+					Endpoints: [][]byte{},
+				},
 			},
 		}, msg))
 
@@ -193,10 +199,12 @@ func TestServerAPI(t *testing.T) {
 
 		assert.True(t, proto.Equal(&pb.WatchResponse{
 			Deleted: false,
-			Affiliate: &pb.Affiliate{
-				Id:        "af2",
-				Data:      []byte("data2"),
-				Endpoints: [][]byte{},
+			Affiliates: []*pb.Affiliate{
+				{
+					Id:        "af2",
+					Data:      []byte("data2"),
+					Endpoints: [][]byte{},
+				},
 			},
 		}, msg))
 
@@ -211,8 +219,10 @@ func TestServerAPI(t *testing.T) {
 
 		assert.True(t, proto.Equal(&pb.WatchResponse{
 			Deleted: true,
-			Affiliate: &pb.Affiliate{
-				Id: "af1",
+			Affiliates: []*pb.Affiliate{
+				{
+					Id: "af1",
+				},
 			},
 		}, msg))
 	})
@@ -223,8 +233,8 @@ func TestValidation(t *testing.T) {
 
 	addr := setupServer(t)
 
-	conn, err := grpc.Dial(addr, grpc.WithInsecure())
-	require.NoError(t, err)
+	conn, e := grpc.Dial(addr, grpc.WithInsecure())
+	require.NoError(t, e)
 
 	client := pb.NewClusterClient(conn)
 
@@ -252,7 +262,7 @@ func TestValidation(t *testing.T) {
 		assert.Equal(t, codes.InvalidArgument, status.Code(err))
 
 		_, err = client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
-			ClusterId:   strings.Repeat("A", 1024),
+			ClusterId:   strings.Repeat("A", limits.ClusterIDMax+1),
 			AffiliateId: "fake",
 		})
 		require.Error(t, err)
@@ -260,10 +270,66 @@ func TestValidation(t *testing.T) {
 
 		_, err = client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
 			ClusterId:   "fake",
-			AffiliateId: strings.Repeat("A", 1024),
+			AffiliateId: strings.Repeat("A", limits.AffiliateIDMax+1),
 		})
 		require.Error(t, err)
 		assert.Equal(t, codes.InvalidArgument, status.Code(err))
+
+		_, err = client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
+			ClusterId:     "fake",
+			AffiliateId:   "fake",
+			AffiliateData: bytes.Repeat([]byte{0}, limits.AffiliateDataMax+1),
+		})
+		require.Error(t, err)
+		assert.Equal(t, codes.InvalidArgument, status.Code(err))
+
+		_, err = client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
+			ClusterId:          "fake",
+			AffiliateId:        "fake",
+			AffiliateEndpoints: [][]byte{bytes.Repeat([]byte{0}, limits.AffiliateEndpointMax+1)},
+		})
+		require.Error(t, err)
+		assert.Equal(t, codes.InvalidArgument, status.Code(err))
+	})
+
+	t.Run("AffiliateUpdateTooMany", func(t *testing.T) {
+		t.Parallel()
+
+		for i := 0; i < limits.ClusterAffiliatesMax; i++ {
+			_, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
+				ClusterId:   "fatcluster",
+				AffiliateId: fmt.Sprintf("af%d", i),
+			})
+			require.NoError(t, err)
+		}
+
+		_, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
+			ClusterId:   "fatcluster",
+			AffiliateId: "af",
+		})
+		require.Error(t, err)
+		assert.Equal(t, codes.ResourceExhausted, status.Code(err))
+	})
+
+	t.Run("AffiliateUpdateTooManyEndpoints", func(t *testing.T) {
+		t.Parallel()
+
+		for i := 0; i < limits.AffiliateEndpointsMax; i++ {
+			_, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
+				ClusterId:          "smallcluster",
+				AffiliateId:        "af",
+				AffiliateEndpoints: [][]byte{[]byte(fmt.Sprintf("endpoint%d", i))},
+			})
+			require.NoError(t, err)
+		}
+
+		_, err := client.AffiliateUpdate(ctx, &pb.AffiliateUpdateRequest{
+			ClusterId:          "smallcluster",
+			AffiliateId:        "af",
+			AffiliateEndpoints: [][]byte{[]byte("endpoin")},
+		})
+		require.Error(t, err)
+		assert.Equal(t, codes.ResourceExhausted, status.Code(err))
 	})
 
 	t.Run("AffiliateDelete", func(t *testing.T) {
diff --git a/pkg/server/validate.go b/pkg/server/validate.go
index a855dfb..fb4db9d 100644
--- a/pkg/server/validate.go
+++ b/pkg/server/validate.go
@@ -5,13 +5,12 @@
 package server
 
 import (
+	"time"
+
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-)
 
-const (
-	clusterIDMax   = 256
-	affiliateIDMax = 256
+	"github.com/talos-systems/discovery-service/pkg/limits"
 )
 
 func validateClusterID(id string) error {
@@ -19,7 +18,7 @@ func validateClusterID(id string) error {
 		return status.Errorf(codes.InvalidArgument, "cluster ID can't be empty")
 	}
 
-	if len(id) > clusterIDMax {
+	if len(id) > limits.ClusterIDMax {
 		return status.Errorf(codes.InvalidArgument, "cluster ID is too long")
 	}
 
@@ -31,9 +30,35 @@ func validateAffiliateID(id string) error {
 		return status.Errorf(codes.InvalidArgument, "affiliate ID can't be empty")
 	}
 
-	if len(id) > affiliateIDMax {
+	if len(id) > limits.AffiliateIDMax {
 		return status.Errorf(codes.InvalidArgument, "affiliate ID is too long")
 	}
 
 	return nil
 }
+
+func validateAffiliateData(data []byte) error {
+	if len(data) > limits.AffiliateDataMax {
+		return status.Error(codes.InvalidArgument, "affiliate data is too big")
+	}
+
+	return nil
+}
+
+func validateAffiliateEndpoints(endpoints [][]byte) error {
+	for _, endpoint := range endpoints {
+		if len(endpoint) > limits.AffiliateEndpointMax {
+			return status.Errorf(codes.InvalidArgument, "affiliate endpoint is too big")
+		}
+	}
+
+	return nil
+}
+
+func validateTTL(ttl time.Duration) error {
+	if ttl > limits.TTLMax {
+		return status.Errorf(codes.InvalidArgument, "ttl is too large")
+	}
+
+	return nil
+}