Compare commits


1 Commit

Author: Sheen Capadngan
SHA1: 28cc919ff7
Message: misc: optimize partition script
Date: 2025-08-19 14:27:06 +08:00
23 changed files with 33 additions and 1157 deletions

View File

@@ -84,6 +84,9 @@ const up = async (knex: Knex): Promise<void> => {
t.index("expiresAt");
t.index("orgId");
t.index("projectId");
t.index("eventType");
t.index("userAgentType");
t.index("actor");
});
console.log("Adding GIN indices...");
@@ -119,8 +122,8 @@ const up = async (knex: Knex): Promise<void> => {
console.log("Creating audit log partitions ahead of time... next date:", nextDateStr);
await createAuditLogPartition(knex, nextDate, new Date(nextDate.getFullYear(), nextDate.getMonth() + 1));
// create partitions 4 years ahead
const partitionMonths = 4 * 12;
// create partitions 20 years ahead
const partitionMonths = 20 * 12;
const partitionPromises: Promise<void>[] = [];
for (let x = 1; x <= partitionMonths; x += 1) {
partitionPromises.push(

View File

@@ -179,7 +179,6 @@ export const auditLogDALFactory = (db: TDbClient) => {
numberOfRetryOnFailure = 0; // reset
} catch (error) {
numberOfRetryOnFailure += 1;
deletedAuditLogIds = [];
logger.error(error, "Failed to delete audit log on pruning");
} finally {
// eslint-disable-next-line no-await-in-loop

View File

@@ -123,7 +123,7 @@ export function createEventStreamClient(redis: Redis, options: IEventStreamClien
await redis.set(key, "1", "EX", 60);
send({ type: "ping" });
stream.push("1");
};
const close = () => {

View File

@@ -13,7 +13,6 @@ import {
ProjectPermissionPkiSubscriberActions,
ProjectPermissionPkiTemplateActions,
ProjectPermissionSecretActions,
ProjectPermissionSecretEventActions,
ProjectPermissionSecretRotationActions,
ProjectPermissionSecretScanningConfigActions,
ProjectPermissionSecretScanningDataSourceActions,
@@ -253,16 +252,6 @@ const buildAdminPermissionRules = () => {
ProjectPermissionSub.SecretScanningConfigs
);
can(
[
ProjectPermissionSecretEventActions.SubscribeCreated,
ProjectPermissionSecretEventActions.SubscribeDeleted,
ProjectPermissionSecretEventActions.SubscribeUpdated,
ProjectPermissionSecretEventActions.SubscribeImportMutations
],
ProjectPermissionSub.SecretEvents
);
return rules;
};
@@ -466,16 +455,6 @@ const buildMemberPermissionRules = () => {
can([ProjectPermissionSecretScanningConfigActions.Read], ProjectPermissionSub.SecretScanningConfigs);
can(
[
ProjectPermissionSecretEventActions.SubscribeCreated,
ProjectPermissionSecretEventActions.SubscribeDeleted,
ProjectPermissionSecretEventActions.SubscribeUpdated,
ProjectPermissionSecretEventActions.SubscribeImportMutations
],
ProjectPermissionSub.SecretEvents
);
return rules;
};
@@ -526,16 +505,6 @@ const buildViewerPermissionRules = () => {
can([ProjectPermissionSecretScanningConfigActions.Read], ProjectPermissionSub.SecretScanningConfigs);
can(
[
ProjectPermissionSecretEventActions.SubscribeCreated,
ProjectPermissionSecretEventActions.SubscribeDeleted,
ProjectPermissionSecretEventActions.SubscribeUpdated,
ProjectPermissionSecretEventActions.SubscribeImportMutations
],
ProjectPermissionSub.SecretEvents
);
return rules;
};

View File

@@ -67,7 +67,7 @@ export const registerAuthRoutes = async (server: FastifyZodProvider) => {
})
}
},
onRequest: verifyAuth([AuthMode.JWT], { requireOrg: false }),
onRequest: verifyAuth([AuthMode.JWT]),
handler: () => ({ message: "Authenticated" as const })
});

View File

@@ -310,8 +310,7 @@
"self-hosting/guides/mongo-to-postgres",
"self-hosting/guides/custom-certificates",
"self-hosting/guides/automated-bootstrapping",
"self-hosting/guides/production-hardening",
"self-hosting/guides/monitoring-telemetry"
"self-hosting/guides/production-hardening"
]
},
{

View File

@@ -1,440 +0,0 @@
---
title: "Monitoring and Telemetry Setup"
description: "Learn how to set up monitoring and telemetry for your self-hosted Infisical instance using Grafana, Prometheus, and OpenTelemetry."
---
Infisical provides comprehensive monitoring and telemetry capabilities to help you monitor the health, performance, and usage of your self-hosted instance. This guide covers setting up monitoring using Grafana with two different telemetry collection approaches.
## Overview
Infisical exports metrics in **OpenTelemetry (OTEL) format**, which provides maximum flexibility for your monitoring infrastructure. While this guide focuses on Grafana, the OTEL format means you can easily integrate with:
- **Cloud-native monitoring**: AWS CloudWatch, Google Cloud Monitoring, Azure Monitor
- **Observability platforms**: Datadog, New Relic, Splunk, Dynatrace
- **Custom backends**: Any system that supports OTEL ingestion
- **Traditional monitoring**: Prometheus, Grafana (as covered in this guide)
Infisical supports two telemetry collection methods:
1. **Pull-based (Prometheus)**: Exposes metrics on a dedicated endpoint for Prometheus to scrape
2. **Push-based (OTLP)**: Sends metrics to an OpenTelemetry Collector via OTLP protocol
Both approaches provide the same metrics data in OTEL format, so you can choose the one that best fits your infrastructure and monitoring strategy.
## Prerequisites
- Self-hosted Infisical instance running
- Access to deploy monitoring services (Prometheus, Grafana, etc.)
- Basic understanding of Prometheus and Grafana
## Environment Variables
Configure the following environment variables in your Infisical backend:
```bash
# Enable telemetry collection
OTEL_TELEMETRY_COLLECTION_ENABLED=true
# Choose export type: "prometheus" or "otlp"
OTEL_EXPORT_TYPE=prometheus
# For OTLP push mode, also configure:
# OTEL_EXPORT_OTLP_ENDPOINT=http://otel-collector:4318/v1/metrics
# OTEL_COLLECTOR_BASIC_AUTH_USERNAME=your_collector_username
# OTEL_COLLECTOR_BASIC_AUTH_PASSWORD=your_collector_password
# OTEL_OTLP_PUSH_INTERVAL=30000
```
**Note**: The `OTEL_COLLECTOR_BASIC_AUTH_USERNAME` and `OTEL_COLLECTOR_BASIC_AUTH_PASSWORD` values must match the credentials configured in your OpenTelemetry Collector's `basicauth/server` extension. These are not hardcoded values; you define them yourself in your collector configuration file.
## Option 1: Pull-based Monitoring (Prometheus)
This approach exposes metrics on port 9464 at the `/metrics` endpoint, allowing Prometheus to scrape the data. The metrics are exposed in Prometheus format but originate from OpenTelemetry instrumentation.
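Before wiring up Prometheus, you can sanity-check the endpoint directly (assuming the backend is reachable at `localhost:9464`; adjust the host for your deployment):
```bash
# Fetch the first few exposed metrics from the Infisical backend
curl -s http://localhost:9464/metrics | head -n 20
```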
### Configuration
1. **Enable Prometheus export in Infisical**:
```bash
OTEL_TELEMETRY_COLLECTION_ENABLED=true
OTEL_EXPORT_TYPE=prometheus
```
2. **Expose the metrics port** in your Infisical backend:
- **Docker**: Expose port 9464
- **Kubernetes**: Create a service exposing port 9464
- **Other**: Ensure port 9464 is accessible to your monitoring stack
3. **Create Prometheus configuration** (`prometheus.yml`):
```yaml
global:
scrape_interval: 30s
evaluation_interval: 30s
scrape_configs:
- job_name: "infisical"
scrape_interval: 30s
static_configs:
- targets: ["infisical-backend:9464"] # Adjust hostname/port based on your deployment
metrics_path: "/metrics"
```
**Note**: Replace `infisical-backend:9464` with the actual hostname and port where your Infisical backend is running. This could be:
- **Docker Compose**: `infisical-backend:9464` (service name)
- **Kubernetes**: `infisical-backend.default.svc.cluster.local:9464` (service name)
- **Bare Metal**: `192.168.1.100:9464` (actual IP address)
- **Cloud**: `your-infisical.example.com:9464` (domain name)
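For Kubernetes deployments, a minimal metrics Service might look like the following sketch. The Service name and the `app: infisical-backend` selector are assumptions; match them to your actual backend labels.
```yaml
# infisical-metrics-service.yaml (illustrative)
apiVersion: v1
kind: Service
metadata:
  name: infisical-backend-metrics
spec:
  selector:
    app: infisical-backend # adjust to your backend pod labels
  ports:
    - name: metrics
      port: 9464
      targetPort: 9464
  type: ClusterIP
```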
### Deployment Options
#### Docker Compose
```yaml
services:
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
command:
- "--config.file=/etc/prometheus/prometheus.yml"
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
```
#### Kubernetes
```yaml
# prometheus-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: prometheus
spec:
replicas: 1
selector:
matchLabels:
app: prometheus
template:
metadata:
labels:
app: prometheus
spec:
containers:
- name: prometheus
image: prom/prometheus:latest
ports:
- containerPort: 9090
volumeMounts:
- name: config
mountPath: /etc/prometheus
volumes:
- name: config
configMap:
name: prometheus-config
---
# prometheus-service.yaml
apiVersion: v1
kind: Service
metadata:
name: prometheus
spec:
selector:
app: prometheus
ports:
- port: 9090
targetPort: 9090
type: ClusterIP
```
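The Deployment above mounts a ConfigMap named `prometheus-config` that is not defined here; one way to create it from the `prometheus.yml` written earlier:
```bash
# Create the ConfigMap referenced by the Deployment's config volume
kubectl create configmap prometheus-config --from-file=prometheus.yml
```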
#### Helm
The prometheus-community chart does not expose `scrape_configs` through `--set` paths like `server.config.*`; a more reliable sketch is to pass the scrape job via the chart's `extraScrapeConfigs` value (the values filename below is arbitrary):
```bash
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
cat > infisical-scrape-values.yaml <<'EOF'
server:
  global:
    scrape_interval: 30s
extraScrapeConfigs: |
  - job_name: "infisical"
    metrics_path: "/metrics"
    static_configs:
      - targets: ["infisical-backend:9464"]
EOF
helm install prometheus prometheus-community/prometheus -f infisical-scrape-values.yaml
```
## Option 2: Push-based Monitoring (OTLP)
This approach sends metrics directly to an OpenTelemetry Collector via the OTLP protocol. This gives you the most flexibility as you can configure the collector to export to multiple backends simultaneously.
### Configuration
1. **Enable OTLP export in Infisical**:
```bash
OTEL_TELEMETRY_COLLECTION_ENABLED=true
OTEL_EXPORT_TYPE=otlp
OTEL_EXPORT_OTLP_ENDPOINT=http://otel-collector:4318/v1/metrics
OTEL_COLLECTOR_BASIC_AUTH_USERNAME=infisical
OTEL_COLLECTOR_BASIC_AUTH_PASSWORD=infisical
OTEL_OTLP_PUSH_INTERVAL=30000
```
2. **Create OpenTelemetry Collector configuration** (`otel-collector-config.yaml`):
```yaml
extensions:
health_check:
pprof:
zpages:
basicauth/server:
htpasswd:
inline: |
your_username:your_password
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318
auth:
authenticator: basicauth/server
prometheus:
config:
scrape_configs:
- job_name: otel-collector
scrape_interval: 30s
static_configs:
- targets: [infisical-backend:9464]
metric_relabel_configs:
- action: labeldrop
regex: "service_instance_id|service_name"
processors:
batch:
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
auth:
authenticator: basicauth/server
resource_to_telemetry_conversion:
enabled: true
service:
extensions: [basicauth/server, health_check, pprof, zpages]
pipelines:
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus]
```
**Important**: Replace `your_username:your_password` with your chosen credentials. These must match the values you set in Infisical's `OTEL_COLLECTOR_BASIC_AUTH_USERNAME` and `OTEL_COLLECTOR_BASIC_AUTH_PASSWORD` environment variables.
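If you prefer not to keep plaintext credentials in the collector config, the `basicauth/server` extension also accepts hashed htpasswd entries; a sketch using the `htpasswd` tool from apache2-utils (an assumed dependency):
```bash
# Print a bcrypt htpasswd entry to paste into the htpasswd.inline block
htpasswd -nbB your_collector_username your_collector_password
```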
3. **Create Prometheus configuration** for the collector:
```yaml
global:
scrape_interval: 30s
evaluation_interval: 30s
scrape_configs:
- job_name: "otel-collector"
scrape_interval: 30s
static_configs:
- targets: ["otel-collector:8889"] # Adjust hostname/port based on your deployment
metrics_path: "/metrics"
```
**Note**: Replace `otel-collector:8889` with the actual hostname and port where your OpenTelemetry Collector is running. This could be:
- **Docker Compose**: `otel-collector:8889` (service name)
- **Kubernetes**: `otel-collector.default.svc.cluster.local:8889` (service name)
- **Bare Metal**: `192.168.1.100:8889` (actual IP address)
- **Cloud**: `your-collector.example.com:8889` (domain name)
### Deployment Options
#### Docker Compose
```yaml
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
ports:
- 4318:4318 # OTLP http receiver
- 8889:8889 # Prometheus exporter metrics
volumes:
- ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro
command:
- "--config=/etc/otelcol-contrib/config.yaml"
```
#### Kubernetes
```yaml
# otel-collector-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: otel-collector
spec:
replicas: 1
selector:
matchLabels:
app: otel-collector
template:
metadata:
labels:
app: otel-collector
spec:
containers:
- name: otel-collector
image: otel/opentelemetry-collector-contrib:latest
ports:
- containerPort: 4318
- containerPort: 8889
volumeMounts:
- name: config
mountPath: /etc/otelcol-contrib
volumes:
- name: config
configMap:
name: otel-collector-config
```
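As with the Prometheus example, the `otel-collector-config` ConfigMap referenced above must exist before the Deployment starts; one way to create it from the collector configuration written earlier:
```bash
# The contrib image loads /etc/otelcol-contrib/config.yaml by default,
# so store the file under the key config.yaml
kubectl create configmap otel-collector-config --from-file=config.yaml=otel-collector-config.yaml
```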
#### Helm
```bash
helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
# mode (and, for the prometheus exporter, the contrib image) must be set explicitly
helm install otel-collector open-telemetry/opentelemetry-collector \
  --set mode=deployment \
  --set image.repository=otel/opentelemetry-collector-contrib \
  --set config.receivers.otlp.protocols.http.endpoint=0.0.0.0:4318 \
  --set config.exporters.prometheus.endpoint=0.0.0.0:8889
```
## Alternative Backends
Since Infisical exports in OpenTelemetry format, you can easily configure the collector to send metrics to other backends instead of (or in addition to) Prometheus:
### Cloud-Native Examples
```yaml
# Add to your otel-collector-config.yaml exporters section
exporters:
# AWS CloudWatch
awsemf:
region: us-west-2
log_group_name: /aws/emf/infisical
log_stream_name: metrics
# Google Cloud Monitoring
googlecloud:
  project: your-project-id
# Azure Monitor
azuremonitor:
connection_string: "your-connection-string"
# Datadog
datadog:
api:
key: "your-api-key"
site: "datadoghq.com"
  # New Relic (the dedicated exporter was removed from collector-contrib; use OTLP)
  otlphttp/newrelic:
    endpoint: "https://otlp.nr-data.net"
    headers:
      api-key: "your-api-key"
```
### Multi-Backend Configuration
```yaml
service:
pipelines:
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus, awsemf, datadog] # Send to multiple backends
```
## Setting Up Grafana
1. **Access Grafana**: Navigate to your Grafana instance
2. **Login**: Use your configured credentials
3. **Add Prometheus Data Source**:
- Go to Configuration → Data Sources
- Click "Add data source"
- Select "Prometheus"
- Set URL to your Prometheus endpoint
- Click "Save & Test"
## Available Metrics
Infisical exposes the following key metrics in OpenTelemetry format:
### API Performance Metrics
- `API_latency` - API request latency histogram in milliseconds
- **Labels**: `route`, `method`, `statusCode`
- **Example**: Monitor response times for specific endpoints
- `API_errors` - API error count histogram
- **Labels**: `route`, `method`, `type`, `name`
- **Example**: Track error rates by endpoint and error type
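As an illustration of how these labels can be used, the PromQL below computes per-route latency percentiles and error rates. Treat the series names as assumptions: depending on exporter settings, OTEL metrics may surface with transformed names (for example, `API_latency` becoming `API_latency_bucket`/`_count`/`_sum`).
```promql
# 95th percentile API latency per route over the last 5 minutes
histogram_quantile(0.95, sum by (le, route) (rate(API_latency_bucket[5m])))

# Error rate per route and method
sum by (route, method) (rate(API_errors_count[5m]))
```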
### Integration & Secret Sync Metrics
- `integration_secret_sync_errors` - Integration secret sync error count
- **Labels**: `version`, `integration`, `integrationId`, `type`, `status`, `name`, `projectId`
- **Example**: Monitor integration sync failures across different services
- `secret_sync_sync_secrets_errors` - Secret sync operation error count
- **Labels**: `version`, `destination`, `syncId`, `projectId`, `type`, `status`, `name`
- **Example**: Track secret sync failures to external systems
- `secret_sync_import_secrets_errors` - Secret import operation error count
- **Labels**: `version`, `destination`, `syncId`, `projectId`, `type`, `status`, `name`
- **Example**: Monitor secret import failures
- `secret_sync_remove_secrets_errors` - Secret removal operation error count
- **Labels**: `version`, `destination`, `syncId`, `projectId`, `type`, `status`, `name`
- **Example**: Track secret removal operation failures
### System Metrics
These metrics are automatically collected by OpenTelemetry's HTTP instrumentation:
- `http_server_duration` - HTTP server request duration metrics (histogram buckets, count, sum)
- `http_client_duration` - HTTP client request duration metrics (histogram buckets, count, sum)
### Custom Business Metrics
- `infisical_secret_operations_total` - Total secret operations
- `infisical_secrets_processed_total` - Total secrets processed
## Troubleshooting
### Common Issues
1. **Metrics not appearing**:
- Check if `OTEL_TELEMETRY_COLLECTION_ENABLED=true`
- Verify the correct `OTEL_EXPORT_TYPE` is set
- Check network connectivity between services
2. **Authentication errors**:
- Verify basic auth credentials in OTLP configuration
- Check if credentials match between Infisical and collector
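For both modes, a direct scrape of the relevant endpoint is a quick sanity check (hostnames and credentials below are placeholders; substitute your own):
```bash
# Pull mode: verify the backend is exposing metrics
curl -s http://infisical-backend:9464/metrics | head

# Push mode: verify the collector's Prometheus exporter endpoint (basic auth as configured)
curl -s -u your_collector_username:your_collector_password http://otel-collector:8889/metrics | head
```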

View File

@@ -160,9 +160,6 @@ type InfisicalSecretSpec struct {
// +kubebuilder:validation:Optional
TLS TLSConfig `json:"tls"`
// +kubebuilder:validation:Optional
InstantUpdates bool `json:"instantUpdates"`
}
// InfisicalSecretStatus defines the observed state of InfisicalSecret

View File

@@ -314,8 +314,6 @@ spec:
hostAPI:
description: Infisical host to pull secrets from
type: string
instantUpdates:
type: boolean
managedKubeConfigMapReferences:
items:
properties:

View File

@@ -9,7 +9,6 @@ metadata:
spec:
hostAPI: http://localhost:8080/api
resyncInterval: 10
instantUpdates: false
# tls:
# caRef:
# secretName: custom-ca-certificate

View File

@@ -29,4 +29,4 @@ spec:
secretName: managed-secret-k8s
secretNamespace: default
creationPolicy: "Orphan" ## Owner | Orphan
# secretType: kubernetes.io/dockerconfigjson
# secretType: kubernetes.io/dockerconfigjson

View File

@@ -1,7 +1,7 @@
# apiVersion: v1
# kind: Secret
# metadata:
# name: service-token
# type: Opaque
# data:
# infisicalToken: <base64 infisical token here>
apiVersion: v1
kind: Secret
metadata:
name: service-token
type: Opaque
data:
infisicalToken: <base64 infisical token here>

View File

@@ -4,5 +4,5 @@ metadata:
name: universal-auth-credentials
type: Opaque
stringData:
clientId: your-client-id-here
clientSecret: your-client-secret-here
clientId: da81e27e-1885-47d9-9ea3-ec7d4d807bb6
clientSecret: 2772414d440fe04d8b975f5fe25acd0fbfe71b2a4a420409eb9ac6f5ae6c1e98

View File

@@ -1,11 +1,8 @@
package api
import (
"encoding/json"
"fmt"
"net/http"
"github.com/Infisical/infisical/k8-operator/internal/model"
"github.com/go-resty/resty/v2"
)
@@ -149,85 +146,3 @@ func CallGetProjectByID(httpClient *resty.Client, request GetProjectByIDRequest)
return projectResponse, nil
}
func CallGetProjectByIDv2(httpClient *resty.Client, request GetProjectByIDRequest) (model.Project, error) {
var projectResponse model.Project
response, err := httpClient.
R().SetResult(&projectResponse).
SetHeader("User-Agent", USER_AGENT_NAME).
Get(fmt.Sprintf("%s/v2/workspace/%s", API_HOST_URL, request.ProjectID))
if err != nil {
return model.Project{}, fmt.Errorf("CallGetProject: Unable to complete api request [err=%s]", err)
}
if response.IsError() {
return model.Project{}, fmt.Errorf("CallGetProject: Unsuccessful response: [response=%s]", response)
}
return projectResponse, nil
}
func CallSubscribeProjectEvents(httpClient *resty.Client, projectId, secretsPath, envSlug, token string) (*http.Response, error) {
conditions := &SubscribeProjectEventsRequestCondition{
SecretPath: secretsPath,
EnvironmentSlug: envSlug,
}
body, err := json.Marshal(&SubscribeProjectEventsRequest{
ProjectID: projectId,
Register: []SubscribeProjectEventsRequestRegister{
{
Event: "secret:create",
Conditions: conditions,
},
{
Event: "secret:update",
Conditions: conditions,
},
{
Event: "secret:delete",
Conditions: conditions,
},
{
Event: "secret:import-mutation",
Conditions: conditions,
},
},
})
if err != nil {
return nil, fmt.Errorf("CallSubscribeProjectEvents: Unable to marshal body [err=%s]", err)
}
response, err := httpClient.
R().
SetDoNotParseResponse(true).
SetHeader("User-Agent", USER_AGENT_NAME).
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "text/event-stream").
SetHeader("Connection", "keep-alive").
SetHeader("Authorization", fmt.Sprint("Bearer ", token)).
SetBody(body).
Post(fmt.Sprintf("%s/v1/events/subscribe/project-events", API_HOST_URL))
if err != nil {
return nil, fmt.Errorf("CallSubscribeProjectEvents: Unable to complete api request [err=%s]", err)
}
if response.IsError() {
data := struct {
Message string `json:"message"`
}{}
if err := json.NewDecoder(response.RawBody()).Decode(&data); err != nil {
return nil, err
}
return nil, fmt.Errorf("CallSubscribeProjectEvents: Unsuccessful response: [message=%s]", data.Message)
}
return response.RawResponse, nil
}

View File

@@ -206,20 +206,3 @@ type GetProjectByIDRequest struct {
type GetProjectByIDResponse struct {
Project model.Project `json:"workspace"`
}
type SubscribeProjectEventsRequestRegister struct {
Event string `json:"event"`
Conditions *SubscribeProjectEventsRequestCondition `json:"conditions"`
}
type SubscribeProjectEventsRequestCondition struct {
EnvironmentSlug string `json:"environmentSlug"`
SecretPath string `json:"secretPath"`
}
type SubscribeProjectEventsRequest struct {
ProjectID string `json:"projectId"`
Register []SubscribeProjectEventsRequestRegister `json:"register"`
}
type SubscribeProjectEventsResponse struct{}

View File

@@ -231,6 +231,7 @@ func (r *InfisicalPushSecretReconciler) Reconcile(ctx context.Context, req ctrl.
}
func (r *InfisicalPushSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Custom predicate that allows both spec changes and deletions
specChangeOrDelete := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {

View File

@@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
secretsv1alpha1 "github.com/Infisical/infisical/k8-operator/api/v1alpha1"
"github.com/Infisical/infisical/k8-operator/internal/controllerhelpers"
@@ -42,10 +41,8 @@ import (
// InfisicalSecretReconciler reconciles a InfisicalSecret object
type InfisicalSecretReconciler struct {
client.Client
BaseLogger logr.Logger
Scheme *runtime.Scheme
SourceCh chan event.TypedGenericEvent[client.Object]
BaseLogger logr.Logger
Scheme *runtime.Scheme
Namespace string
IsNamespaceScoped bool
}
@@ -77,6 +74,7 @@ func (r *InfisicalSecretReconciler) GetLogger(req ctrl.Request) logr.Logger {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile
func (r *InfisicalSecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.GetLogger(req)
var infisicalSecretCRD secretsv1alpha1.InfisicalSecret
@@ -198,20 +196,6 @@ func (r *InfisicalSecretReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}, nil
}
if infisicalSecretCRD.Spec.InstantUpdates {
if err := handler.OpenInstantUpdatesStream(ctx, logger, &infisicalSecretCRD, infisicalSecretResourceVariablesMap, r.SourceCh); err != nil {
requeueTime = time.Second * 10
logger.Info(fmt.Sprintf("event stream failed. Will requeue after [requeueTime=%v] [error=%s]", requeueTime, err.Error()))
return ctrl.Result{
RequeueAfter: requeueTime,
}, nil
}
logger.Info("Instant updates are enabled")
} else {
handler.CloseInstantUpdatesStream(ctx, logger, &infisicalSecretCRD, infisicalSecretResourceVariablesMap)
}
// Sync again after the specified time
logger.Info(fmt.Sprintf("Successfully synced %d secrets. Operator will requeue after [%v]", secretsCount, requeueTime))
return ctrl.Result{
@@ -220,12 +204,7 @@ func (r *InfisicalSecretReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
func (r *InfisicalSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.SourceCh = make(chan event.TypedGenericEvent[client.Object])
return ctrl.NewControllerManagedBy(mgr).
WatchesRawSource(
source.Channel[client.Object](r.SourceCh, &util.EnqueueDelayedEventHandler{Delay: time.Second * 10}),
).
For(&secretsv1alpha1.InfisicalSecret{}, builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld.GetGeneration() == e.ObjectNew.GetGeneration() {
@@ -251,5 +230,4 @@ func (r *InfisicalSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {
},
})).
Complete(r)
}

View File

@@ -7,7 +7,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"github.com/Infisical/infisical/k8-operator/api/v1alpha1"
"github.com/Infisical/infisical/k8-operator/internal/api"
@@ -101,22 +100,3 @@ func (h *InfisicalSecretHandler) SetInfisicalAutoRedeploymentReady(ctx context.C
}
reconciler.SetInfisicalAutoRedeploymentReady(ctx, logger, infisicalSecret, numDeployments, errorToConditionOn)
}
func (h *InfisicalSecretHandler) CloseInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables) error {
reconciler := &InfisicalSecretReconciler{
Client: h.Client,
Scheme: h.Scheme,
IsNamespaceScoped: h.IsNamespaceScoped,
}
return reconciler.CloseInstantUpdatesStream(ctx, logger, infisicalSecret, resourceVariablesMap)
}
// Ensures that the SSE stream is open; if the stream is already open, this is a no-op
func (h *InfisicalSecretHandler) OpenInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables, eventCh chan<- event.TypedGenericEvent[client.Object]) error {
reconciler := &InfisicalSecretReconciler{
Client: h.Client,
Scheme: h.Scheme,
IsNamespaceScoped: h.IsNamespaceScoped,
}
return reconciler.OpenInstantUpdatesStream(ctx, logger, infisicalSecret, resourceVariablesMap, eventCh)
}

View File

@@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strings"
tpl "text/template"
@@ -16,14 +15,11 @@ import (
"github.com/Infisical/infisical/k8-operator/internal/model"
"github.com/Infisical/infisical/k8-operator/internal/template"
"github.com/Infisical/infisical/k8-operator/internal/util"
"github.com/Infisical/infisical/k8-operator/internal/util/sse"
"github.com/go-logr/logr"
"github.com/go-resty/resty/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
infisicalSdk "github.com/infisical/go-sdk"
corev1 "k8s.io/api/core/v1"
@@ -413,10 +409,9 @@ func (r *InfisicalSecretReconciler) getResourceVariables(infisicalSecret v1alpha
})
resourceVariablesMap[string(infisicalSecret.UID)] = util.ResourceVariables{
InfisicalClient: client,
CancelCtx: cancel,
AuthDetails: util.AuthenticationDetails{},
ServerSentEvents: sse.NewConnectionRegistry(ctx),
InfisicalClient: client,
CancelCtx: cancel,
AuthDetails: util.AuthenticationDetails{},
}
resourceVariables = resourceVariablesMap[string(infisicalSecret.UID)]
@@ -426,6 +421,7 @@ func (r *InfisicalSecretReconciler) getResourceVariables(infisicalSecret v1alpha
}
return resourceVariables
}
func (r *InfisicalSecretReconciler) updateResourceVariables(infisicalSecret v1alpha1.InfisicalSecret, resourceVariables util.ResourceVariables, resourceVariablesMap map[string]util.ResourceVariables) {
@@ -458,10 +454,9 @@ func (r *InfisicalSecretReconciler) ReconcileInfisicalSecret(ctx context.Context
}
r.updateResourceVariables(*infisicalSecret, util.ResourceVariables{
InfisicalClient: infisicalClient,
CancelCtx: cancelCtx,
AuthDetails: authDetails,
ServerSentEvents: sse.NewConnectionRegistry(ctx),
InfisicalClient: infisicalClient,
CancelCtx: cancelCtx,
AuthDetails: authDetails,
}, resourceVariablesMap)
}
@@ -530,94 +525,3 @@ func (r *InfisicalSecretReconciler) ReconcileInfisicalSecret(ctx context.Context
return secretsCount, nil
}
func (r *InfisicalSecretReconciler) CloseInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables) error {
if infisicalSecret == nil {
return fmt.Errorf("infisicalSecret is nil")
}
variables := r.getResourceVariables(*infisicalSecret, resourceVariablesMap)
if !variables.AuthDetails.IsMachineIdentityAuth {
return fmt.Errorf("only machine identity is supported for subscriptions")
}
conn := variables.ServerSentEvents
if _, ok := conn.Get(); ok {
conn.Close()
}
return nil
}
func (r *InfisicalSecretReconciler) OpenInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables, eventCh chan<- event.TypedGenericEvent[client.Object]) error {
if infisicalSecret == nil {
return fmt.Errorf("infisicalSecret is nil")
}
variables := r.getResourceVariables(*infisicalSecret, resourceVariablesMap)
if !variables.AuthDetails.IsMachineIdentityAuth {
return fmt.Errorf("only machine identity is supported for subscriptions")
}
projectSlug := variables.AuthDetails.MachineIdentityScope.ProjectSlug
secretsPath := variables.AuthDetails.MachineIdentityScope.SecretsPath
envSlug := variables.AuthDetails.MachineIdentityScope.EnvSlug
infisicalClient := variables.InfisicalClient
sseRegistry := variables.ServerSentEvents
token := infisicalClient.Auth().GetAccessToken()
project, err := util.GetProjectBySlug(token, projectSlug)
if err != nil {
return fmt.Errorf("failed to get project [err=%s]", err)
}
if variables.AuthDetails.MachineIdentityScope.Recursive {
secretsPath = fmt.Sprint(secretsPath, "**")
}
events, errors, err := sseRegistry.Subscribe(func() (*http.Response, error) {
httpClient := resty.New()
req, err := api.CallSubscribeProjectEvents(httpClient, project.ID, secretsPath, envSlug, token)
if err != nil {
return nil, err
}
return req, nil
})
if err != nil {
return fmt.Errorf("unable to connect sse [err=%s]", err)
}
go func() {
outer:
for {
select {
case ev := <-events:
logger.Info("Received SSE Event", "event", ev)
eventCh <- event.TypedGenericEvent[client.Object]{
Object: infisicalSecret,
}
case err := <-errors:
logger.Error(err, "Error occurred")
break outer
case <-ctx.Done():
break outer
}
}
}()
return nil
}

View File

@@ -1,59 +0,0 @@
package util
import (
"context"
"math/rand"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// computeMaxJitterDuration returns the configured max along with a random
// jitter duration in [0, max). The jitter spreads event processing over time.
func computeMaxJitterDuration(max time.Duration) (time.Duration, time.Duration) {
if max <= 0 {
return 0, 0
}
jitter := time.Duration(rand.Int63n(int64(max)))
return max, jitter
}
// EnqueueDelayedEventHandler enqueues reconcile requests with a random delay (jitter)
// to spread the load and avoid thundering herd issues.
type EnqueueDelayedEventHandler struct {
Delay time.Duration
}
func (e *EnqueueDelayedEventHandler) Create(_ context.Context, _ event.TypedCreateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}
func (e *EnqueueDelayedEventHandler) Update(_ context.Context, _ event.TypedUpdateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}
func (e *EnqueueDelayedEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}
func (e *EnqueueDelayedEventHandler) Generic(_ context.Context, evt event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if evt.Object == nil {
return
}
req := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: evt.Object.GetNamespace(),
Name: evt.Object.GetName(),
},
}
_, delay := computeMaxJitterDuration(e.Delay)
if delay > 0 {
q.AddAfter(req, delay)
} else {
q.Add(req)
}
}

View File

@@ -3,13 +3,11 @@ package util
import (
"context"
"github.com/Infisical/infisical/k8-operator/internal/util/sse"
infisicalSdk "github.com/infisical/go-sdk"
)
type ResourceVariables struct {
InfisicalClient infisicalSdk.InfisicalClientInterface
CancelCtx context.CancelFunc
AuthDetails AuthenticationDetails
ServerSentEvents *sse.ConnectionRegistry
InfisicalClient infisicalSdk.InfisicalClientInterface
CancelCtx context.CancelFunc
AuthDetails AuthenticationDetails
}

View File

@@ -1,331 +0,0 @@
package sse
import (
"bufio"
"context"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
// Event represents a Server-Sent Event
type Event struct {
ID string
Event string
Data string
Retry int
}
// ConnectionMeta holds metadata about an SSE connection
type ConnectionMeta struct {
EventChan <-chan Event
ErrorChan <-chan error
lastPingAt atomic.Value // stores time.Time
cancel context.CancelFunc
}
// LastPing returns the last ping time
func (c *ConnectionMeta) LastPing() time.Time {
if t, ok := c.lastPingAt.Load().(time.Time); ok {
return t
}
return time.Time{}
}
// UpdateLastPing atomically updates the last ping time
func (c *ConnectionMeta) UpdateLastPing() {
c.lastPingAt.Store(time.Now())
}
// Cancel terminates the connection
func (c *ConnectionMeta) Cancel() {
if c.cancel != nil {
c.cancel()
}
}
// ConnectionRegistry manages SSE connections with high performance
type ConnectionRegistry struct {
mu sync.RWMutex
conn *ConnectionMeta
monitorOnce sync.Once
monitorStop chan struct{}
onPing func() // Callback for ping events
}
// NewConnectionRegistry creates a new high-performance connection registry
func NewConnectionRegistry(ctx context.Context) *ConnectionRegistry {
r := &ConnectionRegistry{
monitorStop: make(chan struct{}),
}
// Configure ping handler
r.onPing = func() {
r.UpdateLastPing()
}
return r
}
// Subscribe provides SSE events, creating a connection if needed
func (r *ConnectionRegistry) Subscribe(request func() (*http.Response, error)) (<-chan Event, <-chan error, error) {
// Fast path: check if connection exists
if conn := r.getConnection(); conn != nil {
return conn.EventChan, conn.ErrorChan, nil
}
// Slow path: create new connection under lock
r.mu.Lock()
defer r.mu.Unlock()
// Double-check after acquiring lock
if r.conn != nil {
return r.conn.EventChan, r.conn.ErrorChan, nil
}
res, err := request()
if err != nil {
return nil, nil, err
}
conn, err := r.createStream(res)
if err != nil {
return nil, nil, err
}
r.conn = conn
// Start monitor once
r.monitorOnce.Do(func() {
go r.monitorConnections()
})
return conn.EventChan, conn.ErrorChan, nil
}
// Get retrieves the current connection
func (r *ConnectionRegistry) Get() (*ConnectionMeta, bool) {
conn := r.getConnection()
return conn, conn != nil
}
// IsConnected checks if there's an active connection
func (r *ConnectionRegistry) IsConnected() bool {
return r.getConnection() != nil
}
// UpdateLastPing updates the last ping time for the current connection
func (r *ConnectionRegistry) UpdateLastPing() {
if conn := r.getConnection(); conn != nil {
conn.UpdateLastPing()
}
}
// Close gracefully shuts down the registry
func (r *ConnectionRegistry) Close() {
// Stop monitor first
select {
case <-r.monitorStop:
// Already closed
default:
close(r.monitorStop)
}
// Close connection
r.mu.Lock()
if r.conn != nil {
r.conn.Cancel()
r.conn = nil
}
r.mu.Unlock()
}
// getConnection returns the current connection without locking
func (r *ConnectionRegistry) getConnection() *ConnectionMeta {
r.mu.RLock()
conn := r.conn
r.mu.RUnlock()
return conn
}
func (r *ConnectionRegistry) createStream(res *http.Response) (*ConnectionMeta, error) {
ctx, cancel := context.WithCancel(context.Background())
eventChan, errorChan, err := r.stream(ctx, res)
if err != nil {
cancel()
return nil, err
}
meta := &ConnectionMeta{
EventChan: eventChan,
ErrorChan: errorChan,
cancel: cancel,
}
meta.UpdateLastPing()
return meta, nil
}
// stream processes SSE data from an HTTP response
func (r *ConnectionRegistry) stream(ctx context.Context, res *http.Response) (<-chan Event, <-chan error, error) {
eventChan := make(chan Event, 10)
errorChan := make(chan error, 1)
go r.processStream(ctx, res.Body, eventChan, errorChan)
return eventChan, errorChan, nil
}
// processStream reads and parses SSE events from the response body
func (r *ConnectionRegistry) processStream(ctx context.Context, body io.ReadCloser, eventChan chan<- Event, errorChan chan<- error) {
defer body.Close()
defer close(eventChan)
defer close(errorChan)
scanner := bufio.NewScanner(body)
var currentEvent Event
var dataBuilder strings.Builder
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
}
line := scanner.Text()
// Empty line indicates end of event
if len(line) == 0 {
if currentEvent.Data != "" || currentEvent.Event != "" {
// Finalize data
if dataBuilder.Len() > 0 {
currentEvent.Data = dataBuilder.String()
dataBuilder.Reset()
}
// Handle ping events
if r.isPingEvent(currentEvent) {
if r.onPing != nil {
r.onPing()
}
} else {
// Send non-ping events
select {
case eventChan <- currentEvent:
case <-ctx.Done():
return
}
}
// Reset for next event
currentEvent = Event{}
}
continue
}
// Parse line efficiently
r.parseLine(line, &currentEvent, &dataBuilder)
}
if err := scanner.Err(); err != nil {
select {
case errorChan <- err:
case <-ctx.Done():
}
}
}
// parseLine efficiently parses SSE protocol lines
func (r *ConnectionRegistry) parseLine(line string, event *Event, dataBuilder *strings.Builder) {
colonIndex := strings.IndexByte(line, ':')
if colonIndex == -1 {
return // Invalid line format
}
field := line[:colonIndex]
value := line[colonIndex+1:]
// Trim leading space from value (SSE spec)
if len(value) > 0 && value[0] == ' ' {
value = value[1:]
}
switch field {
case "data":
if dataBuilder.Len() > 0 {
dataBuilder.WriteByte('\n')
}
dataBuilder.WriteString(value)
case "event":
event.Event = value
case "id":
event.ID = value
case "retry":
// Parse retry value if needed
// This could be used to configure reconnection delay
case "":
// Comment line, ignore
}
}
// isPingEvent checks if an event is a ping/keepalive
func (r *ConnectionRegistry) isPingEvent(event Event) bool {
// Check for common ping patterns
if event.Event == "ping" {
return true
}
// Check for heartbeat data (common pattern is "1" or similar)
if event.Event == "" && strings.TrimSpace(event.Data) == "1" {
return true
}
return false
}
// monitorConnections checks connection health periodically
func (r *ConnectionRegistry) monitorConnections() {
const (
checkInterval = 30 * time.Second
pingTimeout = 2 * time.Minute
)
ticker := time.NewTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-r.monitorStop:
return
case <-ticker.C:
r.checkConnectionHealth(pingTimeout)
}
}
}
// checkConnectionHealth verifies connection is still alive
func (r *ConnectionRegistry) checkConnectionHealth(timeout time.Duration) {
conn := r.getConnection()
if conn == nil {
return
}
if time.Since(conn.LastPing()) > timeout {
// Connection is stale, close it
r.mu.Lock()
if r.conn == conn { // Verify it's still the same connection
r.conn.Cancel()
// Do not signal monitorStop here: this runs on the monitor's own goroutine
// while holding the lock, so sending on the channel would deadlock.
r.conn = nil
}
r.mu.Unlock()
}
}

View File

@@ -9,6 +9,7 @@ import (
)
func GetProjectByID(accessToken string, projectId string) (model.Project, error) {
httpClient := resty.New()
httpClient.
SetAuthScheme("Bearer").
@@ -24,21 +25,3 @@ func GetProjectByID(accessToken string, projectId string) (model.Project, error)
return projectDetails.Project, nil
}
func GetProjectBySlug(accessToken string, projectSlug string) (model.Project, error) {
httpClient := resty.New()
httpClient.
SetAuthScheme("Bearer").
SetAuthToken(accessToken).
SetHeader("Accept", "application/json")
project, err := api.CallGetProjectByIDv2(httpClient, api.GetProjectByIDRequest{
ProjectID: projectSlug,
})
if err != nil {
return model.Project{}, fmt.Errorf("unable to get project by slug. [err=%v]", err)
}
return project, nil
}