Mirror of https://github.com/Infisical/infisical.git (synced 2025-08-22 10:12:15 +00:00)
Compare commits
1 commit
fix/crd-is ... misc/optim
Author | SHA1 | Date
---|---|---
| 28cc919ff7 |
@@ -84,6 +84,9 @@ const up = async (knex: Knex): Promise<void> => {
    t.index("expiresAt");
    t.index("orgId");
    t.index("projectId");
    t.index("eventType");
    t.index("userAgentType");
    t.index("actor");
  });

  console.log("Adding GIN indices...");

@@ -119,8 +122,8 @@ const up = async (knex: Knex): Promise<void> => {
  console.log("Creating audit log partitions ahead of time... next date:", nextDateStr);
  await createAuditLogPartition(knex, nextDate, new Date(nextDate.getFullYear(), nextDate.getMonth() + 1));

- // create partitions 4 years ahead
- const partitionMonths = 4 * 12;
+ // create partitions 20 years ahead
+ const partitionMonths = 20 * 12;
  const partitionPromises: Promise<void>[] = [];
  for (let x = 1; x <= partitionMonths; x += 1) {
    partitionPromises.push(

@@ -179,7 +179,6 @@ export const auditLogDALFactory = (db: TDbClient) => {
        numberOfRetryOnFailure = 0; // reset
      } catch (error) {
        numberOfRetryOnFailure += 1;
        deletedAuditLogIds = [];
        logger.error(error, "Failed to delete audit log on pruning");
      } finally {
        // eslint-disable-next-line no-await-in-loop

@@ -123,7 +123,7 @@ export function createEventStreamClient(redis: Redis, options: IEventStreamClien

    await redis.set(key, "1", "EX", 60);

-   send({ type: "ping" });
+   stream.push("1");
  };

  const close = () => {

@@ -13,7 +13,6 @@ import {
  ProjectPermissionPkiSubscriberActions,
  ProjectPermissionPkiTemplateActions,
  ProjectPermissionSecretActions,
- ProjectPermissionSecretEventActions,
  ProjectPermissionSecretRotationActions,
  ProjectPermissionSecretScanningConfigActions,
  ProjectPermissionSecretScanningDataSourceActions,

@@ -253,16 +252,6 @@ const buildAdminPermissionRules = () => {
    ProjectPermissionSub.SecretScanningConfigs
  );

- can(
-   [
-     ProjectPermissionSecretEventActions.SubscribeCreated,
-     ProjectPermissionSecretEventActions.SubscribeDeleted,
-     ProjectPermissionSecretEventActions.SubscribeUpdated,
-     ProjectPermissionSecretEventActions.SubscribeImportMutations
-   ],
-   ProjectPermissionSub.SecretEvents
- );
-
  return rules;
};

@@ -466,16 +455,6 @@ const buildMemberPermissionRules = () => {

  can([ProjectPermissionSecretScanningConfigActions.Read], ProjectPermissionSub.SecretScanningConfigs);

- can(
-   [
-     ProjectPermissionSecretEventActions.SubscribeCreated,
-     ProjectPermissionSecretEventActions.SubscribeDeleted,
-     ProjectPermissionSecretEventActions.SubscribeUpdated,
-     ProjectPermissionSecretEventActions.SubscribeImportMutations
-   ],
-   ProjectPermissionSub.SecretEvents
- );
-
  return rules;
};

@@ -526,16 +505,6 @@ const buildViewerPermissionRules = () => {

  can([ProjectPermissionSecretScanningConfigActions.Read], ProjectPermissionSub.SecretScanningConfigs);

- can(
-   [
-     ProjectPermissionSecretEventActions.SubscribeCreated,
-     ProjectPermissionSecretEventActions.SubscribeDeleted,
-     ProjectPermissionSecretEventActions.SubscribeUpdated,
-     ProjectPermissionSecretEventActions.SubscribeImportMutations
-   ],
-   ProjectPermissionSub.SecretEvents
- );
-
  return rules;
};

@@ -67,7 +67,7 @@ export const registerAuthRoutes = async (server: FastifyZodProvider) => {
      })
    }
  },
- onRequest: verifyAuth([AuthMode.JWT], { requireOrg: false }),
+ onRequest: verifyAuth([AuthMode.JWT]),
  handler: () => ({ message: "Authenticated" as const })
});

@@ -310,8 +310,7 @@
  "self-hosting/guides/mongo-to-postgres",
  "self-hosting/guides/custom-certificates",
  "self-hosting/guides/automated-bootstrapping",
- "self-hosting/guides/production-hardening",
- "self-hosting/guides/monitoring-telemetry"
+ "self-hosting/guides/production-hardening"
]
},
{

@@ -1,440 +0,0 @@
---
title: "Monitoring and Telemetry Setup"
description: "Learn how to set up monitoring and telemetry for your self-hosted Infisical instance using Grafana, Prometheus, and OpenTelemetry."
---

Infisical provides comprehensive monitoring and telemetry capabilities to help you monitor the health, performance, and usage of your self-hosted instance. This guide covers setting up monitoring using Grafana with two different telemetry collection approaches.

## Overview

Infisical exports metrics in **OpenTelemetry (OTEL) format**, which provides maximum flexibility for your monitoring infrastructure. While this guide focuses on Grafana, the OTEL format means you can easily integrate with:

- **Cloud-native monitoring**: AWS CloudWatch, Google Cloud Monitoring, Azure Monitor
- **Observability platforms**: Datadog, New Relic, Splunk, Dynatrace
- **Custom backends**: Any system that supports OTEL ingestion
- **Traditional monitoring**: Prometheus, Grafana (as covered in this guide)

Infisical supports two telemetry collection methods:

1. **Pull-based (Prometheus)**: Exposes metrics on a dedicated endpoint for Prometheus to scrape
2. **Push-based (OTLP)**: Sends metrics to an OpenTelemetry Collector via the OTLP protocol

Both approaches provide the same metrics data in OTEL format, so you can choose the one that best fits your infrastructure and monitoring strategy.

## Prerequisites

- A running self-hosted Infisical instance
- Access to deploy monitoring services (Prometheus, Grafana, etc.)
- A basic understanding of Prometheus and Grafana

## Environment Variables

Configure the following environment variables in your Infisical backend:

```bash
# Enable telemetry collection
OTEL_TELEMETRY_COLLECTION_ENABLED=true

# Choose export type: "prometheus" or "otlp"
OTEL_EXPORT_TYPE=prometheus

# For OTLP push mode, also configure:
# OTEL_EXPORT_OTLP_ENDPOINT=http://otel-collector:4318/v1/metrics
# OTEL_COLLECTOR_BASIC_AUTH_USERNAME=your_collector_username
# OTEL_COLLECTOR_BASIC_AUTH_PASSWORD=your_collector_password
# OTEL_OTLP_PUSH_INTERVAL=30000
```

**Note**: The `OTEL_COLLECTOR_BASIC_AUTH_USERNAME` and `OTEL_COLLECTOR_BASIC_AUTH_PASSWORD` values must match the credentials configured in your OpenTelemetry Collector's `basicauth/server` extension. They are not hardcoded values; you set them in your collector configuration file.
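
If you deploy with Docker Compose, these variables belong on the backend service. A minimal sketch for the pull-based setup, assuming a service named `infisical-backend` (the image tag is an assumption; use the one from your deployment):

```yaml
services:
  infisical-backend:
    image: infisical/infisical:latest # assumed image; match your deployment
    environment:
      - OTEL_TELEMETRY_COLLECTION_ENABLED=true
      - OTEL_EXPORT_TYPE=prometheus
    ports:
      - "9464:9464" # expose the metrics endpoint for Prometheus
```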

## Option 1: Pull-based Monitoring (Prometheus)

This approach exposes metrics on port 9464 at the `/metrics` endpoint, allowing Prometheus to scrape the data. The metrics are exposed in Prometheus format but originate from OpenTelemetry instrumentation.
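
You can sanity-check the endpoint by hand before wiring up Prometheus. A quick smoke test, assuming the backend runs locally with port 9464 published:

```bash
# Should print Prometheus-format metric families
curl -s http://localhost:9464/metrics | head -n 20
```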

### Configuration

1. **Enable Prometheus export in Infisical**:

   ```bash
   OTEL_TELEMETRY_COLLECTION_ENABLED=true
   OTEL_EXPORT_TYPE=prometheus
   ```

2. **Expose the metrics port** on your Infisical backend:

   - **Docker**: Expose port 9464
   - **Kubernetes**: Create a service exposing port 9464
   - **Other**: Ensure port 9464 is accessible to your monitoring stack

3. **Create a Prometheus configuration** (`prometheus.yml`):

   ```yaml
   global:
     scrape_interval: 30s
     evaluation_interval: 30s

   scrape_configs:
     - job_name: "infisical"
       scrape_interval: 30s
       static_configs:
         - targets: ["infisical-backend:9464"] # Adjust hostname/port based on your deployment
       metrics_path: "/metrics"
   ```

   **Note**: Replace `infisical-backend:9464` with the actual hostname and port where your Infisical backend is running. This could be:

   - **Docker Compose**: `infisical-backend:9464` (service name)
   - **Kubernetes**: `infisical-backend.default.svc.cluster.local:9464` (service name)
   - **Bare Metal**: `192.168.1.100:9464` (actual IP address)
   - **Cloud**: `your-infisical.example.com:9464` (domain name)

### Deployment Options

#### Docker Compose

```yaml
services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin
```

#### Kubernetes

```yaml
# prometheus-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
spec:
  replicas: 1
  selector:
    matchLabels:
      app: prometheus
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
        - name: prometheus
          image: prom/prometheus:latest
          ports:
            - containerPort: 9090
          volumeMounts:
            - name: config
              mountPath: /etc/prometheus
      volumes:
        - name: config
          configMap:
            name: prometheus-config

---
# prometheus-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: prometheus
spec:
  selector:
    app: prometheus
  ports:
    - port: 9090
      targetPort: 9090
  type: ClusterIP
```

#### Helm

```bash
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/prometheus \
  --set server.config.global.scrape_interval=30s \
  --set server.config.scrape_configs[0].job_name=infisical \
  --set server.config.scrape_configs[0].static_configs[0].targets[0]=infisical-backend:9464
```

## Option 2: Push-based Monitoring (OTLP)

This approach sends metrics directly to an OpenTelemetry Collector via the OTLP protocol. This gives you the most flexibility, as you can configure the collector to export to multiple backends simultaneously.

### Configuration

1. **Enable OTLP export in Infisical**:

   ```bash
   OTEL_TELEMETRY_COLLECTION_ENABLED=true
   OTEL_EXPORT_TYPE=otlp
   OTEL_EXPORT_OTLP_ENDPOINT=http://otel-collector:4318/v1/metrics
   OTEL_COLLECTOR_BASIC_AUTH_USERNAME=infisical
   OTEL_COLLECTOR_BASIC_AUTH_PASSWORD=infisical
   OTEL_OTLP_PUSH_INTERVAL=30000
   ```

2. **Create an OpenTelemetry Collector configuration** (`otel-collector-config.yaml`):

   ```yaml
   extensions:
     health_check:
     pprof:
     zpages:
     basicauth/server:
       htpasswd:
         inline: |
           your_username:your_password

   receivers:
     otlp:
       protocols:
         http:
           endpoint: 0.0.0.0:4318
           auth:
             authenticator: basicauth/server

     prometheus:
       config:
         scrape_configs:
           - job_name: otel-collector
             scrape_interval: 30s
             static_configs:
               - targets: [infisical-backend:9464]
             metric_relabel_configs:
               - action: labeldrop
                 regex: "service_instance_id|service_name"

   processors:
     batch:

   exporters:
     prometheus:
       endpoint: "0.0.0.0:8889"
       auth:
         authenticator: basicauth/server
       resource_to_telemetry_conversion:
         enabled: true

   service:
     extensions: [basicauth/server, health_check, pprof, zpages]
     pipelines:
       metrics:
         receivers: [otlp]
         processors: [batch]
         exporters: [prometheus]
   ```

   **Important**: Replace `your_username:your_password` with your chosen credentials. These must match the values you set in Infisical's `OTEL_COLLECTOR_BASIC_AUTH_USERNAME` and `OTEL_COLLECTOR_BASIC_AUTH_PASSWORD` environment variables.

3. **Create a Prometheus configuration** for the collector:

   ```yaml
   global:
     scrape_interval: 30s
     evaluation_interval: 30s

   scrape_configs:
     - job_name: "otel-collector"
       scrape_interval: 30s
       static_configs:
         - targets: ["otel-collector:8889"] # Adjust hostname/port based on your deployment
       metrics_path: "/metrics"
   ```

   **Note**: Replace `otel-collector:8889` with the actual hostname and port where your OpenTelemetry Collector is running. This could be:

   - **Docker Compose**: `otel-collector:8889` (service name)
   - **Kubernetes**: `otel-collector.default.svc.cluster.local:8889` (service name)
   - **Bare Metal**: `192.168.1.100:8889` (actual IP address)
   - **Cloud**: `your-collector.example.com:8889` (domain name)
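
To confirm the push pipeline end to end, you can scrape the collector's Prometheus exporter by hand. A quick check, assuming the port and credentials from the configuration above (and assuming the exporter enforces the basicauth settings shown there):

```bash
# Metrics pushed by Infisical should appear here once the OTLP pipeline is flowing
curl -s -u infisical:infisical http://otel-collector:8889/metrics | head -n 20
```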

### Deployment Options

#### Docker Compose

```yaml
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    ports:
      - 4318:4318 # OTLP HTTP receiver
      - 8889:8889 # Prometheus exporter metrics
    volumes:
      - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro
    command:
      - "--config=/etc/otelcol-contrib/config.yaml"
```

#### Kubernetes

```yaml
# otel-collector-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: otel-collector
spec:
  replicas: 1
  selector:
    matchLabels:
      app: otel-collector
  template:
    metadata:
      labels:
        app: otel-collector
    spec:
      containers:
        - name: otel-collector
          image: otel/opentelemetry-collector-contrib:latest
          ports:
            - containerPort: 4318
            - containerPort: 8889
          volumeMounts:
            - name: config
              mountPath: /etc/otelcol-contrib
      volumes:
        - name: config
          configMap:
            name: otel-collector-config
```

#### Helm

```bash
helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
helm install otel-collector open-telemetry/opentelemetry-collector \
  --set config.receivers.otlp.protocols.http.endpoint=0.0.0.0:4318 \
  --set config.exporters.prometheus.endpoint=0.0.0.0:8889
```

## Alternative Backends

Since Infisical exports in OpenTelemetry format, you can easily configure the collector to send metrics to other backends instead of (or in addition to) Prometheus:

### Cloud-Native Examples

```yaml
# Add to your otel-collector-config.yaml exporters section
exporters:
  # AWS CloudWatch
  awsemf:
    region: us-west-2
    log_group_name: /aws/emf/infisical
    log_stream_name: metrics

  # Google Cloud Monitoring
  googlecloud:
    project_id: your-project-id

  # Azure Monitor
  azuremonitor:
    connection_string: "your-connection-string"

  # Datadog
  datadog:
    api:
      key: "your-api-key"
      site: "datadoghq.com"

  # New Relic
  newrelic:
    apikey: "your-api-key"
    host_override: "otlp.nr-data.net"
```

### Multi-Backend Configuration

```yaml
service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus, awsemf, datadog] # Send to multiple backends
```

## Setting Up Grafana

1. **Access Grafana**: Navigate to your Grafana instance
2. **Login**: Use your configured credentials
3. **Add a Prometheus data source**:
   - Go to Configuration → Data Sources
   - Click "Add data source"
   - Select "Prometheus"
   - Set the URL to your Prometheus endpoint
   - Click "Save & Test"
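
If you manage Grafana declaratively, the same data source can be provisioned from a file instead of through the UI. A minimal sketch using Grafana's data source provisioning format, assuming Prometheus is reachable at `http://prometheus:9090` inside your network:

```yaml
# /etc/grafana/provisioning/datasources/prometheus.yaml
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
```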

## Available Metrics

Infisical exposes the following key metrics in OpenTelemetry format:

### API Performance Metrics

- `API_latency` - API request latency histogram in milliseconds
  - **Labels**: `route`, `method`, `statusCode`
  - **Example**: Monitor response times for specific endpoints

- `API_errors` - API error count histogram
  - **Labels**: `route`, `method`, `type`, `name`
  - **Example**: Track error rates by endpoint and error type
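
Once Prometheus is scraping these metrics, the latency histogram can be queried for percentiles. A sketch issued through Prometheus's HTTP query API, assuming the exported series follows the standard `_bucket` histogram naming and Prometheus is reachable at `prometheus:9090`:

```bash
# p95 API latency per route over the last 5 minutes
curl -sG 'http://prometheus:9090/api/v1/query' \
  --data-urlencode 'query=histogram_quantile(0.95, sum(rate(API_latency_bucket[5m])) by (le, route))'
```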

### Integration & Secret Sync Metrics

- `integration_secret_sync_errors` - Integration secret sync error count
  - **Labels**: `version`, `integration`, `integrationId`, `type`, `status`, `name`, `projectId`
  - **Example**: Monitor integration sync failures across different services

- `secret_sync_sync_secrets_errors` - Secret sync operation error count
  - **Labels**: `version`, `destination`, `syncId`, `projectId`, `type`, `status`, `name`
  - **Example**: Track secret sync failures to external systems

- `secret_sync_import_secrets_errors` - Secret import operation error count
  - **Labels**: `version`, `destination`, `syncId`, `projectId`, `type`, `status`, `name`
  - **Example**: Monitor secret import failures

- `secret_sync_remove_secrets_errors` - Secret removal operation error count
  - **Labels**: `version`, `destination`, `syncId`, `projectId`, `type`, `status`, `name`
  - **Example**: Track secret removal operation failures

### System Metrics

These metrics are automatically collected by OpenTelemetry's HTTP instrumentation:

- `http_server_duration` - HTTP server request duration metrics (histogram buckets, count, sum)
- `http_client_duration` - HTTP client request duration metrics (histogram buckets, count, sum)

### Custom Business Metrics

- `infisical_secret_operations_total` - Total secret operations
- `infisical_secrets_processed_total` - Total secrets processed

## Troubleshooting

### Common Issues

1. **Metrics not appearing**:

   - Check that `OTEL_TELEMETRY_COLLECTION_ENABLED=true` is set
   - Verify that the correct `OTEL_EXPORT_TYPE` is set
   - Check network connectivity between services

2. **Authentication errors**:

   - Verify the basic auth credentials in the OTLP configuration
   - Check that the credentials match between Infisical and the collector

@@ -160,9 +160,6 @@ type InfisicalSecretSpec struct {

    // +kubebuilder:validation:Optional
    TLS TLSConfig `json:"tls"`
-
-   // +kubebuilder:validation:Optional
-   InstantUpdates bool `json:"instantUpdates"`
}

// InfisicalSecretStatus defines the observed state of InfisicalSecret

@@ -314,8 +314,6 @@ spec:
  hostAPI:
    description: Infisical host to pull secrets from
    type: string
- instantUpdates:
-   type: boolean
  managedKubeConfigMapReferences:
    items:
      properties:

@@ -9,7 +9,6 @@ metadata:
spec:
  hostAPI: http://localhost:8080/api
  resyncInterval: 10
- instantUpdates: false
  # tls:
  #   caRef:
  #     secretName: custom-ca-certificate

@@ -1,7 +1,7 @@
- # apiVersion: v1
- # kind: Secret
- # metadata:
- #   name: service-token
- # type: Opaque
- # data:
- #   infisicalToken: <base64 infisical token here>
+ apiVersion: v1
+ kind: Secret
+ metadata:
+   name: service-token
+ type: Opaque
+ data:
+   infisicalToken: <base64 infisical token here>

@@ -4,5 +4,5 @@ metadata:
  name: universal-auth-credentials
type: Opaque
stringData:
- clientId: your-client-id-here
- clientSecret: your-client-secret-here
+ clientId: da81e27e-1885-47d9-9ea3-ec7d4d807bb6
+ clientSecret: 2772414d440fe04d8b975f5fe25acd0fbfe71b2a4a420409eb9ac6f5ae6c1e98

@@ -1,11 +1,8 @@
package api

import (
-   "encoding/json"
    "fmt"
-   "net/http"

    "github.com/Infisical/infisical/k8-operator/internal/model"
    "github.com/go-resty/resty/v2"
)

@@ -149,85 +146,3 @@ func CallGetProjectByID(httpClient *resty.Client, request GetProjectByIDRequest)
    return projectResponse, nil
}

func CallGetProjectByIDv2(httpClient *resty.Client, request GetProjectByIDRequest) (model.Project, error) {
    var projectResponse model.Project

    response, err := httpClient.
        R().SetResult(&projectResponse).
        SetHeader("User-Agent", USER_AGENT_NAME).
        Get(fmt.Sprintf("%s/v2/workspace/%s", API_HOST_URL, request.ProjectID))

    if err != nil {
        return model.Project{}, fmt.Errorf("CallGetProject: Unable to complete api request [err=%s]", err)
    }

    if response.IsError() {
        return model.Project{}, fmt.Errorf("CallGetProject: Unsuccessful response: [response=%s]", response)
    }

    return projectResponse, nil
}

func CallSubscribeProjectEvents(httpClient *resty.Client, projectId, secretsPath, envSlug, token string) (*http.Response, error) {
    conditions := &SubscribeProjectEventsRequestCondition{
        SecretPath:      secretsPath,
        EnvironmentSlug: envSlug,
    }

    body, err := json.Marshal(&SubscribeProjectEventsRequest{
        ProjectID: projectId,
        Register: []SubscribeProjectEventsRequestRegister{
            {
                Event:      "secret:create",
                Conditions: conditions,
            },
            {
                Event:      "secret:update",
                Conditions: conditions,
            },
            {
                Event:      "secret:delete",
                Conditions: conditions,
            },
            {
                Event:      "secret:import-mutation",
                Conditions: conditions,
            },
        },
    })

    if err != nil {
        return nil, fmt.Errorf("CallSubscribeProjectEvents: Unable to marshal body [err=%s]", err)
    }

    response, err := httpClient.
        R().
        SetDoNotParseResponse(true).
        SetHeader("User-Agent", USER_AGENT_NAME).
        SetHeader("Content-Type", "application/json").
        SetHeader("Accept", "text/event-stream").
        SetHeader("Connection", "keep-alive").
        SetHeader("Authorization", fmt.Sprint("Bearer ", token)).
        SetBody(body).
        Post(fmt.Sprintf("%s/v1/events/subscribe/project-events", API_HOST_URL))

    if err != nil {
        return nil, fmt.Errorf("CallSubscribeProjectEvents: Unable to complete api request [err=%s]", err)
    }

    if response.IsError() {
        data := struct {
            Message string `json:"message"`
        }{}

        if err := json.NewDecoder(response.RawBody()).Decode(&data); err != nil {
            return nil, err
        }

        return nil, fmt.Errorf("CallSubscribeProjectEvents: Unsuccessful response: [message=%s]", data.Message)
    }

    return response.RawResponse, nil
}

@@ -206,20 +206,3 @@ type GetProjectByIDRequest struct {
type GetProjectByIDResponse struct {
    Project model.Project `json:"workspace"`
}

type SubscribeProjectEventsRequestRegister struct {
    Event      string                                  `json:"event"`
    Conditions *SubscribeProjectEventsRequestCondition `json:"conditions"`
}

type SubscribeProjectEventsRequestCondition struct {
    EnvironmentSlug string `json:"environmentSlug"`
    SecretPath      string `json:"secretPath"`
}

type SubscribeProjectEventsRequest struct {
    ProjectID string                                  `json:"projectId"`
    Register  []SubscribeProjectEventsRequestRegister `json:"register"`
}

type SubscribeProjectEventsResponse struct{}

@@ -231,6 +231,7 @@ func (r *InfisicalPushSecretReconciler) Reconcile(ctx context.Context, req ctrl.
}

func (r *InfisicalPushSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {

    // Custom predicate that allows both spec changes and deletions
    specChangeOrDelete := predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {

@@ -31,7 +31,6 @@ import (
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
-   "sigs.k8s.io/controller-runtime/pkg/source"

    secretsv1alpha1 "github.com/Infisical/infisical/k8-operator/api/v1alpha1"
    "github.com/Infisical/infisical/k8-operator/internal/controllerhelpers"

@@ -44,8 +43,6 @@ type InfisicalSecretReconciler struct {
    client.Client
    BaseLogger logr.Logger
    Scheme     *runtime.Scheme
-
-   SourceCh          chan event.TypedGenericEvent[client.Object]
    Namespace         string
    IsNamespaceScoped bool
}

@@ -77,6 +74,7 @@ func (r *InfisicalSecretReconciler) GetLogger(req ctrl.Request) logr.Logger {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile
func (r *InfisicalSecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

    logger := r.GetLogger(req)

    var infisicalSecretCRD secretsv1alpha1.InfisicalSecret

@@ -198,20 +196,6 @@ func (r *InfisicalSecretReconciler) Reconcile(ctx context.Context, req ctrl.Requ
        }, nil
    }

-   if infisicalSecretCRD.Spec.InstantUpdates {
-       if err := handler.OpenInstantUpdatesStream(ctx, logger, &infisicalSecretCRD, infisicalSecretResourceVariablesMap, r.SourceCh); err != nil {
-           requeueTime = time.Second * 10
-           logger.Info(fmt.Sprintf("event stream failed. Will requeue after [requeueTime=%v] [error=%s]", requeueTime, err.Error()))
-           return ctrl.Result{
-               RequeueAfter: requeueTime,
-           }, nil
-       }
-
-       logger.Info("Instant updates are enabled")
-   } else {
-       handler.CloseInstantUpdatesStream(ctx, logger, &infisicalSecretCRD, infisicalSecretResourceVariablesMap)
-   }
-
    // Sync again after the specified time
    logger.Info(fmt.Sprintf("Successfully synced %d secrets. Operator will requeue after [%v]", secretsCount, requeueTime))
    return ctrl.Result{

@@ -220,12 +204,7 @@ func (r *InfisicalSecretReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

func (r *InfisicalSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {
-   r.SourceCh = make(chan event.TypedGenericEvent[client.Object])
-
    return ctrl.NewControllerManagedBy(mgr).
-       WatchesRawSource(
-           source.Channel[client.Object](r.SourceCh, &util.EnqueueDelayedEventHandler{Delay: time.Second * 10}),
-       ).
        For(&secretsv1alpha1.InfisicalSecret{}, builder.WithPredicates(predicate.Funcs{
            UpdateFunc: func(e event.UpdateEvent) bool {
                if e.ObjectOld.GetGeneration() == e.ObjectNew.GetGeneration() {

@@ -251,5 +230,4 @@ func (r *InfisicalSecretReconciler) SetupWithManager(mgr ctrl.Manager) error {
        },
    })).
    Complete(r)
}

@@ -7,7 +7,6 @@ import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
-   "sigs.k8s.io/controller-runtime/pkg/event"

    "github.com/Infisical/infisical/k8-operator/api/v1alpha1"
    "github.com/Infisical/infisical/k8-operator/internal/api"

@@ -101,22 +100,3 @@ func (h *InfisicalSecretHandler) SetInfisicalAutoRedeploymentReady(ctx context.C
    }
    reconciler.SetInfisicalAutoRedeploymentReady(ctx, logger, infisicalSecret, numDeployments, errorToConditionOn)
}

func (h *InfisicalSecretHandler) CloseInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables) error {
    reconciler := &InfisicalSecretReconciler{
        Client:            h.Client,
        Scheme:            h.Scheme,
        IsNamespaceScoped: h.IsNamespaceScoped,
    }
    return reconciler.CloseInstantUpdatesStream(ctx, logger, infisicalSecret, resourceVariablesMap)
}

// Ensures that the SSE stream is open; if the stream is already open, this is a no-op
func (h *InfisicalSecretHandler) OpenInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables, eventCh chan<- event.TypedGenericEvent[client.Object]) error {
    reconciler := &InfisicalSecretReconciler{
        Client:            h.Client,
        Scheme:            h.Scheme,
        IsNamespaceScoped: h.IsNamespaceScoped,
    }
    return reconciler.OpenInstantUpdatesStream(ctx, logger, infisicalSecret, resourceVariablesMap, eventCh)
}

@@ -5,7 +5,6 @@ import (
    "context"
    "errors"
    "fmt"
-   "net/http"
    "strings"
    tpl "text/template"

@@ -16,14 +15,11 @@ import (
    "github.com/Infisical/infisical/k8-operator/internal/model"
    "github.com/Infisical/infisical/k8-operator/internal/template"
    "github.com/Infisical/infisical/k8-operator/internal/util"
-   "github.com/Infisical/infisical/k8-operator/internal/util/sse"
    "github.com/go-logr/logr"
-   "github.com/go-resty/resty/v2"

    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
-   "sigs.k8s.io/controller-runtime/pkg/event"

    infisicalSdk "github.com/infisical/go-sdk"
    corev1 "k8s.io/api/core/v1"

@@ -416,7 +412,6 @@ func (r *InfisicalSecretReconciler) getResourceVariables(infisicalSecret v1alpha
        InfisicalClient: client,
        CancelCtx:       cancel,
        AuthDetails:     util.AuthenticationDetails{},
-       ServerSentEvents: sse.NewConnectionRegistry(ctx),
    }

    resourceVariables = resourceVariablesMap[string(infisicalSecret.UID)]

@@ -426,6 +421,7 @@ func (r *InfisicalSecretReconciler) getResourceVariables(infisicalSecret v1alpha
    }

    return resourceVariables
}

func (r *InfisicalSecretReconciler) updateResourceVariables(infisicalSecret v1alpha1.InfisicalSecret, resourceVariables util.ResourceVariables, resourceVariablesMap map[string]util.ResourceVariables) {

@@ -461,7 +457,6 @@ func (r *InfisicalSecretReconciler) ReconcileInfisicalSecret(ctx context.Context
        InfisicalClient:  infisicalClient,
        CancelCtx:        cancelCtx,
        AuthDetails:      authDetails,
-       ServerSentEvents: sse.NewConnectionRegistry(ctx),
    }, resourceVariablesMap)
}

@@ -530,94 +525,3 @@ func (r *InfisicalSecretReconciler) ReconcileInfisicalSecret(ctx context.Context

    return secretsCount, nil
}

func (r *InfisicalSecretReconciler) CloseInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables) error {
    if infisicalSecret == nil {
        return fmt.Errorf("infisicalSecret is nil")
    }

    variables := r.getResourceVariables(*infisicalSecret, resourceVariablesMap)

    if !variables.AuthDetails.IsMachineIdentityAuth {
        return fmt.Errorf("only machine identity is supported for subscriptions")
    }

    conn := variables.ServerSentEvents

    if _, ok := conn.Get(); ok {
        conn.Close()
    }

    return nil
}

func (r *InfisicalSecretReconciler) OpenInstantUpdatesStream(ctx context.Context, logger logr.Logger, infisicalSecret *v1alpha1.InfisicalSecret, resourceVariablesMap map[string]util.ResourceVariables, eventCh chan<- event.TypedGenericEvent[client.Object]) error {
    if infisicalSecret == nil {
        return fmt.Errorf("infisicalSecret is nil")
    }

    variables := r.getResourceVariables(*infisicalSecret, resourceVariablesMap)

    if !variables.AuthDetails.IsMachineIdentityAuth {
        return fmt.Errorf("only machine identity is supported for subscriptions")
    }

    projectSlug := variables.AuthDetails.MachineIdentityScope.ProjectSlug
    secretsPath := variables.AuthDetails.MachineIdentityScope.SecretsPath
    envSlug := variables.AuthDetails.MachineIdentityScope.EnvSlug

    infiscalClient := variables.InfisicalClient
    sseRegistry := variables.ServerSentEvents

    token := infiscalClient.Auth().GetAccessToken()

    project, err := util.GetProjectBySlug(token, projectSlug)

    if err != nil {
        return fmt.Errorf("failed to get project [err=%s]", err)
    }

    if variables.AuthDetails.MachineIdentityScope.Recursive {
        secretsPath = fmt.Sprint(secretsPath, "**")
    }

    if err != nil {
        return fmt.Errorf("CallSubscribeProjectEvents: unable to marshal body [err=%s]", err)
    }

    events, errors, err := sseRegistry.Subscribe(func() (*http.Response, error) {
        httpClient := resty.New()

        req, err := api.CallSubscribeProjectEvents(httpClient, project.ID, secretsPath, envSlug, token)

        if err != nil {
            return nil, err
        }

        return req, nil
    })

    if err != nil {
        return fmt.Errorf("unable to connect sse [err=%s]", err)
    }

    go func() {
    outer:
        for {
            select {
            case ev := <-events:
                logger.Info("Received SSE Event", "event", ev)
                eventCh <- event.TypedGenericEvent[client.Object]{
                    Object: infisicalSecret,
                }
            case err := <-errors:
                logger.Error(err, "Error occurred")
                break outer
            case <-ctx.Done():
                break outer
            }
        }
    }()

    return nil
}

@@ -1,59 +0,0 @@
package util

import (
    "context"
    "math/rand"
    "time"

    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// computeMaxJitterDuration returns a random duration between 0 and max.
// This is useful for introducing jitter to event processing.
func computeMaxJitterDuration(max time.Duration) (time.Duration, time.Duration) {
    if max <= 0 {
        return 0, 0
    }
    jitter := time.Duration(rand.Int63n(int64(max)))
    return max, jitter
}

// EnqueueDelayedEventHandler enqueues reconcile requests with a random delay (jitter)
// to spread the load and avoid thundering herd issues.
type EnqueueDelayedEventHandler struct {
    Delay time.Duration
}

func (e *EnqueueDelayedEventHandler) Create(_ context.Context, _ event.TypedCreateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

func (e *EnqueueDelayedEventHandler) Update(_ context.Context, _ event.TypedUpdateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

func (e *EnqueueDelayedEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
}

func (e *EnqueueDelayedEventHandler) Generic(_ context.Context, evt event.TypedGenericEvent[client.Object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
    if evt.Object == nil {
        return
    }

    req := reconcile.Request{
        NamespacedName: types.NamespacedName{
            Namespace: evt.Object.GetNamespace(),
            Name:      evt.Object.GetName(),
        },
    }

    _, delay := computeMaxJitterDuration(e.Delay)

    if delay > 0 {
        q.AddAfter(req, delay)
    } else {
        q.Add(req)
    }
}

@@ -3,7 +3,6 @@ package util
import (
    "context"

-   "github.com/Infisical/infisical/k8-operator/internal/util/sse"
    infisicalSdk "github.com/infisical/go-sdk"
)

@@ -11,5 +10,4 @@ type ResourceVariables struct {
    InfisicalClient  infisicalSdk.InfisicalClientInterface
    CancelCtx        context.CancelFunc
    AuthDetails      AuthenticationDetails
-   ServerSentEvents *sse.ConnectionRegistry
}

@@ -1,331 +0,0 @@
package sse

import (
    "bufio"
    "context"
    "io"
    "net/http"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)

// Event represents a Server-Sent Event
type Event struct {
    ID    string
    Event string
    Data  string
    Retry int
}

// ConnectionMeta holds metadata about an SSE connection
type ConnectionMeta struct {
    EventChan  <-chan Event
    ErrorChan  <-chan error
    lastPingAt atomic.Value // stores time.Time
    cancel     context.CancelFunc
}

// LastPing returns the last ping time
func (c *ConnectionMeta) LastPing() time.Time {
    if t, ok := c.lastPingAt.Load().(time.Time); ok {
        return t
    }
    return time.Time{}
}

// UpdateLastPing atomically updates the last ping time
func (c *ConnectionMeta) UpdateLastPing() {
    c.lastPingAt.Store(time.Now())
}

// Cancel terminates the connection
func (c *ConnectionMeta) Cancel() {
    if c.cancel != nil {
        c.cancel()
    }
}

// ConnectionRegistry manages SSE connections with high performance
type ConnectionRegistry struct {
    mu   sync.RWMutex
    conn *ConnectionMeta

    monitorOnce sync.Once
    monitorStop chan struct{}

    onPing func() // Callback for ping events
}

// NewConnectionRegistry creates a new high-performance connection registry
func NewConnectionRegistry(ctx context.Context) *ConnectionRegistry {
    r := &ConnectionRegistry{
        monitorStop: make(chan struct{}),
    }

    // Configure ping handler
    r.onPing = func() {
        r.UpdateLastPing()
    }

    return r
}

// Subscribe provides SSE events, creating a connection if needed
func (r *ConnectionRegistry) Subscribe(request func() (*http.Response, error)) (<-chan Event, <-chan error, error) {
    // Fast path: check if connection exists
    if conn := r.getConnection(); conn != nil {
        return conn.EventChan, conn.ErrorChan, nil
    }

    // Slow path: create new connection under lock
    r.mu.Lock()
    defer r.mu.Unlock()

    // Double-check after acquiring lock
    if r.conn != nil {
        return r.conn.EventChan, r.conn.ErrorChan, nil
    }

    res, err := request()
    if err != nil {
        return nil, nil, err
    }

    conn, err := r.createStream(res)
    if err != nil {
        return nil, nil, err
    }

    r.conn = conn

    // Start monitor once
    r.monitorOnce.Do(func() {
        go r.monitorConnections()
    })

    return conn.EventChan, conn.ErrorChan, nil
}

// Get retrieves the current connection
func (r *ConnectionRegistry) Get() (*ConnectionMeta, bool) {
    conn := r.getConnection()
    return conn, conn != nil
}

// IsConnected checks if there's an active connection
func (r *ConnectionRegistry) IsConnected() bool {
    return r.getConnection() != nil
}

// UpdateLastPing updates the last ping time for the current connection
func (r *ConnectionRegistry) UpdateLastPing() {
    if conn := r.getConnection(); conn != nil {
        conn.UpdateLastPing()
    }
}

// Close gracefully shuts down the registry
func (r *ConnectionRegistry) Close() {
    // Stop monitor first
    select {
    case <-r.monitorStop:
        // Already closed
    default:
        close(r.monitorStop)
    }

    // Close connection
    r.mu.Lock()
    if r.conn != nil {
        r.conn.Cancel()
        r.conn = nil
    }
    r.mu.Unlock()
}

// getConnection returns the current connection under a read lock
func (r *ConnectionRegistry) getConnection() *ConnectionMeta {
    r.mu.RLock()
    conn := r.conn
    r.mu.RUnlock()
    return conn
}

func (r *ConnectionRegistry) createStream(res *http.Response) (*ConnectionMeta, error) {
    ctx, cancel := context.WithCancel(context.Background())

    eventChan, errorChan, err := r.stream(ctx, res)
    if err != nil {
        cancel()
        return nil, err
    }

    meta := &ConnectionMeta{
        EventChan: eventChan,
        ErrorChan: errorChan,
        cancel:    cancel,
    }
    meta.UpdateLastPing()

    return meta, nil
}

// stream processes SSE data from an HTTP response
func (r *ConnectionRegistry) stream(ctx context.Context, res *http.Response) (<-chan Event, <-chan error, error) {
    eventChan := make(chan Event, 10)
    errorChan := make(chan error, 1)

    go r.processStream(ctx, res.Body, eventChan, errorChan)

    return eventChan, errorChan, nil
}

// processStream reads and parses SSE events from the response body
func (r *ConnectionRegistry) processStream(ctx context.Context, body io.ReadCloser, eventChan chan<- Event, errorChan chan<- error) {
    defer body.Close()
    defer close(eventChan)
    defer close(errorChan)

    scanner := bufio.NewScanner(body)

    var currentEvent Event
    var dataBuilder strings.Builder

    for scanner.Scan() {
        select {
        case <-ctx.Done():
            return
        default:
        }

        line := scanner.Text()

        // Empty line indicates end of event
        if len(line) == 0 {
            if currentEvent.Data != "" || currentEvent.Event != "" {
                // Finalize data
                if dataBuilder.Len() > 0 {
                    currentEvent.Data = dataBuilder.String()
                    dataBuilder.Reset()
                }

                // Handle ping events
                if r.isPingEvent(currentEvent) {
                    if r.onPing != nil {
                        r.onPing()
                    }
                } else {
                    // Send non-ping events
                    select {
                    case eventChan <- currentEvent:
                    case <-ctx.Done():
                        return
                    }
                }

                // Reset for next event
                currentEvent = Event{}
            }
            continue
        }

        // Parse line efficiently
        r.parseLine(line, &currentEvent, &dataBuilder)
    }

    if err := scanner.Err(); err != nil {
        select {
        case errorChan <- err:
        case <-ctx.Done():
        }
    }
}

// parseLine efficiently parses SSE protocol lines
func (r *ConnectionRegistry) parseLine(line string, event *Event, dataBuilder *strings.Builder) {
    colonIndex := strings.IndexByte(line, ':')
    if colonIndex == -1 {
        return // Invalid line format
    }

    field := line[:colonIndex]
    value := line[colonIndex+1:]

    // Trim leading space from value (SSE spec)
    if len(value) > 0 && value[0] == ' ' {
        value = value[1:]
    }

    switch field {
    case "data":
        if dataBuilder.Len() > 0 {
            dataBuilder.WriteByte('\n')
        }
        dataBuilder.WriteString(value)
    case "event":
        event.Event = value
    case "id":
        event.ID = value
    case "retry":
        // Parse retry value if needed
        // This could be used to configure reconnection delay
    case "":
        // Comment line, ignore
    }
}

// isPingEvent checks if an event is a ping/keepalive
func (r *ConnectionRegistry) isPingEvent(event Event) bool {
    // Check for common ping patterns
    if event.Event == "ping" {
        return true
    }

    // Check for heartbeat data (common pattern is "1" or similar)
    if event.Event == "" && strings.TrimSpace(event.Data) == "1" {
        return true
    }

    return false
}

// monitorConnections checks connection health periodically
func (r *ConnectionRegistry) monitorConnections() {
    const (
        checkInterval = 30 * time.Second
        pingTimeout   = 2 * time.Minute
    )

    ticker := time.NewTicker(checkInterval)
    defer ticker.Stop()

    for {
        select {
        case <-r.monitorStop:
            return
        case <-ticker.C:
            r.checkConnectionHealth(pingTimeout)
        }
    }
}

// checkConnectionHealth verifies connection is still alive
func (r *ConnectionRegistry) checkConnectionHealth(timeout time.Duration) {
    conn := r.getConnection()
    if conn == nil {
        return
    }

    if time.Since(conn.LastPing()) > timeout {
        // Connection is stale, close it
        r.mu.Lock()
        if r.conn == conn { // Verify it's still the same connection
            r.conn.Cancel()
            r.monitorStop <- struct{}{}
            r.conn = nil
        }
        r.mu.Unlock()
    }
}

@@ -9,6 +9,7 @@ import (
)

func GetProjectByID(accessToken string, projectId string) (model.Project, error) {

    httpClient := resty.New()
    httpClient.
        SetAuthScheme("Bearer").

@@ -24,21 +25,3 @@ func GetProjectByID(accessToken string, projectId string) (model.Project, error)

    return projectDetails.Project, nil
}

func GetProjectBySlug(accessToken string, projectSlug string) (model.Project, error) {
    httpClient := resty.New()
    httpClient.
        SetAuthScheme("Bearer").
        SetAuthToken(accessToken).
        SetHeader("Accept", "application/json")

    project, err := api.CallGetProjectByIDv2(httpClient, api.GetProjectByIDRequest{
        ProjectID: projectSlug,
    })

    if err != nil {
        return model.Project{}, fmt.Errorf("unable to get project by slug. [err=%v]", err)
    }

    return project, nil
}