feat: add queries to clean lost connections in PGCoordinator (#10938)

Adds cleanup queries to clean out "lost" peer and tunnel state after 24 hours.  We leave this state in the database so that anything trying to connect to the peer can see that it was lost, but clean it up after 24 hours to ensure our table doesn't grow without bounds.
This commit is contained in:
Spike Curtis
2023-12-01 10:02:30 +04:00
committed by GitHub
parent 0cab6e7763
commit 571d358e4b
7 changed files with 104 additions and 0 deletions

View File

@ -695,6 +695,20 @@ func (q *querier) CleanTailnetCoordinators(ctx context.Context) error {
return q.db.CleanTailnetCoordinators(ctx)
}
func (q *querier) CleanTailnetLostPeers(ctx context.Context) error {
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return err
}
return q.db.CleanTailnetLostPeers(ctx)
}
func (q *querier) CleanTailnetTunnels(ctx context.Context) error {
if err := q.authorizeContext(ctx, rbac.ActionDelete, rbac.ResourceTailnetCoordinator); err != nil {
return err
}
return q.db.CleanTailnetTunnels(ctx)
}
func (q *querier) DeleteAPIKeyByID(ctx context.Context, id string) error {
return deleteQ(q.log, q.auth, q.db.GetAPIKeyByID, q.db.DeleteAPIKeyByID)(ctx, id)
}

View File

@ -956,6 +956,14 @@ func (*FakeQuerier) CleanTailnetCoordinators(_ context.Context) error {
return ErrUnimplemented
}
func (*FakeQuerier) CleanTailnetLostPeers(context.Context) error {
return ErrUnimplemented
}
func (*FakeQuerier) CleanTailnetTunnels(context.Context) error {
return ErrUnimplemented
}
func (q *FakeQuerier) DeleteAPIKeyByID(_ context.Context, id string) error {
q.mutex.Lock()
defer q.mutex.Unlock()

View File

@ -121,6 +121,20 @@ func (m metricsStore) CleanTailnetCoordinators(ctx context.Context) error {
return err
}
func (m metricsStore) CleanTailnetLostPeers(ctx context.Context) error {
start := time.Now()
r0 := m.s.CleanTailnetLostPeers(ctx)
m.queryLatencies.WithLabelValues("CleanTailnetLostPeers").Observe(time.Since(start).Seconds())
return r0
}
func (m metricsStore) CleanTailnetTunnels(ctx context.Context) error {
start := time.Now()
r0 := m.s.CleanTailnetTunnels(ctx)
m.queryLatencies.WithLabelValues("CleanTailnetTunnels").Observe(time.Since(start).Seconds())
return r0
}
func (m metricsStore) DeleteAPIKeyByID(ctx context.Context, id string) error {
start := time.Now()
err := m.s.DeleteAPIKeyByID(ctx, id)

View File

@ -126,6 +126,34 @@ func (mr *MockStoreMockRecorder) CleanTailnetCoordinators(arg0 interface{}) *gom
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanTailnetCoordinators", reflect.TypeOf((*MockStore)(nil).CleanTailnetCoordinators), arg0)
}
// CleanTailnetLostPeers mocks base method.
func (m *MockStore) CleanTailnetLostPeers(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CleanTailnetLostPeers", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CleanTailnetLostPeers indicates an expected call of CleanTailnetLostPeers.
func (mr *MockStoreMockRecorder) CleanTailnetLostPeers(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanTailnetLostPeers", reflect.TypeOf((*MockStore)(nil).CleanTailnetLostPeers), arg0)
}
// CleanTailnetTunnels mocks base method.
func (m *MockStore) CleanTailnetTunnels(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CleanTailnetTunnels", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CleanTailnetTunnels indicates an expected call of CleanTailnetTunnels.
func (mr *MockStoreMockRecorder) CleanTailnetTunnels(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanTailnetTunnels", reflect.TypeOf((*MockStore)(nil).CleanTailnetTunnels), arg0)
}
// DeleteAPIKeyByID mocks base method.
func (m *MockStore) DeleteAPIKeyByID(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()

View File

@ -43,6 +43,8 @@ type sqlcQuerier interface {
// referenced by the latest build of a workspace.
ArchiveUnusedTemplateVersions(ctx context.Context, arg ArchiveUnusedTemplateVersionsParams) ([]uuid.UUID, error)
CleanTailnetCoordinators(ctx context.Context) error
CleanTailnetLostPeers(ctx context.Context) error
CleanTailnetTunnels(ctx context.Context) error
DeleteAPIKeyByID(ctx context.Context, id string) error
DeleteAPIKeysByUserID(ctx context.Context, userID uuid.UUID) error
DeleteAllTailnetClientSubscriptions(ctx context.Context, arg DeleteAllTailnetClientSubscriptionsParams) error

View File

@ -4522,6 +4522,31 @@ func (q *sqlQuerier) CleanTailnetCoordinators(ctx context.Context) error {
return err
}
const cleanTailnetLostPeers = `-- name: CleanTailnetLostPeers :exec
DELETE
FROM tailnet_peers
WHERE updated_at < now() - INTERVAL '24 HOURS' AND status = 'lost'::tailnet_status
`
func (q *sqlQuerier) CleanTailnetLostPeers(ctx context.Context) error {
_, err := q.db.ExecContext(ctx, cleanTailnetLostPeers)
return err
}
const cleanTailnetTunnels = `-- name: CleanTailnetTunnels :exec
DELETE FROM tailnet_tunnels
WHERE updated_at < now() - INTERVAL '24 HOURS' AND
NOT EXISTS (
SELECT 1 FROM tailnet_peers
WHERE id = tailnet_tunnels.src_id AND coordinator_id = tailnet_tunnels.coordinator_id
)
`
func (q *sqlQuerier) CleanTailnetTunnels(ctx context.Context) error {
_, err := q.db.ExecContext(ctx, cleanTailnetTunnels)
return err
}
const deleteAllTailnetClientSubscriptions = `-- name: DeleteAllTailnetClientSubscriptions :exec
DELETE
FROM tailnet_client_subscriptions

View File

@ -122,6 +122,19 @@ DELETE
FROM tailnet_coordinators
WHERE heartbeat_at < now() - INTERVAL '24 HOURS';
-- name: CleanTailnetLostPeers :exec
DELETE
FROM tailnet_peers
WHERE updated_at < now() - INTERVAL '24 HOURS' AND status = 'lost'::tailnet_status;
-- name: CleanTailnetTunnels :exec
DELETE FROM tailnet_tunnels
WHERE updated_at < now() - INTERVAL '24 HOURS' AND
NOT EXISTS (
SELECT 1 FROM tailnet_peers
WHERE id = tailnet_tunnels.src_id AND coordinator_id = tailnet_tunnels.coordinator_id
);
-- name: UpsertTailnetPeer :one
INSERT INTO
tailnet_peers (