mirror of
https://github.com/Infisical/infisical.git
synced 2025-08-02 08:27:38 +00:00
Compare commits
4 Commits
misc/minor
...
feat/posth
Author | SHA1 | Date | |
---|---|---|---|
|
8044999785 | ||
|
be51e4372d | ||
|
0f04890d8f | ||
|
61274243e2 |
@@ -8,6 +8,9 @@ import { Lock } from "@app/lib/red-lock";
|
||||
export const mockKeyStore = (): TKeyStoreFactory => {
|
||||
const store: Record<string, string | number | Buffer> = {};
|
||||
|
||||
const getRegex = (pattern: string) =>
|
||||
new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
|
||||
|
||||
return {
|
||||
setItem: async (key, value) => {
|
||||
store[key] = value;
|
||||
@@ -23,7 +26,7 @@ export const mockKeyStore = (): TKeyStoreFactory => {
|
||||
return 1;
|
||||
},
|
||||
deleteItems: async ({ pattern, batchSize = 500, delay = 1500, jitter = 200 }) => {
|
||||
const regex = new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
|
||||
const regex = getRegex(pattern);
|
||||
let totalDeleted = 0;
|
||||
const keys = Object.keys(store);
|
||||
|
||||
@@ -53,6 +56,27 @@ export const mockKeyStore = (): TKeyStoreFactory => {
|
||||
incrementBy: async () => {
|
||||
return 1;
|
||||
},
|
||||
getItems: async (keys) => {
|
||||
const values = keys.map((key) => {
|
||||
const value = store[key];
|
||||
if (typeof value === "string") {
|
||||
return value;
|
||||
}
|
||||
return null;
|
||||
});
|
||||
return values;
|
||||
},
|
||||
getKeysByPattern: async (pattern) => {
|
||||
const regex = getRegex(pattern);
|
||||
const keys = Object.keys(store);
|
||||
return keys.filter((key) => regex.test(key));
|
||||
},
|
||||
deleteItemsByKeyIn: async (keys) => {
|
||||
for (const key of keys) {
|
||||
delete store[key];
|
||||
}
|
||||
return keys.length;
|
||||
},
|
||||
acquireLock: () => {
|
||||
return Promise.resolve({
|
||||
release: () => {}
|
||||
|
@@ -80,6 +80,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignSshKey,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
principals: req.body.principals,
|
||||
@@ -171,6 +172,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueSshCreds,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
principals: req.body.principals,
|
||||
|
@@ -358,6 +358,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueSshHostUserCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
sshHostId: req.params.sshHostId,
|
||||
hostname: host.hostname,
|
||||
@@ -427,6 +428,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueSshHostHostCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
sshHostId: req.params.sshHostId,
|
||||
|
@@ -72,6 +72,7 @@ type TWaitTillReady = {
|
||||
export type TKeyStoreFactory = {
|
||||
setItem: (key: string, value: string | number | Buffer, prefix?: string) => Promise<"OK">;
|
||||
getItem: (key: string, prefix?: string) => Promise<string | null>;
|
||||
getItems: (keys: string[], prefix?: string) => Promise<(string | null)[]>;
|
||||
setExpiry: (key: string, expiryInSeconds: number) => Promise<number>;
|
||||
setItemWithExpiry: (
|
||||
key: string,
|
||||
@@ -80,6 +81,7 @@ export type TKeyStoreFactory = {
|
||||
prefix?: string
|
||||
) => Promise<"OK">;
|
||||
deleteItem: (key: string) => Promise<number>;
|
||||
deleteItemsByKeyIn: (keys: string[]) => Promise<number>;
|
||||
deleteItems: (arg: TDeleteItems) => Promise<number>;
|
||||
incrementBy: (key: string, value: number) => Promise<number>;
|
||||
acquireLock(
|
||||
@@ -88,6 +90,7 @@ export type TKeyStoreFactory = {
|
||||
settings?: Partial<Settings>
|
||||
): Promise<{ release: () => Promise<ExecutionResult> }>;
|
||||
waitTillReady: ({ key, waitingCb, keyCheckCb, waitIteration, delay, jitter }: TWaitTillReady) => Promise<void>;
|
||||
getKeysByPattern: (pattern: string, limit?: number) => Promise<string[]>;
|
||||
};
|
||||
|
||||
export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFactory => {
|
||||
@@ -99,6 +102,9 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
|
||||
|
||||
const getItem = async (key: string, prefix?: string) => redis.get(prefix ? `${prefix}:${key}` : key);
|
||||
|
||||
const getItems = async (keys: string[], prefix?: string) =>
|
||||
redis.mget(keys.map((key) => (prefix ? `${prefix}:${key}` : key)));
|
||||
|
||||
const setItemWithExpiry = async (
|
||||
key: string,
|
||||
expiryInSeconds: number | string,
|
||||
@@ -108,6 +114,11 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
|
||||
|
||||
const deleteItem = async (key: string) => redis.del(key);
|
||||
|
||||
const deleteItemsByKeyIn = async (keys: string[]) => {
|
||||
if (keys.length === 0) return 0;
|
||||
return redis.del(keys);
|
||||
};
|
||||
|
||||
const deleteItems = async ({ pattern, batchSize = 500, delay = 1500, jitter = 200 }: TDeleteItems) => {
|
||||
let cursor = "0";
|
||||
let totalDeleted = 0;
|
||||
@@ -163,6 +174,24 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
|
||||
}
|
||||
};
|
||||
|
||||
const getKeysByPattern = async (pattern: string, limit?: number) => {
|
||||
let cursor = "0";
|
||||
const allKeys: string[] = [];
|
||||
|
||||
do {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const [nextCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000);
|
||||
cursor = nextCursor;
|
||||
allKeys.push(...keys);
|
||||
|
||||
if (limit && allKeys.length >= limit) {
|
||||
return allKeys.slice(0, limit);
|
||||
}
|
||||
} while (cursor !== "0");
|
||||
|
||||
return allKeys;
|
||||
};
|
||||
|
||||
return {
|
||||
setItem,
|
||||
getItem,
|
||||
@@ -174,6 +203,9 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
|
||||
acquireLock(resources: string[], duration: number, settings?: Partial<Settings>) {
|
||||
return redisLock.acquire(resources, duration, settings);
|
||||
},
|
||||
waitTillReady
|
||||
waitTillReady,
|
||||
getKeysByPattern,
|
||||
deleteItemsByKeyIn,
|
||||
getItems
|
||||
};
|
||||
};
|
||||
|
@@ -8,6 +8,8 @@ import { TKeyStoreFactory } from "./keystore";
|
||||
|
||||
export const inMemoryKeyStore = (): TKeyStoreFactory => {
|
||||
const store: Record<string, string | number | Buffer> = {};
|
||||
const getRegex = (pattern: string) =>
|
||||
new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
|
||||
|
||||
return {
|
||||
setItem: async (key, value) => {
|
||||
@@ -24,7 +26,7 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => {
|
||||
return 1;
|
||||
},
|
||||
deleteItems: async ({ pattern, batchSize = 500, delay = 1500, jitter = 200 }) => {
|
||||
const regex = new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
|
||||
const regex = getRegex(pattern);
|
||||
let totalDeleted = 0;
|
||||
const keys = Object.keys(store);
|
||||
|
||||
@@ -59,6 +61,27 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => {
|
||||
release: () => {}
|
||||
}) as Promise<Lock>;
|
||||
},
|
||||
waitTillReady: async () => {}
|
||||
waitTillReady: async () => {},
|
||||
getKeysByPattern: async (pattern) => {
|
||||
const regex = getRegex(pattern);
|
||||
const keys = Object.keys(store);
|
||||
return keys.filter((key) => regex.test(key));
|
||||
},
|
||||
deleteItemsByKeyIn: async (keys) => {
|
||||
for (const key of keys) {
|
||||
delete store[key];
|
||||
}
|
||||
return keys.length;
|
||||
},
|
||||
getItems: async (keys) => {
|
||||
const values = keys.map((key) => {
|
||||
const value = store[key];
|
||||
if (typeof value === "string") {
|
||||
return value;
|
||||
}
|
||||
return null;
|
||||
});
|
||||
return values;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
@@ -62,7 +62,8 @@ export enum QueueName {
|
||||
SecretRotationV2 = "secret-rotation-v2",
|
||||
FolderTreeCheckpoint = "folder-tree-checkpoint",
|
||||
InvalidateCache = "invalidate-cache",
|
||||
SecretScanningV2 = "secret-scanning-v2"
|
||||
SecretScanningV2 = "secret-scanning-v2",
|
||||
TelemetryAggregatedEvents = "telemetry-aggregated-events"
|
||||
}
|
||||
|
||||
export enum QueueJobs {
|
||||
@@ -101,7 +102,8 @@ export enum QueueJobs {
|
||||
SecretScanningV2DiffScan = "secret-scanning-v2-diff-scan",
|
||||
SecretScanningV2SendNotification = "secret-scanning-v2-notification",
|
||||
CaOrderCertificateForSubscriber = "ca-order-certificate-for-subscriber",
|
||||
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal"
|
||||
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal",
|
||||
TelemetryAggregatedEvents = "telemetry-aggregated-events"
|
||||
}
|
||||
|
||||
export type TQueueJobTypes = {
|
||||
@@ -292,6 +294,10 @@ export type TQueueJobTypes = {
|
||||
name: QueueJobs.PkiSubscriberDailyAutoRenewal;
|
||||
payload: undefined;
|
||||
};
|
||||
[QueueName.TelemetryAggregatedEvents]: {
|
||||
name: QueueJobs.TelemetryAggregatedEvents;
|
||||
payload: undefined;
|
||||
};
|
||||
};
|
||||
|
||||
const SECRET_SCANNING_JOBS = [
|
||||
|
@@ -686,7 +686,8 @@ export const registerRoutes = async (
|
||||
const telemetryQueue = telemetryQueueServiceFactory({
|
||||
keyStore,
|
||||
telemetryDAL,
|
||||
queueService
|
||||
queueService,
|
||||
telemetryService
|
||||
});
|
||||
|
||||
const invalidateCacheQueue = invalidateCacheQueueFactory({
|
||||
|
@@ -722,6 +722,7 @@ export const registerAdminRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.InvalidateCache,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
...req.auditLogInfo
|
||||
|
@@ -692,6 +692,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
caId: ca.id,
|
||||
@@ -786,6 +787,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
caId: ca.id,
|
||||
|
@@ -266,6 +266,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
caId: req.body.caId,
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
@@ -442,6 +443,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
caId: req.body.caId,
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
|
@@ -475,6 +475,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secretCountFromEnv,
|
||||
workspaceId: projectId,
|
||||
@@ -979,6 +980,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secretCount,
|
||||
workspaceId: projectId,
|
||||
@@ -1144,6 +1146,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secretCountForEnv,
|
||||
workspaceId: projectId,
|
||||
@@ -1336,6 +1339,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: projectId,
|
||||
|
@@ -85,6 +85,7 @@ export const registerIdentityRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.MachineIdentityCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
orgId: req.body.organizationId,
|
||||
name: identity.name,
|
||||
|
@@ -103,6 +103,7 @@ export const registerIntegrationRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IntegrationCreated,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
...createIntegrationEventProperty,
|
||||
|
@@ -64,6 +64,7 @@ export const registerInviteOrgRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.UserOrgInvitation,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
inviteeEmails: req.body.inviteeEmails,
|
||||
organizationRoleSlug: req.body.organizationRoleSlug,
|
||||
|
@@ -331,6 +331,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
subscriberId: subscriber.id,
|
||||
@@ -399,6 +400,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
subscriberId: subscriber.id,
|
||||
commonName: subscriber.commonName,
|
||||
@@ -471,6 +473,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
subscriberId: subscriber.id,
|
||||
commonName: subscriber.commonName,
|
||||
|
@@ -165,6 +165,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretRequestDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
secretRequestId: req.params.id,
|
||||
organizationId: req.permission.orgId,
|
||||
@@ -256,6 +257,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretRequestCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
secretRequestId: shareRequest.id,
|
||||
organizationId: req.permission.orgId,
|
||||
|
@@ -198,6 +198,7 @@ export const registerProjectRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.ProjectCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
orgId: project.orgId,
|
||||
name: project.name,
|
||||
|
@@ -339,6 +339,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId,
|
||||
@@ -484,6 +485,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
if (getUserAgentType(req.headers["user-agent"]) !== UserAgentType.K8_OPERATOR) {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
@@ -600,6 +602,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -725,6 +728,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -815,6 +819,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -922,6 +927,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.query.workspaceId,
|
||||
@@ -1001,6 +1007,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.query.workspaceId,
|
||||
@@ -1172,6 +1179,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -1361,6 +1369,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -1484,6 +1493,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -1667,6 +1677,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -1793,6 +1804,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -1911,6 +1923,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@@ -2019,6 +2032,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: secrets[0].workspace,
|
||||
@@ -2174,6 +2188,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: secrets[0].workspace,
|
||||
@@ -2272,6 +2287,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: secrets[0].workspace,
|
||||
|
@@ -7,13 +7,18 @@ import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
|
||||
|
||||
import { getServerCfg } from "../super-admin/super-admin-service";
|
||||
import { TTelemetryDALFactory } from "./telemetry-dal";
|
||||
import { TELEMETRY_SECRET_OPERATIONS_KEY, TELEMETRY_SECRET_PROCESSED_KEY } from "./telemetry-service";
|
||||
import {
|
||||
TELEMETRY_SECRET_OPERATIONS_KEY,
|
||||
TELEMETRY_SECRET_PROCESSED_KEY,
|
||||
TTelemetryServiceFactory
|
||||
} from "./telemetry-service";
|
||||
import { PostHogEventTypes } from "./telemetry-types";
|
||||
|
||||
type TTelemetryQueueServiceFactoryDep = {
|
||||
queueService: TQueueServiceFactory;
|
||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "deleteItem">;
|
||||
telemetryDAL: TTelemetryDALFactory;
|
||||
telemetryService: TTelemetryServiceFactory;
|
||||
};
|
||||
|
||||
export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServiceFactory>;
|
||||
@@ -21,7 +26,8 @@ export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServ
|
||||
export const telemetryQueueServiceFactory = ({
|
||||
queueService,
|
||||
keyStore,
|
||||
telemetryDAL
|
||||
telemetryDAL,
|
||||
telemetryService
|
||||
}: TTelemetryQueueServiceFactoryDep) => {
|
||||
const appCfg = getConfig();
|
||||
const postHog =
|
||||
@@ -48,6 +54,10 @@ export const telemetryQueueServiceFactory = ({
|
||||
await keyStore.deleteItem(TELEMETRY_SECRET_OPERATIONS_KEY);
|
||||
});
|
||||
|
||||
queueService.start(QueueName.TelemetryAggregatedEvents, async () => {
|
||||
await telemetryService.processAggregatedEvents();
|
||||
});
|
||||
|
||||
// every day at midnight a telemetry job executes on self-hosted instances
|
||||
// this sends some telemetry information like instance id secrets operated etc
|
||||
const startTelemetryCheck = async () => {
|
||||
@@ -60,11 +70,26 @@ export const telemetryQueueServiceFactory = ({
|
||||
{ pattern: "0 0 * * *", utc: true },
|
||||
QueueName.TelemetryInstanceStats // just a job id
|
||||
);
|
||||
|
||||
// clear previous aggregated events job
|
||||
await queueService.stopRepeatableJob(
|
||||
QueueName.TelemetryAggregatedEvents,
|
||||
QueueJobs.TelemetryAggregatedEvents,
|
||||
{ pattern: "*/5 * * * *", utc: true },
|
||||
QueueName.TelemetryAggregatedEvents // just a job id
|
||||
);
|
||||
|
||||
if (postHog) {
|
||||
await queueService.queue(QueueName.TelemetryInstanceStats, QueueJobs.TelemetryInstanceStats, undefined, {
|
||||
jobId: QueueName.TelemetryInstanceStats,
|
||||
repeat: { pattern: "0 0 * * *", utc: true }
|
||||
});
|
||||
|
||||
// Start aggregated events job (runs every five minutes)
|
||||
await queueService.queue(QueueName.TelemetryAggregatedEvents, QueueJobs.TelemetryAggregatedEvents, undefined, {
|
||||
jobId: QueueName.TelemetryAggregatedEvents,
|
||||
repeat: { pattern: "*/5 * * * *", utc: true }
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -72,6 +97,10 @@ export const telemetryQueueServiceFactory = ({
|
||||
logger.error(err?.failedReason, `${QueueName.TelemetryInstanceStats}: failed`);
|
||||
});
|
||||
|
||||
queueService.listen(QueueName.TelemetryAggregatedEvents, "failed", (err) => {
|
||||
logger.error(err?.failedReason, `${QueueName.TelemetryAggregatedEvents}: failed`);
|
||||
});
|
||||
|
||||
return {
|
||||
startTelemetryCheck
|
||||
};
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import { createHash, randomUUID } from "crypto";
|
||||
import { PostHog } from "posthog-node";
|
||||
|
||||
import { TLicenseServiceFactory } from "@app/ee/services/license/license-service";
|
||||
@@ -12,12 +13,49 @@ import { PostHogEventTypes, TPostHogEvent, TSecretModifiedEvent } from "./teleme
|
||||
export const TELEMETRY_SECRET_PROCESSED_KEY = "telemetry-secret-processed";
|
||||
export const TELEMETRY_SECRET_OPERATIONS_KEY = "telemetry-secret-operations";
|
||||
|
||||
export const POSTHOG_AGGREGATED_EVENTS = [PostHogEventTypes.SecretPulled];
|
||||
const TELEMETRY_AGGREGATED_KEY_EXP = 900; // 15mins
|
||||
|
||||
// Bucket configuration
|
||||
const TELEMETRY_BUCKET_COUNT = 30;
|
||||
const TELEMETRY_BUCKET_NAMES = Array.from(
|
||||
{ length: TELEMETRY_BUCKET_COUNT },
|
||||
(_, i) => `bucket-${i.toString().padStart(2, "0")}`
|
||||
);
|
||||
|
||||
type AggregatedEventData = Record<string, unknown>;
|
||||
type SingleEventData = {
|
||||
distinctId: string;
|
||||
event: string;
|
||||
properties: unknown;
|
||||
organizationId: string;
|
||||
};
|
||||
|
||||
export type TTelemetryServiceFactory = ReturnType<typeof telemetryServiceFactory>;
|
||||
export type TTelemetryServiceFactoryDep = {
|
||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "incrementBy">;
|
||||
keyStore: Pick<
|
||||
TKeyStoreFactory,
|
||||
"incrementBy" | "deleteItemsByKeyIn" | "setItemWithExpiry" | "getKeysByPattern" | "getItems"
|
||||
>;
|
||||
licenseService: Pick<TLicenseServiceFactory, "getInstanceType">;
|
||||
};
|
||||
|
||||
const getBucketForDistinctId = (distinctId: string): string => {
|
||||
// Use SHA-256 hash for consistent distribution
|
||||
const hash = createHash("sha256").update(distinctId).digest("hex");
|
||||
|
||||
// Take first 8 characters and convert to number for better distribution
|
||||
const hashNumber = parseInt(hash.substring(0, 8), 16);
|
||||
const bucketIndex = hashNumber % TELEMETRY_BUCKET_COUNT;
|
||||
|
||||
return TELEMETRY_BUCKET_NAMES[bucketIndex];
|
||||
};
|
||||
|
||||
export const createTelemetryEventKey = (event: string, distinctId: string): string => {
|
||||
const bucketId = getBucketForDistinctId(distinctId);
|
||||
return `telemetry-event-${event}-${bucketId}-${distinctId}-${randomUUID()}`;
|
||||
};
|
||||
|
||||
export const telemetryServiceFactory = ({ keyStore, licenseService }: TTelemetryServiceFactoryDep) => {
|
||||
const appCfg = getConfig();
|
||||
|
||||
@@ -64,11 +102,33 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
||||
const instanceType = licenseService.getInstanceType();
|
||||
// capture posthog only when its cloud or signup event happens in self-hosted
|
||||
if (instanceType === InstanceType.Cloud || event.event === PostHogEventTypes.UserSignedUp) {
|
||||
postHog.capture({
|
||||
event: event.event,
|
||||
distinctId: event.distinctId,
|
||||
properties: event.properties
|
||||
});
|
||||
if (event.organizationId) {
|
||||
try {
|
||||
postHog.groupIdentify({ groupType: "organization", groupKey: event.organizationId });
|
||||
} catch (error) {
|
||||
logger.error(error, "Failed to identify PostHog organization");
|
||||
}
|
||||
}
|
||||
if (POSTHOG_AGGREGATED_EVENTS.includes(event.event)) {
|
||||
const eventKey = createTelemetryEventKey(event.event, event.distinctId);
|
||||
await keyStore.setItemWithExpiry(
|
||||
eventKey,
|
||||
TELEMETRY_AGGREGATED_KEY_EXP,
|
||||
JSON.stringify({
|
||||
distinctId: event.distinctId,
|
||||
event: event.event,
|
||||
properties: event.properties,
|
||||
organizationId: event.organizationId
|
||||
})
|
||||
);
|
||||
} else {
|
||||
postHog.capture({
|
||||
event: event.event,
|
||||
distinctId: event.distinctId,
|
||||
properties: event.properties,
|
||||
...(event.organizationId ? { groups: { organization: event.organizationId } } : {})
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -89,6 +149,160 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
||||
}
|
||||
};
|
||||
|
||||
const aggregateGroupProperties = (events: SingleEventData[]): AggregatedEventData => {
|
||||
const aggregatedData: AggregatedEventData = {};
|
||||
|
||||
// Set the total count
|
||||
aggregatedData.count = events.length;
|
||||
|
||||
events.forEach((event) => {
|
||||
if (!event.properties) return;
|
||||
|
||||
Object.entries(event.properties as Record<string, unknown>).forEach(([key, value]: [string, unknown]) => {
|
||||
if (Array.isArray(value)) {
|
||||
// For arrays, count occurrences of each item
|
||||
const existingCounts =
|
||||
aggregatedData[key] &&
|
||||
typeof aggregatedData[key] === "object" &&
|
||||
aggregatedData[key]?.constructor === Object
|
||||
? (aggregatedData[key] as Record<string, number>)
|
||||
: {};
|
||||
|
||||
value.forEach((item) => {
|
||||
const itemKey = typeof item === "object" ? JSON.stringify(item) : String(item);
|
||||
existingCounts[itemKey] = (existingCounts[itemKey] || 0) + 1;
|
||||
});
|
||||
|
||||
aggregatedData[key] = existingCounts;
|
||||
} else if (typeof value === "object" && value?.constructor === Object) {
|
||||
// For objects, count occurrences of each field value
|
||||
const existingCounts =
|
||||
aggregatedData[key] &&
|
||||
typeof aggregatedData[key] === "object" &&
|
||||
aggregatedData[key]?.constructor === Object
|
||||
? (aggregatedData[key] as Record<string, number>)
|
||||
: {};
|
||||
|
||||
if (value) {
|
||||
Object.values(value).forEach((fieldValue) => {
|
||||
const valueKey = typeof fieldValue === "object" ? JSON.stringify(fieldValue) : String(fieldValue);
|
||||
existingCounts[valueKey] = (existingCounts[valueKey] || 0) + 1;
|
||||
});
|
||||
}
|
||||
aggregatedData[key] = existingCounts;
|
||||
} else if (typeof value === "number") {
|
||||
// For numbers, add to existing sum
|
||||
aggregatedData[key] = ((aggregatedData[key] as number) || 0) + value;
|
||||
} else if (value !== undefined && value !== null) {
|
||||
// For other types (strings, booleans, etc.), count occurrences
|
||||
const stringValue = String(value);
|
||||
const existingValue = aggregatedData[key];
|
||||
|
||||
if (!existingValue) {
|
||||
aggregatedData[key] = { [stringValue]: 1 };
|
||||
} else if (existingValue && typeof existingValue === "object" && existingValue.constructor === Object) {
|
||||
const countObject = existingValue as Record<string, number>;
|
||||
countObject[stringValue] = (countObject[stringValue] || 0) + 1;
|
||||
} else {
|
||||
const oldValue = String(existingValue);
|
||||
aggregatedData[key] = {
|
||||
[oldValue]: 1,
|
||||
[stringValue]: 1
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
return aggregatedData;
|
||||
};
|
||||
|
||||
const processBucketEvents = async (eventType: string, bucketId: string) => {
|
||||
if (!postHog) return 0;
|
||||
|
||||
try {
|
||||
const bucketPattern = `telemetry-event-${eventType}-${bucketId}-*`;
|
||||
const bucketKeys = await keyStore.getKeysByPattern(bucketPattern);
|
||||
|
||||
if (bucketKeys.length === 0) return 0;
|
||||
|
||||
const bucketEvents = await keyStore.getItems(bucketKeys);
|
||||
let bucketEventsParsed: SingleEventData[] = [];
|
||||
|
||||
try {
|
||||
bucketEventsParsed = bucketEvents
|
||||
.filter((event) => event !== null)
|
||||
.map((event) => JSON.parse(event as string) as SingleEventData);
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to parse bucket events for ${eventType} in ${bucketId}`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const eventsGrouped = new Map<string, SingleEventData[]>();
|
||||
|
||||
bucketEventsParsed.forEach((event) => {
|
||||
const key = JSON.stringify({ id: event.distinctId, org: event.organizationId });
|
||||
if (!eventsGrouped.has(key)) {
|
||||
eventsGrouped.set(key, []);
|
||||
}
|
||||
eventsGrouped.get(key)!.push(event);
|
||||
});
|
||||
|
||||
if (eventsGrouped.size === 0) return 0;
|
||||
|
||||
for (const [eventsKey, events] of eventsGrouped) {
|
||||
const key = JSON.parse(eventsKey) as { id: string; org?: string };
|
||||
if (key.org) {
|
||||
try {
|
||||
postHog.groupIdentify({ groupType: "organization", groupKey: key.org });
|
||||
} catch (error) {
|
||||
logger.error(error, "Failed to identify PostHog organization");
|
||||
}
|
||||
}
|
||||
const properties = aggregateGroupProperties(events);
|
||||
|
||||
postHog.capture({
|
||||
event: `${eventType} aggregated`,
|
||||
distinctId: key.id,
|
||||
properties,
|
||||
...(key.org ? { groups: { organization: key.org } } : {})
|
||||
});
|
||||
}
|
||||
|
||||
// Clean up processed data for this bucket
|
||||
await keyStore.deleteItemsByKeyIn(bucketKeys);
|
||||
|
||||
logger.info(`Processed ${bucketEventsParsed.length} events from bucket ${bucketId} for ${eventType}`);
|
||||
return bucketEventsParsed.length;
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to process bucket ${bucketId} for ${eventType}`);
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
const processAggregatedEvents = async () => {
|
||||
if (!postHog) return;
|
||||
|
||||
for (const eventType of POSTHOG_AGGREGATED_EVENTS) {
|
||||
let totalProcessed = 0;
|
||||
|
||||
logger.info(`Starting bucket processing for ${eventType}`);
|
||||
|
||||
// Process each bucket sequentially to control memory usage
|
||||
for (const bucketId of TELEMETRY_BUCKET_NAMES) {
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const processed = await processBucketEvents(eventType, bucketId);
|
||||
totalProcessed += processed;
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to process bucket ${bucketId} for ${eventType}`);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`Completed processing ${totalProcessed} total events for ${eventType}`);
|
||||
}
|
||||
};
|
||||
|
||||
const flushAll = async () => {
|
||||
if (postHog) {
|
||||
await postHog.shutdownAsync();
|
||||
@@ -98,6 +312,8 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
||||
return {
|
||||
sendLoopsEvent,
|
||||
sendPostHogEvents,
|
||||
flushAll
|
||||
processAggregatedEvents,
|
||||
flushAll,
|
||||
getBucketForDistinctId
|
||||
};
|
||||
};
|
||||
|
@@ -1,3 +1,13 @@
|
||||
import {
|
||||
IdentityActor,
|
||||
KmipClientActor,
|
||||
PlatformActor,
|
||||
ScimClientActor,
|
||||
ServiceActor,
|
||||
UnknownUserActor,
|
||||
UserActor
|
||||
} from "@app/ee/services/audit-log/audit-log-types";
|
||||
|
||||
export enum PostHogEventTypes {
|
||||
SecretPush = "secrets pushed",
|
||||
SecretPulled = "secrets pulled",
|
||||
@@ -40,6 +50,14 @@ export type TSecretModifiedEvent = {
|
||||
secretPath: string;
|
||||
channel?: string;
|
||||
userAgent?: string;
|
||||
actor?:
|
||||
| UserActor
|
||||
| IdentityActor
|
||||
| ServiceActor
|
||||
| ScimClientActor
|
||||
| PlatformActor
|
||||
| UnknownUserActor
|
||||
| KmipClientActor;
|
||||
};
|
||||
};
|
||||
|
||||
@@ -214,7 +232,7 @@ export type TInvalidateCacheEvent = {
|
||||
};
|
||||
};
|
||||
|
||||
export type TPostHogEvent = { distinctId: string } & (
|
||||
export type TPostHogEvent = { distinctId: string; organizationId?: string } & (
|
||||
| TSecretModifiedEvent
|
||||
| TAdminInitEvent
|
||||
| TUserSignedUpEvent
|
||||
|
Reference in New Issue
Block a user