Compare commits

...

21 Commits

Author SHA1 Message Date
sidwebworks
5cdf488e51 fix: license check 2025-08-01 00:39:32 +05:30
sidwebworks
e46bdc8fae fix: remove source prefix in bus event name 2025-08-01 00:32:14 +05:30
sidwebworks
b6b950597b fix: secretPath 2025-07-31 22:00:01 +05:30
sidwebworks
5b2c4ef209 fix: static permissions matching 2025-07-31 16:36:42 +05:30
sidwebworks
ee7999ed9b fix: check length of events 2025-07-31 01:50:01 +05:30
sidwebworks
8f549b90e3 fix: lint errors 2025-07-31 01:14:33 +05:30
sidwebworks
f2bc46266a chore: move event impl to ee 2025-07-31 01:05:53 +05:30
sidwebworks
1746eea71c fix: pr changes 2025-07-31 01:02:52 +05:30
sidwebworks
856e320759 fix: duplicate publisher connection 2025-07-30 22:42:58 +05:30
sidwebworks
6037b4383e fix: revert docker compose.dev 2025-07-30 22:33:44 +05:30
sidwebworks
bd6a5e9249 Merge branch 'main' of github.com:Infisical/infisical into sid/events-system 2025-07-30 22:31:59 +05:30
sidwebworks
3412befe24 feat: frontend change 2025-07-30 13:45:52 +05:30
sidwebworks
2f927ae325 fix: revert license 2025-07-30 04:46:56 +05:30
sidwebworks
fab3bb9d07 fix: PR changes 2025-07-30 04:34:09 +05:30
sidwebworks
6eb974a1a3 fix: connection tracking and auth changes 2025-07-29 19:26:59 +05:30
sidwebworks
4c72238302 fix: mocks 2025-07-29 02:22:29 +05:30
sidwebworks
0a9913c5e4 fix: PR changes 2025-07-29 02:18:16 +05:30
sidwebworks
dc48d8d5d7 fix: impl changes 2025-07-27 00:19:32 +05:30
sidwebworks
fface3f15f fix: undo cors 2025-07-26 05:14:43 +05:30
sidwebworks
bfc153e839 chore: save wip 2025-07-26 05:12:58 +05:30
sidwebworks
337aad1abc chore: save poc 2025-07-25 20:10:52 +05:30
23 changed files with 948 additions and 228 deletions

View File

@@ -12,6 +12,8 @@ import { TCertificateAuthorityCrlServiceFactory } from "@app/ee/services/certifi
import { TCertificateEstServiceFactory } from "@app/ee/services/certificate-est/certificate-est-service";
import { TDynamicSecretServiceFactory } from "@app/ee/services/dynamic-secret/dynamic-secret-types";
import { TDynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-types";
import { TEventBusService } from "@app/ee/services/event/event-bus-service";
import { TServerSentEventsService } from "@app/ee/services/event/event-sse-service";
import { TExternalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service";
import { TGatewayServiceFactory } from "@app/ee/services/gateway/gateway-service";
import { TGithubOrgSyncServiceFactory } from "@app/ee/services/github-org-sync/github-org-sync-service";
@@ -296,6 +298,8 @@ declare module "fastify" {
internalCertificateAuthority: TInternalCertificateAuthorityServiceFactory;
pkiTemplate: TPkiTemplatesServiceFactory;
reminder: TReminderServiceFactory;
bus: TEventBusService;
sse: TServerSentEventsService;
};
// this is exclusive use for middlewares in which we need to inject data
// everywhere else access using service layer

View File

@@ -1,7 +1,8 @@
import { AxiosError, RawAxiosRequestHeaders } from "axios";
import { SecretKeyEncoding } from "@app/db/schemas";
import { getConfig } from "@app/lib/config/env";
import { ProjectType, SecretKeyEncoding } from "@app/db/schemas";
import { TEventBusService } from "@app/ee/services/event/event-bus-service";
import { TopicName, toPublishableEvent } from "@app/ee/services/event/types";
import { request } from "@app/lib/config/request";
import { crypto } from "@app/lib/crypto/cryptography";
import { logger } from "@app/lib/logger";
@@ -21,6 +22,7 @@ type TAuditLogQueueServiceFactoryDep = {
queueService: TQueueServiceFactory;
projectDAL: Pick<TProjectDALFactory, "findById">;
licenseService: Pick<TLicenseServiceFactory, "getPlan">;
eventBusService: TEventBusService;
};
export type TAuditLogQueueServiceFactory = {
@@ -36,133 +38,17 @@ export const auditLogQueueServiceFactory = async ({
queueService,
projectDAL,
licenseService,
auditLogStreamDAL
auditLogStreamDAL,
eventBusService
}: TAuditLogQueueServiceFactoryDep): Promise<TAuditLogQueueServiceFactory> => {
const appCfg = getConfig();
const pushToLog = async (data: TCreateAuditLogDTO) => {
if (appCfg.USE_PG_QUEUE && appCfg.SHOULD_INIT_PG_QUEUE) {
await queueService.queuePg<QueueName.AuditLog>(QueueJobs.AuditLog, data, {
retryLimit: 10,
retryBackoff: true
});
} else {
await queueService.queue<QueueName.AuditLog>(QueueName.AuditLog, QueueJobs.AuditLog, data, {
removeOnFail: {
count: 3
},
removeOnComplete: true
});
}
};
if (appCfg.SHOULD_INIT_PG_QUEUE) {
await queueService.startPg<QueueName.AuditLog>(
QueueJobs.AuditLog,
async ([job]) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
let { orgId } = job.data;
const MS_IN_DAY = 24 * 60 * 60 * 1000;
let project;
if (!orgId) {
// it will never be undefined for both org and project id
// TODO(akhilmhdh): use caching here in dal to avoid db calls
project = await projectDAL.findById(projectId as string);
orgId = project.orgId;
}
const plan = await licenseService.getPlan(orgId);
if (plan.auditLogsRetentionDays === 0) {
// skip inserting if audit log retention is 0 meaning its not supported
return;
}
// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;
const ttl = ttlInDays * MS_IN_DAY;
const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});
const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
crypto
.encryption()
.symmetric()
.decryptWithRootEncryptionKey({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];
const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };
if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
});
try {
const response = await request.post(
url,
{ ...providerSpecificPayload(url), ...auditLog },
{
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
}
);
return response;
} catch (error) {
logger.error(
`Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]`
);
return error;
}
}
)
);
await queueService.queue<QueueName.AuditLog>(QueueName.AuditLog, QueueJobs.AuditLog, data, {
removeOnFail: {
count: 3
},
{
batchSize: 1,
workerCount: 30,
pollingIntervalSeconds: 0.5
}
);
}
removeOnComplete: true
});
};
queueService.start(QueueName.AuditLog, async (job) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
@@ -178,88 +64,97 @@ export const auditLogQueueServiceFactory = async ({
}
const plan = await licenseService.getPlan(orgId);
if (plan.auditLogsRetentionDays === 0) {
// skip inserting if audit log retention is 0 meaning its not supported
return;
// skip inserting if audit log retention is 0 meaning its not supported
if (plan.auditLogsRetentionDays !== 0) {
// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;
const ttl = ttlInDays * MS_IN_DAY;
const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});
const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
crypto
.encryption()
.symmetric()
.decryptWithRootEncryptionKey({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];
const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };
if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
});
try {
const response = await request.post(
url,
{ ...providerSpecificPayload(url), ...auditLog },
{
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
}
);
return response;
} catch (error) {
logger.error(
`Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]`
);
return error;
}
}
)
);
}
// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;
const publishable = toPublishableEvent(event);
const ttl = ttlInDays * MS_IN_DAY;
const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});
const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
crypto
.encryption()
.symmetric()
.decryptWithRootEncryptionKey({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];
const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };
if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
});
try {
const response = await request.post(
url,
{ ...providerSpecificPayload(url), ...auditLog },
{
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
}
);
return response;
} catch (error) {
logger.error(
`Failed to stream audit log [url=${url}] for org [orgId=${orgId}] [error=${(error as AxiosError).message}]`
);
return error;
}
}
)
);
if (publishable) {
await eventBusService.publish(TopicName.CoreServers, {
type: ProjectType.SecretManager,
source: "infiscal",
data: publishable.data
});
}
});
return {

View File

@@ -0,0 +1,83 @@
import Redis from "ioredis";
import { z } from "zod";
import { logger } from "@app/lib/logger";
import { EventSchema, TopicName } from "./types";
export const eventBusFactory = (redis: Redis) => {
const publisher = redis.duplicate();
// Duplicate the publisher to create a subscriber.
// This is necessary because Redis does not allow a single connection to both publish and subscribe.
const subscriber = publisher.duplicate();
const init = async (topics: TopicName[] = Object.values(TopicName)) => {
subscriber.on("error", (e) => {
logger.error(e, "Event Bus subscriber error");
});
publisher.on("error", (e) => {
logger.error(e, "Event Bus publisher error");
});
await subscriber.subscribe(...topics);
};
/**
* Publishes an event to the specified topic.
* @param topic - The topic to publish the event to.
* @param event - The event data to publish.
*/
const publish = async <T extends z.input<typeof EventSchema>>(topic: TopicName, event: T) => {
const json = JSON.stringify(event);
return publisher.publish(topic, json, (err) => {
if (err) {
return logger.error(err, `Error publishing to channel ${topic}`);
}
});
};
/**
* @param fn - The function to call when a message is received.
* It should accept the parsed event data as an argument.
* @template T - The type of the event data, which should match the schema defined in EventSchema.
* @returns A function that can be called to unsubscribe from the event bus.
*/
const subscribe = <T extends z.infer<typeof EventSchema>>(fn: (data: T) => Promise<void> | void) => {
// Not using async await cause redis client's `on` method does not expect async listeners.
const listener = (channel: string, message: string) => {
try {
const parsed = JSON.parse(message) as T;
const thenable = fn(parsed);
// If the function returns a Promise, catch any errors that occur during processing.
if (thenable instanceof Promise) {
thenable.catch((error) => {
logger.error(error, `Error processing message from channel ${channel}`);
});
}
} catch (error) {
logger.error(error, `Error parsing message data from channel ${channel}`);
}
};
subscriber.on("message", listener);
return () => {
subscriber.off("message", listener);
};
};
const close = async () => {
try {
await publisher.quit();
await subscriber.quit();
} catch (error) {
logger.error(error, "Error closing event bus connections");
}
};
return { init, publish, subscribe, close };
};
export type TEventBusService = ReturnType<typeof eventBusFactory>;

View File

@@ -0,0 +1,164 @@
/* eslint-disable no-continue */
import { subject } from "@casl/ability";
import Redis from "ioredis";
import { KeyStorePrefixes } from "@app/keystore/keystore";
import { logger } from "@app/lib/logger";
import { TEventBusService } from "./event-bus-service";
import { createEventStreamClient, EventStreamClient, IEventStreamClientOpts } from "./event-sse-stream";
import { EventData, RegisteredEvent, toBusEventName } from "./types";
const AUTH_REFRESH_INTERVAL = 60 * 1000;
const HEART_BEAT_INTERVAL = 15 * 1000;
export const sseServiceFactory = (bus: TEventBusService, redis: Redis) => {
let heartbeatInterval: NodeJS.Timeout | null = null;
const clients = new Set<EventStreamClient>();
heartbeatInterval = setInterval(() => {
for (const client of clients) {
if (client.stream.closed) continue;
void client.ping();
}
}, HEART_BEAT_INTERVAL);
const refreshInterval = setInterval(() => {
for (const client of clients) {
if (client.stream.closed) continue;
void client.refresh();
}
}, AUTH_REFRESH_INTERVAL);
const removeActiveConnection = async (projectId: string, identityId: string, connectionId: string) => {
const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, identityId);
const key = KeyStorePrefixes.ActiveSSEConnections(projectId, identityId, connectionId);
await Promise.all([redis.lrem(set, 0, connectionId), redis.del(key)]);
};
const getActiveConnectionsCount = async (projectId: string, identityId: string) => {
const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, identityId);
const connections = await redis.lrange(set, 0, -1);
if (connections.length === 0) {
return 0; // No active connections
}
const keys = connections.map((c) => KeyStorePrefixes.ActiveSSEConnections(projectId, identityId, c));
const values = await redis.mget(...keys);
// eslint-disable-next-line no-plusplus
for (let i = 0; i < values.length; i++) {
if (values[i] === null) {
// eslint-disable-next-line no-await-in-loop
await removeActiveConnection(projectId, identityId, connections[i]);
}
}
return redis.llen(set);
};
const onDisconnect = async (client: EventStreamClient) => {
try {
client.close();
clients.delete(client);
await removeActiveConnection(client.auth.projectId, client.auth.actorId, client.id);
} catch (error) {
logger.error(error, "Error during SSE stream disconnection");
}
};
function filterEventsForClient(client: EventStreamClient, event: EventData, registered: RegisteredEvent[]) {
const eventType = toBusEventName(event.data.eventType);
const match = registered.find((r) => r.event === eventType);
if (!match) return;
const item = event.data.payload;
if (Array.isArray(item)) {
if (item.length === 0) return;
const baseSubject = {
eventType,
environment: undefined as string | undefined,
secretPath: undefined as string | undefined
};
const filtered = item.filter((ev) => {
baseSubject.secretPath = ev.secretPath ?? "/";
baseSubject.environment = ev.environment;
return client.matcher.can("subscribe", subject(event.type, baseSubject));
});
if (filtered.length === 0) return;
return client.send({
...event,
data: {
...event.data,
payload: filtered
}
});
}
// For single item
const baseSubject = {
eventType,
secretPath: item.secretPath ?? "/",
environment: item.environment
};
if (client.matcher.can("subscribe", subject(event.type, baseSubject))) {
client.send(event);
}
}
const subscribe = async (
opts: IEventStreamClientOpts & {
onClose?: () => void;
}
) => {
const client = createEventStreamClient(redis, opts);
// Set up event listener on event bus
const unsubscribe = bus.subscribe((event) => {
if (event.type !== opts.type) return;
filterEventsForClient(client, event, opts.registered);
});
client.stream.on("close", () => {
unsubscribe();
void onDisconnect(client); // This will never throw
});
await client.open();
clients.add(client);
return client;
};
const close = () => {
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
}
if (refreshInterval) {
clearInterval(refreshInterval);
}
for (const client of clients) {
client.close();
}
clients.clear();
};
return { subscribe, close, getActiveConnectionsCount };
};
export type TServerSentEventsService = ReturnType<typeof sseServiceFactory>;

View File

@@ -0,0 +1,178 @@
/* eslint-disable no-underscore-dangle */
import { Readable } from "node:stream";
import { MongoAbility, PureAbility } from "@casl/ability";
import { MongoQuery } from "@ucast/mongo2js";
import Redis from "ioredis";
import { nanoid } from "nanoid";
import { ProjectType } from "@app/db/schemas";
import { ProjectPermissionSet } from "@app/ee/services/permission/project-permission";
import { KeyStorePrefixes } from "@app/keystore/keystore";
import { conditionsMatcher } from "@app/lib/casl";
import { logger } from "@app/lib/logger";
import { EventData, RegisteredEvent } from "./types";
export const getServerSentEventsHeaders = () =>
({
"Cache-Control": "no-cache",
"Content-Type": "text/event-stream",
Connection: "keep-alive",
"X-Accel-Buffering": "no"
}) as const;
type TAuthInfo = {
actorId: string;
projectId: string;
permission: MongoAbility<ProjectPermissionSet, MongoQuery>;
};
export interface IEventStreamClientOpts {
type: ProjectType;
registered: RegisteredEvent[];
onAuthRefresh: (info: TAuthInfo) => Promise<void> | void;
getAuthInfo: () => Promise<TAuthInfo> | TAuthInfo;
}
interface EventMessage {
time?: string | number;
type: string;
data?: unknown;
}
function serializeSseEvent(chunk: EventMessage): string {
let payload = "";
if (chunk.time) payload += `id: ${chunk.time}\n`;
if (chunk.type) payload += `event: ${chunk.type}\n`;
if (chunk.data) payload += `data: ${JSON.stringify(chunk)}\n`;
return `${payload}\n`;
}
export type EventStreamClient = {
id: string;
stream: Readable;
open: () => Promise<void>;
send: (data: EventMessage | EventData) => void;
ping: () => Promise<void>;
refresh: () => Promise<void>;
close: () => void;
get auth(): TAuthInfo;
signal: AbortSignal;
abort: () => void;
matcher: PureAbility;
};
export function createEventStreamClient(redis: Redis, options: IEventStreamClientOpts): EventStreamClient {
const rules = options.registered.map((r) => ({
subject: options.type,
action: "subscribe",
conditions: {
eventType: r.event,
secretPath: r.conditions?.secretPath ?? "/",
environment: r.conditions?.environmentSlug
}
}));
const id = `sse-${nanoid()}`;
const control = new AbortController();
const matcher = new PureAbility(rules, { conditionsMatcher });
let auth: TAuthInfo | undefined;
const stream = new Readable({
objectMode: true
});
// We will manually push data to the stream
stream._read = () => {};
const send = (data: EventMessage | EventData) => {
const chunk = serializeSseEvent(data);
if (!stream.push(chunk)) {
logger.debug("Backpressure detected: dropped manual event");
}
};
stream.on("error", (error: Error) => stream.destroy(error));
const open = async () => {
auth = await options.getAuthInfo();
await options.onAuthRefresh(auth);
const { actorId, projectId } = auth;
const set = KeyStorePrefixes.ActiveSSEConnectionsSet(projectId, actorId);
const key = KeyStorePrefixes.ActiveSSEConnections(projectId, actorId, id);
await Promise.all([redis.rpush(set, id), redis.set(key, "1", "EX", 60)]);
};
const ping = async () => {
if (!auth) return; // Avoid race condition if ping is called before open
const { actorId, projectId } = auth;
const key = KeyStorePrefixes.ActiveSSEConnections(projectId, actorId, id);
await redis.set(key, "1", "EX", 60);
stream.push("1");
};
const close = () => {
if (stream.closed) return;
stream.push(null);
stream.destroy();
};
/**
* Refreshes the connection's auth permissions
* Must be called atleast once when connection is opened
*/
const refresh = async () => {
try {
auth = await options.getAuthInfo();
await options.onAuthRefresh(auth);
} catch (error) {
if (error instanceof Error) {
send({
type: "error",
data: {
...error
}
});
return close();
}
stream.emit("error", error);
}
};
const abort = () => {
try {
control.abort();
} catch (error) {
logger.debug(error, "Error aborting SSE stream");
}
};
return {
id,
stream,
open,
send,
ping,
refresh,
close,
signal: control.signal,
abort,
matcher,
get auth() {
if (!auth) {
throw new Error("Auth info not set");
}
return auth;
}
};
}

View File

@@ -0,0 +1,125 @@
import { z } from "zod";
import { ProjectType } from "@app/db/schemas";
import { Event, EventType } from "@app/ee/services/audit-log/audit-log-types";
export enum TopicName {
CoreServers = "infisical::core-servers"
}
export enum BusEventName {
CreateSecret = "secret:create",
UpdateSecret = "secret:update",
DeleteSecret = "secret:delete"
}
type PublisableEventTypes =
| EventType.CREATE_SECRET
| EventType.CREATE_SECRETS
| EventType.DELETE_SECRET
| EventType.DELETE_SECRETS
| EventType.UPDATE_SECRETS
| EventType.UPDATE_SECRET;
export function toBusEventName(input: EventType) {
switch (input) {
case EventType.CREATE_SECRET:
case EventType.CREATE_SECRETS:
return BusEventName.CreateSecret;
case EventType.UPDATE_SECRET:
case EventType.UPDATE_SECRETS:
return BusEventName.UpdateSecret;
case EventType.DELETE_SECRET:
case EventType.DELETE_SECRETS:
return BusEventName.DeleteSecret;
default:
return null;
}
}
const isBulkEvent = (event: Event): event is Extract<Event, { metadata: { secrets: Array<unknown> } }> => {
return event.type.endsWith("-secrets"); // Feels so wrong
};
export const toPublishableEvent = (event: Event) => {
const name = toBusEventName(event.type);
if (!name) return null;
const e = event as Extract<Event, { type: PublisableEventTypes }>;
if (isBulkEvent(e)) {
return {
name,
isBulk: true,
data: {
eventType: e.type,
payload: e.metadata.secrets.map((s) => ({
environment: e.metadata.environment,
secretPath: e.metadata.secretPath,
...s
}))
}
} as const;
}
return {
name,
isBulk: false,
data: {
eventType: e.type,
payload: {
...e.metadata,
environment: e.metadata.environment
}
}
} as const;
};
export const EventName = z.nativeEnum(BusEventName);
const EventSecretPayload = z.object({
secretPath: z.string().optional(),
secretId: z.string(),
secretKey: z.string(),
environment: z.string()
});
export type EventSecret = z.infer<typeof EventSecretPayload>;
export const EventSchema = z.object({
datacontenttype: z.literal("application/json").optional().default("application/json"),
type: z.nativeEnum(ProjectType),
source: z.string(),
time: z
.string()
.optional()
.default(() => new Date().toISOString()),
data: z.discriminatedUnion("eventType", [
z.object({
specversion: z.number().optional().default(1),
eventType: z.enum([EventType.CREATE_SECRET, EventType.UPDATE_SECRET, EventType.DELETE_SECRET]),
payload: EventSecretPayload
}),
z.object({
specversion: z.number().optional().default(1),
eventType: z.enum([EventType.CREATE_SECRETS, EventType.UPDATE_SECRETS, EventType.DELETE_SECRETS]),
payload: EventSecretPayload.array()
})
// Add more event types as needed
])
});
export type EventData = z.infer<typeof EventSchema>;
export const EventRegisterSchema = z.object({
event: EventName,
conditions: z
.object({
secretPath: z.string().optional().default("/"),
environmentSlug: z.string()
})
.optional()
});
export type RegisteredEvent = z.infer<typeof EventRegisterSchema>;

View File

@@ -59,7 +59,8 @@ export const getDefaultOnPremFeatures = (): TFeatureSet => ({
secretScanning: false,
enterpriseSecretSyncs: false,
enterpriseAppConnections: false,
fips: false
fips: false,
eventSubscriptions: false
});
export const setupLicenseRequestWithStore = (

View File

@@ -76,6 +76,7 @@ export type TFeatureSet = {
enterpriseSecretSyncs: false;
enterpriseAppConnections: false;
fips: false;
eventSubscriptions: false;
};
export type TOrgPlansTableDTO = {

View File

@@ -161,7 +161,8 @@ const buildAdminPermissionRules = () => {
ProjectPermissionSecretActions.ReadValue,
ProjectPermissionSecretActions.Create,
ProjectPermissionSecretActions.Edit,
ProjectPermissionSecretActions.Delete
ProjectPermissionSecretActions.Delete,
ProjectPermissionSecretActions.Subscribe
],
ProjectPermissionSub.Secrets
);
@@ -265,7 +266,8 @@ const buildMemberPermissionRules = () => {
ProjectPermissionSecretActions.ReadValue,
ProjectPermissionSecretActions.Edit,
ProjectPermissionSecretActions.Create,
ProjectPermissionSecretActions.Delete
ProjectPermissionSecretActions.Delete,
ProjectPermissionSecretActions.Subscribe
],
ProjectPermissionSub.Secrets
);

View File

@@ -36,7 +36,8 @@ export enum ProjectPermissionSecretActions {
ReadValue = "readValue",
Create = "create",
Edit = "edit",
Delete = "delete"
Delete = "delete",
Subscribe = "subscribe"
}
export enum ProjectPermissionCmekActions {
@@ -204,6 +205,7 @@ export type SecretSubjectFields = {
secretPath: string;
secretName?: string;
secretTags?: string[];
eventType?: string;
};
export type SecretFolderSubjectFields = {
@@ -483,7 +485,17 @@ const SecretConditionV2Schema = z
.object({
[PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN]
})
.partial()
.partial(),
eventType: z.union([
z.string(),
z
.object({
[PermissionConditionOperators.$EQ]: PermissionConditionSchema[PermissionConditionOperators.$EQ],
[PermissionConditionOperators.$NEQ]: PermissionConditionSchema[PermissionConditionOperators.$NEQ],
[PermissionConditionOperators.$IN]: PermissionConditionSchema[PermissionConditionOperators.$IN]
})
.partial()
])
})
.partial();

View File

@@ -46,7 +46,11 @@ export const KeyStorePrefixes = {
IdentityAccessTokenStatusUpdate: (identityAccessTokenId: string) =>
`identity-access-token-status:${identityAccessTokenId}`,
ServiceTokenStatusUpdate: (serviceTokenId: string) => `service-token-status:${serviceTokenId}`,
GatewayIdentityCredential: (identityId: string) => `gateway-credentials:${identityId}`
GatewayIdentityCredential: (identityId: string) => `gateway-credentials:${identityId}`,
ActiveSSEConnectionsSet: (projectId: string, identityId: string) =>
`sse-connections:${projectId}:${identityId}` as const,
ActiveSSEConnections: (projectId: string, identityId: string, connectionId: string) =>
`sse-connections:${projectId}:${identityId}:${connectionId}` as const
};
export const KeyStoreTtls = {

View File

@@ -22,6 +22,7 @@ export type TAuthMode =
orgId: string;
authMethod: AuthMethod;
isMfaVerified?: boolean;
token: AuthModeJwtTokenPayload;
}
| {
authMode: AuthMode.API_KEY;
@@ -30,6 +31,7 @@ export type TAuthMode =
userId: string;
user: TUsers;
orgId: string;
token: string;
}
| {
authMode: AuthMode.SERVICE_TOKEN;
@@ -38,6 +40,7 @@ export type TAuthMode =
serviceTokenId: string;
orgId: string;
authMethod: null;
token: string;
}
| {
authMode: AuthMode.IDENTITY_ACCESS_TOKEN;
@@ -47,6 +50,7 @@ export type TAuthMode =
orgId: string;
authMethod: null;
isInstanceAdmin?: boolean;
token: TIdentityAccessTokenJwtPayload;
}
| {
authMode: AuthMode.SCIM_TOKEN;
@@ -56,7 +60,7 @@ export type TAuthMode =
authMethod: null;
};
const extractAuth = async (req: FastifyRequest, jwtSecret: string) => {
export const extractAuth = async (req: FastifyRequest, jwtSecret: string) => {
const apiKey = req.headers?.["x-api-key"];
if (apiKey) {
return { authMode: AuthMode.API_KEY, token: apiKey, actor: ActorType.USER } as const;
@@ -133,7 +137,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
actor,
orgId: orgId as string,
authMethod: token.authMethod,
isMfaVerified: token.isMfaVerified
isMfaVerified: token.isMfaVerified,
token
};
break;
}
@@ -148,7 +153,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
identityId: identity.identityId,
identityName: identity.name,
authMethod: null,
isInstanceAdmin: serverCfg?.adminIdentityIds?.includes(identity.identityId)
isInstanceAdmin: serverCfg?.adminIdentityIds?.includes(identity.identityId),
token
};
if (token?.identityAuth?.oidc) {
requestContext.set("identityAuthInfo", {
@@ -179,7 +185,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
serviceToken,
serviceTokenId: serviceToken.id,
actor,
authMethod: null
authMethod: null,
token
};
break;
}
@@ -191,7 +198,8 @@ export const injectIdentity = fp(async (server: FastifyZodProvider) => {
actor,
user,
orgId: "API_KEY", // We set the orgId to an arbitrary value, since we can't link an API key to a specific org. We have to deprecate API keys soon!
authMethod: null
authMethod: null,
token: token as string
};
break;
}

View File

@@ -31,6 +31,8 @@ import { buildDynamicSecretProviders } from "@app/ee/services/dynamic-secret/pro
import { dynamicSecretLeaseDALFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-dal";
import { dynamicSecretLeaseQueueServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-queue";
import { dynamicSecretLeaseServiceFactory } from "@app/ee/services/dynamic-secret-lease/dynamic-secret-lease-service";
import { eventBusFactory } from "@app/ee/services/event/event-bus-service";
import { sseServiceFactory } from "@app/ee/services/event/event-sse-service";
import { externalKmsDALFactory } from "@app/ee/services/external-kms/external-kms-dal";
import { externalKmsServiceFactory } from "@app/ee/services/external-kms/external-kms-service";
import { gatewayDALFactory } from "@app/ee/services/gateway/gateway-dal";
@@ -495,6 +497,9 @@ export const registerRoutes = async (
const projectMicrosoftTeamsConfigDAL = projectMicrosoftTeamsConfigDALFactory(db);
const secretScanningV2DAL = secretScanningV2DALFactory(db);
const eventBusService = eventBusFactory(server.redis);
const sseService = sseServiceFactory(eventBusService, server.redis);
const permissionService = permissionServiceFactory({
permissionDAL,
orgRoleDAL,
@@ -552,7 +557,8 @@ export const registerRoutes = async (
queueService,
projectDAL,
licenseService,
auditLogStreamDAL
auditLogStreamDAL,
eventBusService
});
const auditLogService = auditLogServiceFactory({ auditLogDAL, permissionService, auditLogQueue });
@@ -1966,6 +1972,7 @@ export const registerRoutes = async (
await kmsService.startService();
await microsoftTeamsService.start();
await dynamicSecretQueueService.init();
await eventBusService.init();
// inject all services
server.decorate<FastifyZodProvider["services"]>("services", {
@@ -2072,7 +2079,9 @@ export const registerRoutes = async (
githubOrgSync: githubOrgSyncConfigService,
folderCommit: folderCommitService,
secretScanningV2: secretScanningV2Service,
reminder: reminderService
reminder: reminderService,
bus: eventBusService,
sse: sseService
});
const cronJobs: CronJob[] = [];
@@ -2188,5 +2197,7 @@ export const registerRoutes = async (
server.addHook("onClose", async () => {
cronJobs.forEach((job) => job.stop());
await telemetryService.flushAll();
await eventBusService.close();
sseService.close();
});
};

View File

@@ -0,0 +1,118 @@
/* eslint-disable @typescript-eslint/no-floating-promises */
import { subject } from "@casl/ability";
import { pipeline } from "stream/promises";
import { z } from "zod";
import { ActionProjectType, ProjectType } from "@app/db/schemas";
import { getServerSentEventsHeaders } from "@app/ee/services/event/event-sse-stream";
import { EventRegisterSchema } from "@app/ee/services/event/types";
import { ProjectPermissionSecretActions, ProjectPermissionSub } from "@app/ee/services/permission/project-permission";
import { BadRequestError, ForbiddenRequestError, RateLimitError } from "@app/lib/errors";
import { readLimit } from "@app/server/config/rateLimiter";
import { verifyAuth } from "@app/server/plugins/auth/verify-auth";
import { AuthMode } from "@app/services/auth/auth-type";
export const registerEventRouter = async (server: FastifyZodProvider) => {
server.route({
method: "POST",
url: "/subscribe/project-events",
config: {
rateLimit: readLimit
},
schema: {
body: z.object({
projectId: z.string().trim(),
register: z.array(EventRegisterSchema).max(10)
})
},
onRequest: verifyAuth([AuthMode.JWT, AuthMode.IDENTITY_ACCESS_TOKEN]),
handler: async (req, reply) => {
try {
const { sse, permission, identityAccessToken, authToken, license } = req.server.services;
const plan = await license.getPlan(req.auth.orgId);
if (!plan.eventSubscriptions) {
throw new BadRequestError({
message:
"Failed to use event subscriptions due to plan restriction. Upgrade plan to access enterprise event subscriptions."
});
}
const count = await sse.getActiveConnectionsCount(req.body.projectId, req.permission.id);
if (count >= 5) {
throw new RateLimitError({
message: `Too many active connections for project ${req.body.projectId}. Please close some connections before opening a new one.`
});
}
const client = await sse.subscribe({
type: ProjectType.SecretManager,
registered: req.body.register,
async getAuthInfo() {
const ability = await permission.getProjectPermission({
actor: req.auth.actor,
projectId: req.body.projectId,
actionProjectType: ActionProjectType.Any,
actorAuthMethod: req.auth.authMethod,
actorId: req.permission.id,
actorOrgId: req.permission.orgId
});
return { permission: ability.permission, actorId: req.permission.id, projectId: req.body.projectId };
},
async onAuthRefresh(info) {
switch (req.auth.authMode) {
case AuthMode.JWT:
await authToken.fnValidateJwtIdentity(req.auth.token);
break;
case AuthMode.IDENTITY_ACCESS_TOKEN:
await identityAccessToken.fnValidateIdentityAccessToken(req.auth.token, req.realIp);
break;
default:
throw new Error("Unsupported authentication method");
}
req.body.register.forEach((r) => {
const allowed = info.permission.can(
ProjectPermissionSecretActions.Subscribe,
subject(ProjectPermissionSub.Secrets, {
environment: r.conditions?.environmentSlug ?? "",
secretPath: r.conditions?.secretPath ?? "/",
eventType: r.event
})
);
if (!allowed) {
throw new ForbiddenRequestError({
name: "PermissionDenied",
message: `You are not allowed to subscribe on secrets`,
details: {
event: r.event,
environmentSlug: r.conditions?.environmentSlug,
secretPath: r.conditions?.secretPath ?? "/"
}
});
}
});
}
});
// Switches to manual response and enable SSE streaming
reply.hijack();
reply.raw.writeHead(200, getServerSentEventsHeaders()).flushHeaders();
reply.raw.on("close", client.abort);
await pipeline(client.stream, reply.raw, { signal: client.signal });
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
// If the stream is aborted, we don't need to do anything
return;
}
throw error;
}
}
});
};

View File

@@ -13,6 +13,7 @@ import { registerCaRouter } from "./certificate-authority-router";
import { CERTIFICATE_AUTHORITY_REGISTER_ROUTER_MAP } from "./certificate-authority-routers";
import { registerCertRouter } from "./certificate-router";
import { registerCertificateTemplateRouter } from "./certificate-template-router";
import { registerEventRouter } from "./event-router";
import { registerExternalGroupOrgRoleMappingRouter } from "./external-group-org-role-mapping-router";
import { registerIdentityAccessTokenRouter } from "./identity-access-token-router";
import { registerIdentityAliCloudAuthRouter } from "./identity-alicloud-auth-router";
@@ -183,4 +184,6 @@ export const registerV1Routes = async (server: FastifyZodProvider) => {
},
{ prefix: "/reminders" }
);
await server.register(registerEventRouter, { prefix: "/events" });
};

View File

@@ -197,4 +197,4 @@ volumes:
driver: local
ldap_data:
ldap_config:
grafana_storage:
grafana_storage:

View File

@@ -21,7 +21,8 @@ export enum ProjectPermissionSecretActions {
ReadValue = "readValue",
Create = "create",
Edit = "edit",
Delete = "delete"
Delete = "delete",
Subscribe = "subscribe"
}
export enum ProjectPermissionDynamicSecretActions {

View File

@@ -39,6 +39,14 @@ export const renderOperatorSelectItems = (type: string) => {
<SelectItem value={PermissionConditionOperators.$IN}>In</SelectItem>
</>
);
case "eventType":
return (
<>
<SelectItem value={PermissionConditionOperators.$EQ}>Equal</SelectItem>
<SelectItem value={PermissionConditionOperators.$NEQ}>Not Equal</SelectItem>
<SelectItem value={PermissionConditionOperators.$IN}>In</SelectItem>
</>
);
default:
return (
<>

View File

@@ -54,7 +54,8 @@ const SecretPolicyActionSchema = z.object({
[ProjectPermissionSecretActions.ReadValue]: z.boolean().optional(),
[ProjectPermissionSecretActions.Edit]: z.boolean().optional(),
[ProjectPermissionSecretActions.Delete]: z.boolean().optional(),
[ProjectPermissionSecretActions.Create]: z.boolean().optional()
[ProjectPermissionSecretActions.Create]: z.boolean().optional(),
[ProjectPermissionSecretActions.Subscribe]: z.boolean().optional()
});
const ApprovalPolicyActionSchema = z.object({
@@ -588,6 +589,7 @@ export const rolePermission2Form = (permissions: TProjectPermission[] = []) => {
const canEdit = action.includes(ProjectPermissionSecretActions.Edit);
const canDelete = action.includes(ProjectPermissionSecretActions.Delete);
const canCreate = action.includes(ProjectPermissionSecretActions.Create);
const canSubscribe = action.includes(ProjectPermissionSecretActions.Subscribe);
// from above statement we are sure it won't be undefined
formVal[subject]!.push({
@@ -597,6 +599,7 @@ export const rolePermission2Form = (permissions: TProjectPermission[] = []) => {
create: canCreate,
edit: canEdit,
delete: canDelete,
subscribe: canSubscribe,
conditions: conditions ? convertCaslConditionToFormOperator(conditions) : [],
inverted
});
@@ -1111,7 +1114,8 @@ export const PROJECT_PERMISSION_OBJECT: TProjectPermissionObject = {
{ label: "Read Value", value: ProjectPermissionSecretActions.ReadValue },
{ label: "Modify", value: ProjectPermissionSecretActions.Edit },
{ label: "Remove", value: ProjectPermissionSecretActions.Delete },
{ label: "Create", value: ProjectPermissionSecretActions.Create }
{ label: "Create", value: ProjectPermissionSecretActions.Create },
{ label: "Subscribe", value: ProjectPermissionSecretActions.Subscribe }
]
},
[ProjectPermissionSub.SecretFolders]: {

View File

@@ -17,7 +17,8 @@ export const SecretPermissionConditions = ({ position = 0, isDisabled }: Props)
{ value: "environment", label: "Environment Slug" },
{ value: "secretPath", label: "Secret Path" },
{ value: "secretName", label: "Secret Name" },
{ value: "secretTags", label: "Secret Tags" }
{ value: "secretTags", label: "Secret Tags" },
{ value: "eventType", label: "Event Type" }
]}
/>
);

View File

@@ -82,7 +82,11 @@ export const SecretDetectionIgnoreValuesSection = () => {
<div className="flex w-full items-center justify-between">
<p className="text-xl font-semibold">Secret Detection</p>
</div>
<p className="mb-4 mt-2 max-w-2xl text-sm text-gray-400">Define secret values to ignore when scanning designated parameter folders. Add values here to prevent false positives or allow approved sensitive data. These ignored values will not trigger policy violation alerts.</p>
<p className="mb-4 mt-2 max-w-2xl text-sm text-gray-400">
Define secret values to ignore when scanning designated parameter folders. Add values here
to prevent false positives or allow approved sensitive data. These ignored values will not
trigger policy violation alerts.
</p>
<form onSubmit={handleSubmit(handleIgnoreValuesSubmit)} autoComplete="off">
<div className="mb-4">

View File

@@ -1,3 +1,7 @@
upstream api {
server backend:4000;
}
server {
listen 80;
@@ -11,7 +15,7 @@ server {
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
proxy_pass http://backend:4000;
proxy_pass http://api;
proxy_redirect off;
proxy_cookie_path / "/; SameSite=strict";
@@ -24,7 +28,7 @@ server {
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
proxy_pass http://backend:4000;
proxy_pass http://api;
proxy_redirect off;
proxy_cookie_path / "/; HttpOnly; SameSite=strict";
@@ -39,7 +43,7 @@ server {
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
proxy_pass http://backend:4000;
proxy_pass http://api;
proxy_redirect off;
proxy_cookie_path / "/; HttpOnly; SameSite=strict";
@@ -57,7 +61,7 @@ server {
# proxy_set_header X-SSL-Client-Cert $http_x_ssl_client_cert;
# proxy_pass_request_headers on;
proxy_pass http://backend:4000;
proxy_pass http://api;
proxy_redirect off;
# proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";

89
sink/index.html Normal file
View File

@@ -0,0 +1,89 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Document</title>
</head>
<style>
body {
background-color: black;
color: lime;
font-family: "Hack";
}
</style>
<body>
<h1>EventSource Example</h1>
<div>
<p>This page listens for server-sent events from the backend.</p>
<p>Open your browser's console to see the received events.</p>
<ul id="event-list">
<!-- Events will be dynamically added here -->
</ul>
</div>
<script>
const list = document.getElementById("event-list");
async function stream() {
const response = await fetch("http://localhost:8080/api/v1/events/subscribe", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
Connection: "keep-alive",
},
body: JSON.stringify({
projectId: "project-id-123",
events: [
{
type: "secret.created",
},
],
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
console.log("Received line:", line);
if (line.startsWith("data:")) {
const data = line.slice(5).trim();
const listItem = document.createElement("li");
listItem.textContent = `Event(${event.id}): ${data.event}, Data: ${JSON.stringify(data.data)}`;
list.appendChild(listItem);
}
}
}
}
stream().catch((error) => {
console.error("Error in streaming:", error);
});
// source.onmessage = function (event) {
// const data = JSON.parse(event.data);
// console.log("Received data:", data);
// const listItem = document.createElement("li");
// listItem.textContent = `Event(${event.id}): ${data.event}, Data: ${JSON.stringify(data.data)}`;
// list.appendChild(listItem);
// // You can update the UI or perform actions based on the received data
// };
</script>
</body>
</html>