Compare commits

...

4 Commits

Author SHA1 Message Date
carlosmonastyrski
8044999785 feat(telemetry): increase even redis key exp to 15 mins 2025-06-27 14:31:54 -03:00
carlosmonastyrski
be51e4372d feat(telemetry): addressed PR suggestions 2025-06-27 14:30:31 -03:00
carlosmonastyrski
0f04890d8f feat(telemetry): addressed PR suggestions 2025-06-26 21:18:07 -03:00
carlosmonastyrski
61274243e2 feat(telemetry): add batch events and groups logic 2025-06-26 20:58:01 -03:00
21 changed files with 404 additions and 17 deletions

View File

@@ -8,6 +8,9 @@ import { Lock } from "@app/lib/red-lock";
export const mockKeyStore = (): TKeyStoreFactory => {
const store: Record<string, string | number | Buffer> = {};
const getRegex = (pattern: string) =>
new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
return {
setItem: async (key, value) => {
store[key] = value;
@@ -23,7 +26,7 @@ export const mockKeyStore = (): TKeyStoreFactory => {
return 1;
},
deleteItems: async ({ pattern, batchSize = 500, delay = 1500, jitter = 200 }) => {
const regex = new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
const regex = getRegex(pattern);
let totalDeleted = 0;
const keys = Object.keys(store);
@@ -53,6 +56,27 @@ export const mockKeyStore = (): TKeyStoreFactory => {
incrementBy: async () => {
return 1;
},
getItems: async (keys) => {
const values = keys.map((key) => {
const value = store[key];
if (typeof value === "string") {
return value;
}
return null;
});
return values;
},
getKeysByPattern: async (pattern) => {
const regex = getRegex(pattern);
const keys = Object.keys(store);
return keys.filter((key) => regex.test(key));
},
deleteItemsByKeyIn: async (keys) => {
for (const key of keys) {
delete store[key];
}
return keys.length;
},
acquireLock: () => {
return Promise.resolve({
release: () => {}

View File

@@ -80,6 +80,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SignSshKey,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
certificateTemplateId: req.body.certificateTemplateId,
principals: req.body.principals,
@@ -171,6 +172,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueSshCreds,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
certificateTemplateId: req.body.certificateTemplateId,
principals: req.body.principals,

View File

@@ -358,6 +358,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueSshHostUserCert,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
sshHostId: req.params.sshHostId,
hostname: host.hostname,
@@ -427,6 +428,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueSshHostHostCert,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
sshHostId: req.params.sshHostId,

View File

@@ -72,6 +72,7 @@ type TWaitTillReady = {
export type TKeyStoreFactory = {
setItem: (key: string, value: string | number | Buffer, prefix?: string) => Promise<"OK">;
getItem: (key: string, prefix?: string) => Promise<string | null>;
getItems: (keys: string[], prefix?: string) => Promise<(string | null)[]>;
setExpiry: (key: string, expiryInSeconds: number) => Promise<number>;
setItemWithExpiry: (
key: string,
@@ -80,6 +81,7 @@ export type TKeyStoreFactory = {
prefix?: string
) => Promise<"OK">;
deleteItem: (key: string) => Promise<number>;
deleteItemsByKeyIn: (keys: string[]) => Promise<number>;
deleteItems: (arg: TDeleteItems) => Promise<number>;
incrementBy: (key: string, value: number) => Promise<number>;
acquireLock(
@@ -88,6 +90,7 @@ export type TKeyStoreFactory = {
settings?: Partial<Settings>
): Promise<{ release: () => Promise<ExecutionResult> }>;
waitTillReady: ({ key, waitingCb, keyCheckCb, waitIteration, delay, jitter }: TWaitTillReady) => Promise<void>;
getKeysByPattern: (pattern: string, limit?: number) => Promise<string[]>;
};
export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFactory => {
@@ -99,6 +102,9 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
const getItem = async (key: string, prefix?: string) => redis.get(prefix ? `${prefix}:${key}` : key);
const getItems = async (keys: string[], prefix?: string) =>
redis.mget(keys.map((key) => (prefix ? `${prefix}:${key}` : key)));
const setItemWithExpiry = async (
key: string,
expiryInSeconds: number | string,
@@ -108,6 +114,11 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
const deleteItem = async (key: string) => redis.del(key);
const deleteItemsByKeyIn = async (keys: string[]) => {
if (keys.length === 0) return 0;
return redis.del(keys);
};
const deleteItems = async ({ pattern, batchSize = 500, delay = 1500, jitter = 200 }: TDeleteItems) => {
let cursor = "0";
let totalDeleted = 0;
@@ -163,6 +174,24 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
}
};
const getKeysByPattern = async (pattern: string, limit?: number) => {
let cursor = "0";
const allKeys: string[] = [];
do {
// eslint-disable-next-line no-await-in-loop
const [nextCursor, keys] = await redis.scan(cursor, "MATCH", pattern, "COUNT", 1000);
cursor = nextCursor;
allKeys.push(...keys);
if (limit && allKeys.length >= limit) {
return allKeys.slice(0, limit);
}
} while (cursor !== "0");
return allKeys;
};
return {
setItem,
getItem,
@@ -174,6 +203,9 @@ export const keyStoreFactory = (redisConfigKeys: TRedisConfigKeys): TKeyStoreFac
acquireLock(resources: string[], duration: number, settings?: Partial<Settings>) {
return redisLock.acquire(resources, duration, settings);
},
waitTillReady
waitTillReady,
getKeysByPattern,
deleteItemsByKeyIn,
getItems
};
};

View File

@@ -8,6 +8,8 @@ import { TKeyStoreFactory } from "./keystore";
export const inMemoryKeyStore = (): TKeyStoreFactory => {
const store: Record<string, string | number | Buffer> = {};
const getRegex = (pattern: string) =>
new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
return {
setItem: async (key, value) => {
@@ -24,7 +26,7 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => {
return 1;
},
deleteItems: async ({ pattern, batchSize = 500, delay = 1500, jitter = 200 }) => {
const regex = new RE2(`^${pattern.replace(/[-[\]/{}()+?.\\^$|]/g, "\\$&").replace(/\*/g, ".*")}$`);
const regex = getRegex(pattern);
let totalDeleted = 0;
const keys = Object.keys(store);
@@ -59,6 +61,27 @@ export const inMemoryKeyStore = (): TKeyStoreFactory => {
release: () => {}
}) as Promise<Lock>;
},
waitTillReady: async () => {}
waitTillReady: async () => {},
getKeysByPattern: async (pattern) => {
const regex = getRegex(pattern);
const keys = Object.keys(store);
return keys.filter((key) => regex.test(key));
},
deleteItemsByKeyIn: async (keys) => {
for (const key of keys) {
delete store[key];
}
return keys.length;
},
getItems: async (keys) => {
const values = keys.map((key) => {
const value = store[key];
if (typeof value === "string") {
return value;
}
return null;
});
return values;
}
};
};

View File

@@ -62,7 +62,8 @@ export enum QueueName {
SecretRotationV2 = "secret-rotation-v2",
FolderTreeCheckpoint = "folder-tree-checkpoint",
InvalidateCache = "invalidate-cache",
SecretScanningV2 = "secret-scanning-v2"
SecretScanningV2 = "secret-scanning-v2",
TelemetryAggregatedEvents = "telemetry-aggregated-events"
}
export enum QueueJobs {
@@ -101,7 +102,8 @@ export enum QueueJobs {
SecretScanningV2DiffScan = "secret-scanning-v2-diff-scan",
SecretScanningV2SendNotification = "secret-scanning-v2-notification",
CaOrderCertificateForSubscriber = "ca-order-certificate-for-subscriber",
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal"
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal",
TelemetryAggregatedEvents = "telemetry-aggregated-events"
}
export type TQueueJobTypes = {
@@ -292,6 +294,10 @@ export type TQueueJobTypes = {
name: QueueJobs.PkiSubscriberDailyAutoRenewal;
payload: undefined;
};
[QueueName.TelemetryAggregatedEvents]: {
name: QueueJobs.TelemetryAggregatedEvents;
payload: undefined;
};
};
const SECRET_SCANNING_JOBS = [

View File

@@ -686,7 +686,8 @@ export const registerRoutes = async (
const telemetryQueue = telemetryQueueServiceFactory({
keyStore,
telemetryDAL,
queueService
queueService,
telemetryService
});
const invalidateCacheQueue = invalidateCacheQueueFactory({

View File

@@ -722,6 +722,7 @@ export const registerAdminRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.InvalidateCache,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
...req.auditLogInfo

View File

@@ -692,6 +692,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueCert,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
caId: ca.id,
@@ -786,6 +787,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SignCert,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
caId: ca.id,

View File

@@ -266,6 +266,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueCert,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
caId: req.body.caId,
certificateTemplateId: req.body.certificateTemplateId,
@@ -442,6 +443,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SignCert,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
caId: req.body.caId,
certificateTemplateId: req.body.certificateTemplateId,

View File

@@ -475,6 +475,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secretCountFromEnv,
workspaceId: projectId,
@@ -979,6 +980,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secretCount,
workspaceId: projectId,
@@ -1144,6 +1146,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secretCountForEnv,
workspaceId: projectId,
@@ -1336,6 +1339,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: projectId,

View File

@@ -85,6 +85,7 @@ export const registerIdentityRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.MachineIdentityCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
orgId: req.body.organizationId,
name: identity.name,

View File

@@ -103,6 +103,7 @@ export const registerIntegrationRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IntegrationCreated,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
...createIntegrationEventProperty,

View File

@@ -64,6 +64,7 @@ export const registerInviteOrgRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.UserOrgInvitation,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
inviteeEmails: req.body.inviteeEmails,
organizationRoleSlug: req.body.organizationRoleSlug,

View File

@@ -331,6 +331,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueCert,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
subscriberId: subscriber.id,
@@ -399,6 +400,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.IssueCert,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
subscriberId: subscriber.id,
commonName: subscriber.commonName,
@@ -471,6 +473,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SignCert,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
subscriberId: subscriber.id,
commonName: subscriber.commonName,

View File

@@ -165,6 +165,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretRequestDeleted,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
secretRequestId: req.params.id,
organizationId: req.permission.orgId,
@@ -256,6 +257,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretRequestCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
secretRequestId: shareRequest.id,
organizationId: req.permission.orgId,

View File

@@ -198,6 +198,7 @@ export const registerProjectRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.ProjectCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
orgId: project.orgId,
name: project.name,

View File

@@ -339,6 +339,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId,
@@ -484,6 +485,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
if (getUserAgentType(req.headers["user-agent"]) !== UserAgentType.K8_OPERATOR) {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
organizationId: req.permission.orgId,
distinctId: getTelemetryDistinctId(req),
properties: {
numberOfSecrets: 1,
@@ -600,6 +602,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.body.workspaceId,
@@ -725,6 +728,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretUpdated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.body.workspaceId,
@@ -815,6 +819,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretDeleted,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.body.workspaceId,
@@ -922,6 +927,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: req.query.workspaceId,
@@ -1001,6 +1007,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretPulled,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.query.workspaceId,
@@ -1172,6 +1179,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.body.workspaceId,
@@ -1361,6 +1369,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretUpdated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.body.workspaceId,
@@ -1484,6 +1493,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretDeleted,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: 1,
workspaceId: req.body.workspaceId,
@@ -1667,6 +1677,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: req.body.workspaceId,
@@ -1793,6 +1804,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretUpdated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: req.body.workspaceId,
@@ -1911,6 +1923,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretDeleted,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: req.body.workspaceId,
@@ -2019,6 +2032,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretCreated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: secrets[0].workspace,
@@ -2174,6 +2188,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretUpdated,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: secrets[0].workspace,
@@ -2272,6 +2287,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
await server.services.telemetry.sendPostHogEvents({
event: PostHogEventTypes.SecretDeleted,
distinctId: getTelemetryDistinctId(req),
organizationId: req.permission.orgId,
properties: {
numberOfSecrets: secrets.length,
workspaceId: secrets[0].workspace,

View File

@@ -7,13 +7,18 @@ import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
import { getServerCfg } from "../super-admin/super-admin-service";
import { TTelemetryDALFactory } from "./telemetry-dal";
import { TELEMETRY_SECRET_OPERATIONS_KEY, TELEMETRY_SECRET_PROCESSED_KEY } from "./telemetry-service";
import {
TELEMETRY_SECRET_OPERATIONS_KEY,
TELEMETRY_SECRET_PROCESSED_KEY,
TTelemetryServiceFactory
} from "./telemetry-service";
import { PostHogEventTypes } from "./telemetry-types";
type TTelemetryQueueServiceFactoryDep = {
queueService: TQueueServiceFactory;
keyStore: Pick<TKeyStoreFactory, "getItem" | "deleteItem">;
telemetryDAL: TTelemetryDALFactory;
telemetryService: TTelemetryServiceFactory;
};
export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServiceFactory>;
@@ -21,7 +26,8 @@ export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServ
export const telemetryQueueServiceFactory = ({
queueService,
keyStore,
telemetryDAL
telemetryDAL,
telemetryService
}: TTelemetryQueueServiceFactoryDep) => {
const appCfg = getConfig();
const postHog =
@@ -48,6 +54,10 @@ export const telemetryQueueServiceFactory = ({
await keyStore.deleteItem(TELEMETRY_SECRET_OPERATIONS_KEY);
});
queueService.start(QueueName.TelemetryAggregatedEvents, async () => {
await telemetryService.processAggregatedEvents();
});
// every day at midnight a telemetry job executes on self-hosted instances
// this sends some telemetry information like instance id secrets operated etc
const startTelemetryCheck = async () => {
@@ -60,11 +70,26 @@ export const telemetryQueueServiceFactory = ({
{ pattern: "0 0 * * *", utc: true },
QueueName.TelemetryInstanceStats // just a job id
);
// clear previous aggregated events job
await queueService.stopRepeatableJob(
QueueName.TelemetryAggregatedEvents,
QueueJobs.TelemetryAggregatedEvents,
{ pattern: "*/5 * * * *", utc: true },
QueueName.TelemetryAggregatedEvents // just a job id
);
if (postHog) {
await queueService.queue(QueueName.TelemetryInstanceStats, QueueJobs.TelemetryInstanceStats, undefined, {
jobId: QueueName.TelemetryInstanceStats,
repeat: { pattern: "0 0 * * *", utc: true }
});
// Start aggregated events job (runs every five minutes)
await queueService.queue(QueueName.TelemetryAggregatedEvents, QueueJobs.TelemetryAggregatedEvents, undefined, {
jobId: QueueName.TelemetryAggregatedEvents,
repeat: { pattern: "*/5 * * * *", utc: true }
});
}
};
@@ -72,6 +97,10 @@ export const telemetryQueueServiceFactory = ({
logger.error(err?.failedReason, `${QueueName.TelemetryInstanceStats}: failed`);
});
queueService.listen(QueueName.TelemetryAggregatedEvents, "failed", (err) => {
logger.error(err?.failedReason, `${QueueName.TelemetryAggregatedEvents}: failed`);
});
return {
startTelemetryCheck
};

View File

@@ -1,3 +1,4 @@
import { createHash, randomUUID } from "crypto";
import { PostHog } from "posthog-node";
import { TLicenseServiceFactory } from "@app/ee/services/license/license-service";
@@ -12,12 +13,49 @@ import { PostHogEventTypes, TPostHogEvent, TSecretModifiedEvent } from "./teleme
export const TELEMETRY_SECRET_PROCESSED_KEY = "telemetry-secret-processed";
export const TELEMETRY_SECRET_OPERATIONS_KEY = "telemetry-secret-operations";
export const POSTHOG_AGGREGATED_EVENTS = [PostHogEventTypes.SecretPulled];
const TELEMETRY_AGGREGATED_KEY_EXP = 900; // 15mins
// Bucket configuration
const TELEMETRY_BUCKET_COUNT = 30;
const TELEMETRY_BUCKET_NAMES = Array.from(
{ length: TELEMETRY_BUCKET_COUNT },
(_, i) => `bucket-${i.toString().padStart(2, "0")}`
);
type AggregatedEventData = Record<string, unknown>;
type SingleEventData = {
distinctId: string;
event: string;
properties: unknown;
organizationId: string;
};
export type TTelemetryServiceFactory = ReturnType<typeof telemetryServiceFactory>;
export type TTelemetryServiceFactoryDep = {
keyStore: Pick<TKeyStoreFactory, "getItem" | "incrementBy">;
keyStore: Pick<
TKeyStoreFactory,
"incrementBy" | "deleteItemsByKeyIn" | "setItemWithExpiry" | "getKeysByPattern" | "getItems"
>;
licenseService: Pick<TLicenseServiceFactory, "getInstanceType">;
};
const getBucketForDistinctId = (distinctId: string): string => {
// Use SHA-256 hash for consistent distribution
const hash = createHash("sha256").update(distinctId).digest("hex");
// Take first 8 characters and convert to number for better distribution
const hashNumber = parseInt(hash.substring(0, 8), 16);
const bucketIndex = hashNumber % TELEMETRY_BUCKET_COUNT;
return TELEMETRY_BUCKET_NAMES[bucketIndex];
};
export const createTelemetryEventKey = (event: string, distinctId: string): string => {
const bucketId = getBucketForDistinctId(distinctId);
return `telemetry-event-${event}-${bucketId}-${distinctId}-${randomUUID()}`;
};
export const telemetryServiceFactory = ({ keyStore, licenseService }: TTelemetryServiceFactoryDep) => {
const appCfg = getConfig();
@@ -64,11 +102,33 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
const instanceType = licenseService.getInstanceType();
// capture posthog only when its cloud or signup event happens in self-hosted
if (instanceType === InstanceType.Cloud || event.event === PostHogEventTypes.UserSignedUp) {
postHog.capture({
event: event.event,
distinctId: event.distinctId,
properties: event.properties
});
if (event.organizationId) {
try {
postHog.groupIdentify({ groupType: "organization", groupKey: event.organizationId });
} catch (error) {
logger.error(error, "Failed to identify PostHog organization");
}
}
if (POSTHOG_AGGREGATED_EVENTS.includes(event.event)) {
const eventKey = createTelemetryEventKey(event.event, event.distinctId);
await keyStore.setItemWithExpiry(
eventKey,
TELEMETRY_AGGREGATED_KEY_EXP,
JSON.stringify({
distinctId: event.distinctId,
event: event.event,
properties: event.properties,
organizationId: event.organizationId
})
);
} else {
postHog.capture({
event: event.event,
distinctId: event.distinctId,
properties: event.properties,
...(event.organizationId ? { groups: { organization: event.organizationId } } : {})
});
}
return;
}
@@ -89,6 +149,160 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
}
};
const aggregateGroupProperties = (events: SingleEventData[]): AggregatedEventData => {
const aggregatedData: AggregatedEventData = {};
// Set the total count
aggregatedData.count = events.length;
events.forEach((event) => {
if (!event.properties) return;
Object.entries(event.properties as Record<string, unknown>).forEach(([key, value]: [string, unknown]) => {
if (Array.isArray(value)) {
// For arrays, count occurrences of each item
const existingCounts =
aggregatedData[key] &&
typeof aggregatedData[key] === "object" &&
aggregatedData[key]?.constructor === Object
? (aggregatedData[key] as Record<string, number>)
: {};
value.forEach((item) => {
const itemKey = typeof item === "object" ? JSON.stringify(item) : String(item);
existingCounts[itemKey] = (existingCounts[itemKey] || 0) + 1;
});
aggregatedData[key] = existingCounts;
} else if (typeof value === "object" && value?.constructor === Object) {
// For objects, count occurrences of each field value
const existingCounts =
aggregatedData[key] &&
typeof aggregatedData[key] === "object" &&
aggregatedData[key]?.constructor === Object
? (aggregatedData[key] as Record<string, number>)
: {};
if (value) {
Object.values(value).forEach((fieldValue) => {
const valueKey = typeof fieldValue === "object" ? JSON.stringify(fieldValue) : String(fieldValue);
existingCounts[valueKey] = (existingCounts[valueKey] || 0) + 1;
});
}
aggregatedData[key] = existingCounts;
} else if (typeof value === "number") {
// For numbers, add to existing sum
aggregatedData[key] = ((aggregatedData[key] as number) || 0) + value;
} else if (value !== undefined && value !== null) {
// For other types (strings, booleans, etc.), count occurrences
const stringValue = String(value);
const existingValue = aggregatedData[key];
if (!existingValue) {
aggregatedData[key] = { [stringValue]: 1 };
} else if (existingValue && typeof existingValue === "object" && existingValue.constructor === Object) {
const countObject = existingValue as Record<string, number>;
countObject[stringValue] = (countObject[stringValue] || 0) + 1;
} else {
const oldValue = String(existingValue);
aggregatedData[key] = {
[oldValue]: 1,
[stringValue]: 1
};
}
}
});
});
return aggregatedData;
};
const processBucketEvents = async (eventType: string, bucketId: string) => {
if (!postHog) return 0;
try {
const bucketPattern = `telemetry-event-${eventType}-${bucketId}-*`;
const bucketKeys = await keyStore.getKeysByPattern(bucketPattern);
if (bucketKeys.length === 0) return 0;
const bucketEvents = await keyStore.getItems(bucketKeys);
let bucketEventsParsed: SingleEventData[] = [];
try {
bucketEventsParsed = bucketEvents
.filter((event) => event !== null)
.map((event) => JSON.parse(event as string) as SingleEventData);
} catch (error) {
logger.error(error, `Failed to parse bucket events for ${eventType} in ${bucketId}`);
return 0;
}
const eventsGrouped = new Map<string, SingleEventData[]>();
bucketEventsParsed.forEach((event) => {
const key = JSON.stringify({ id: event.distinctId, org: event.organizationId });
if (!eventsGrouped.has(key)) {
eventsGrouped.set(key, []);
}
eventsGrouped.get(key)!.push(event);
});
if (eventsGrouped.size === 0) return 0;
for (const [eventsKey, events] of eventsGrouped) {
const key = JSON.parse(eventsKey) as { id: string; org?: string };
if (key.org) {
try {
postHog.groupIdentify({ groupType: "organization", groupKey: key.org });
} catch (error) {
logger.error(error, "Failed to identify PostHog organization");
}
}
const properties = aggregateGroupProperties(events);
postHog.capture({
event: `${eventType} aggregated`,
distinctId: key.id,
properties,
...(key.org ? { groups: { organization: key.org } } : {})
});
}
// Clean up processed data for this bucket
await keyStore.deleteItemsByKeyIn(bucketKeys);
logger.info(`Processed ${bucketEventsParsed.length} events from bucket ${bucketId} for ${eventType}`);
return bucketEventsParsed.length;
} catch (error) {
logger.error(error, `Failed to process bucket ${bucketId} for ${eventType}`);
return 0;
}
};
const processAggregatedEvents = async () => {
if (!postHog) return;
for (const eventType of POSTHOG_AGGREGATED_EVENTS) {
let totalProcessed = 0;
logger.info(`Starting bucket processing for ${eventType}`);
// Process each bucket sequentially to control memory usage
for (const bucketId of TELEMETRY_BUCKET_NAMES) {
try {
// eslint-disable-next-line no-await-in-loop
const processed = await processBucketEvents(eventType, bucketId);
totalProcessed += processed;
} catch (error) {
logger.error(error, `Failed to process bucket ${bucketId} for ${eventType}`);
}
}
logger.info(`Completed processing ${totalProcessed} total events for ${eventType}`);
}
};
const flushAll = async () => {
if (postHog) {
await postHog.shutdownAsync();
@@ -98,6 +312,8 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
return {
sendLoopsEvent,
sendPostHogEvents,
flushAll
processAggregatedEvents,
flushAll,
getBucketForDistinctId
};
};

View File

@@ -1,3 +1,13 @@
import {
IdentityActor,
KmipClientActor,
PlatformActor,
ScimClientActor,
ServiceActor,
UnknownUserActor,
UserActor
} from "@app/ee/services/audit-log/audit-log-types";
export enum PostHogEventTypes {
SecretPush = "secrets pushed",
SecretPulled = "secrets pulled",
@@ -40,6 +50,14 @@ export type TSecretModifiedEvent = {
secretPath: string;
channel?: string;
userAgent?: string;
actor?:
| UserActor
| IdentityActor
| ServiceActor
| ScimClientActor
| PlatformActor
| UnknownUserActor
| KmipClientActor;
};
};
@@ -214,7 +232,7 @@ export type TInvalidateCacheEvent = {
};
};
export type TPostHogEvent = { distinctId: string } & (
export type TPostHogEvent = { distinctId: string; organizationId?: string } & (
| TSecretModifiedEvent
| TAdminInitEvent
| TUserSignedUpEvent