mirror of
https://github.com/Infisical/infisical.git
synced 2025-07-11 12:11:38 +00:00
Compare commits
2 Commits
create-pol
...
feat/post-
Author | SHA1 | Date | |
---|---|---|---|
6b58ac3bc0 | |||
7c641d6f5c |
10
backend/src/@types/knex.d.ts
vendored
10
backend/src/@types/knex.d.ts
vendored
@ -496,6 +496,11 @@ import {
|
||||
TMicrosoftTeamsIntegrationsInsert,
|
||||
TMicrosoftTeamsIntegrationsUpdate
|
||||
} from "@app/db/schemas/microsoft-teams-integrations";
|
||||
import {
|
||||
TPosthogAggregatedEvents,
|
||||
TPosthogAggregatedEventsInsert,
|
||||
TPosthogAggregatedEventsUpdate
|
||||
} from "@app/db/schemas/posthog-aggregated-events";
|
||||
import {
|
||||
TProjectMicrosoftTeamsConfigs,
|
||||
TProjectMicrosoftTeamsConfigsInsert,
|
||||
@ -1203,5 +1208,10 @@ declare module "knex/types/tables" {
|
||||
TSecretScanningConfigsInsert,
|
||||
TSecretScanningConfigsUpdate
|
||||
>;
|
||||
[TableName.PosthogAggregatedEvents]: KnexOriginal.CompositeTableType<
|
||||
TPosthogAggregatedEvents,
|
||||
TPosthogAggregatedEventsInsert,
|
||||
TPosthogAggregatedEventsUpdate
|
||||
>;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,26 @@
|
||||
import { Knex } from "knex";
|
||||
|
||||
import { TableName } from "../schemas";
|
||||
import { createOnUpdateTrigger, dropOnUpdateTrigger } from "../utils";
|
||||
|
||||
export async function up(knex: Knex): Promise<void> {
|
||||
if (!(await knex.schema.hasTable(TableName.PosthogAggregatedEvents))) {
|
||||
await knex.schema.createTable(TableName.PosthogAggregatedEvents, (t) => {
|
||||
t.uuid("id", { primaryKey: true }).defaultTo(knex.fn.uuid());
|
||||
t.string("distinctId").notNullable();
|
||||
t.string("event").notNullable();
|
||||
t.bigInteger("eventCount").defaultTo(0).notNullable();
|
||||
t.jsonb("properties").notNullable();
|
||||
t.string("batchId").notNullable();
|
||||
t.string("organizationId").nullable();
|
||||
t.timestamps(true, true, true);
|
||||
});
|
||||
}
|
||||
|
||||
await createOnUpdateTrigger(knex, TableName.PosthogAggregatedEvents);
|
||||
}
|
||||
|
||||
export async function down(knex: Knex): Promise<void> {
|
||||
await knex.schema.dropTableIfExists(TableName.PosthogAggregatedEvents);
|
||||
await dropOnUpdateTrigger(knex, TableName.PosthogAggregatedEvents);
|
||||
}
|
@ -171,7 +171,9 @@ export enum TableName {
|
||||
SecretScanningResource = "secret_scanning_resources",
|
||||
SecretScanningScan = "secret_scanning_scans",
|
||||
SecretScanningFinding = "secret_scanning_findings",
|
||||
SecretScanningConfig = "secret_scanning_configs"
|
||||
SecretScanningConfig = "secret_scanning_configs",
|
||||
// Telemetry
|
||||
PosthogAggregatedEvents = "posthog_aggregated_events"
|
||||
}
|
||||
|
||||
export type TImmutableDBKeys = "id" | "createdAt" | "updatedAt" | "commitId";
|
||||
|
26
backend/src/db/schemas/posthog-aggregated-events.ts
Normal file
26
backend/src/db/schemas/posthog-aggregated-events.ts
Normal file
@ -0,0 +1,26 @@
|
||||
// Code generated by automation script, DO NOT EDIT.
|
||||
// Automated by pulling database and generating zod schema
|
||||
// To update. Just run npm run generate:schema
|
||||
// Written by akhilmhdh.
|
||||
|
||||
import { z } from "zod";
|
||||
|
||||
import { TImmutableDBKeys } from "./models";
|
||||
|
||||
export const PosthogAggregatedEventsSchema = z.object({
|
||||
id: z.string().uuid(),
|
||||
distinctId: z.string(),
|
||||
event: z.string(),
|
||||
eventCount: z.coerce.number().default(0),
|
||||
properties: z.unknown(),
|
||||
batchId: z.string(),
|
||||
organizationId: z.string().nullable().optional(),
|
||||
createdAt: z.date(),
|
||||
updatedAt: z.date()
|
||||
});
|
||||
|
||||
export type TPosthogAggregatedEvents = z.infer<typeof PosthogAggregatedEventsSchema>;
|
||||
export type TPosthogAggregatedEventsInsert = Omit<z.input<typeof PosthogAggregatedEventsSchema>, TImmutableDBKeys>;
|
||||
export type TPosthogAggregatedEventsUpdate = Partial<
|
||||
Omit<z.input<typeof PosthogAggregatedEventsSchema>, TImmutableDBKeys>
|
||||
>;
|
@ -80,6 +80,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignSshKey,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
principals: req.body.principals,
|
||||
@ -171,6 +172,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueSshCreds,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
principals: req.body.principals,
|
||||
|
@ -358,6 +358,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueSshHostUserCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
sshHostId: req.params.sshHostId,
|
||||
hostname: host.hostname,
|
||||
@ -427,6 +428,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueSshHostHostCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
sshHostId: req.params.sshHostId,
|
||||
|
@ -62,7 +62,8 @@ export enum QueueName {
|
||||
SecretRotationV2 = "secret-rotation-v2",
|
||||
FolderTreeCheckpoint = "folder-tree-checkpoint",
|
||||
InvalidateCache = "invalidate-cache",
|
||||
SecretScanningV2 = "secret-scanning-v2"
|
||||
SecretScanningV2 = "secret-scanning-v2",
|
||||
TelemetryAggregatedEvents = "telemetry-aggregated-events"
|
||||
}
|
||||
|
||||
export enum QueueJobs {
|
||||
@ -101,7 +102,8 @@ export enum QueueJobs {
|
||||
SecretScanningV2DiffScan = "secret-scanning-v2-diff-scan",
|
||||
SecretScanningV2SendNotification = "secret-scanning-v2-notification",
|
||||
CaOrderCertificateForSubscriber = "ca-order-certificate-for-subscriber",
|
||||
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal"
|
||||
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal",
|
||||
TelemetryAggregatedEvents = "telemetry-aggregated-events"
|
||||
}
|
||||
|
||||
export type TQueueJobTypes = {
|
||||
@ -292,6 +294,10 @@ export type TQueueJobTypes = {
|
||||
name: QueueJobs.PkiSubscriberDailyAutoRenewal;
|
||||
payload: undefined;
|
||||
};
|
||||
[QueueName.TelemetryAggregatedEvents]: {
|
||||
name: QueueJobs.TelemetryAggregatedEvents;
|
||||
payload: undefined;
|
||||
};
|
||||
};
|
||||
|
||||
const SECRET_SCANNING_JOBS = [
|
||||
|
@ -278,6 +278,7 @@ import { TSmtpService } from "@app/services/smtp/smtp-service";
|
||||
import { invalidateCacheQueueFactory } from "@app/services/super-admin/invalidate-cache-queue";
|
||||
import { superAdminDALFactory } from "@app/services/super-admin/super-admin-dal";
|
||||
import { getServerCfg, superAdminServiceFactory } from "@app/services/super-admin/super-admin-service";
|
||||
import { posthogAggregatedEventsDALFactory } from "@app/services/telemetry/posthog-aggregated-events-dal";
|
||||
import { telemetryDALFactory } from "@app/services/telemetry/telemetry-dal";
|
||||
import { telemetryQueueServiceFactory } from "@app/services/telemetry/telemetry-queue";
|
||||
import { telemetryServiceFactory } from "@app/services/telemetry/telemetry-service";
|
||||
@ -398,6 +399,7 @@ export const registerRoutes = async (
|
||||
const auditLogStreamDAL = auditLogStreamDALFactory(db);
|
||||
const trustedIpDAL = trustedIpDALFactory(db);
|
||||
const telemetryDAL = telemetryDALFactory(db);
|
||||
const posthogAggregatedEventsDAL = posthogAggregatedEventsDALFactory(db);
|
||||
const appConnectionDAL = appConnectionDALFactory(db);
|
||||
const secretSyncDAL = secretSyncDALFactory(db, folderDAL);
|
||||
|
||||
@ -681,12 +683,14 @@ export const registerRoutes = async (
|
||||
|
||||
const telemetryService = telemetryServiceFactory({
|
||||
keyStore,
|
||||
licenseService
|
||||
licenseService,
|
||||
posthogAggregatedEventsDAL
|
||||
});
|
||||
const telemetryQueue = telemetryQueueServiceFactory({
|
||||
keyStore,
|
||||
telemetryDAL,
|
||||
queueService
|
||||
queueService,
|
||||
telemetryService
|
||||
});
|
||||
|
||||
const invalidateCacheQueue = invalidateCacheQueueFactory({
|
||||
|
@ -722,6 +722,7 @@ export const registerAdminRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.InvalidateCache,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
...req.auditLogInfo
|
||||
|
@ -692,6 +692,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
caId: ca.id,
|
||||
@ -786,6 +787,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
caId: ca.id,
|
||||
|
@ -266,6 +266,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
caId: req.body.caId,
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
@ -442,6 +443,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
caId: req.body.caId,
|
||||
certificateTemplateId: req.body.certificateTemplateId,
|
||||
|
@ -475,6 +475,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secretCountFromEnv,
|
||||
workspaceId: projectId,
|
||||
@ -979,6 +980,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secretCount,
|
||||
workspaceId: projectId,
|
||||
@ -1144,6 +1146,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secretCountForEnv,
|
||||
workspaceId: projectId,
|
||||
@ -1336,6 +1339,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: projectId,
|
||||
|
@ -85,6 +85,7 @@ export const registerIdentityRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.MachineIdentityCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
orgId: req.body.organizationId,
|
||||
name: identity.name,
|
||||
|
@ -103,6 +103,7 @@ export const registerIntegrationRouter = async (server: FastifyZodProvider) => {
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IntegrationCreated,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
...createIntegrationEventProperty,
|
||||
|
@ -64,6 +64,7 @@ export const registerInviteOrgRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.UserOrgInvitation,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
inviteeEmails: req.body.inviteeEmails,
|
||||
organizationRoleSlug: req.body.organizationRoleSlug,
|
||||
|
@ -331,6 +331,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
||||
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
subscriberId: subscriber.id,
|
||||
@ -399,6 +400,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.IssueCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
subscriberId: subscriber.id,
|
||||
commonName: subscriber.commonName,
|
||||
@ -471,6 +473,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SignCert,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
subscriberId: subscriber.id,
|
||||
commonName: subscriber.commonName,
|
||||
|
@ -165,6 +165,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretRequestDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
secretRequestId: req.params.id,
|
||||
organizationId: req.permission.orgId,
|
||||
@ -256,6 +257,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretRequestCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
secretRequestId: shareRequest.id,
|
||||
organizationId: req.permission.orgId,
|
||||
|
@ -198,6 +198,7 @@ export const registerProjectRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.ProjectCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
orgId: project.orgId,
|
||||
name: project.name,
|
||||
|
@ -339,6 +339,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId,
|
||||
@ -484,6 +485,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
if (getUserAgentType(req.headers["user-agent"]) !== UserAgentType.K8_OPERATOR) {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
organizationId: req.permission.orgId,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
@ -600,6 +602,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -725,6 +728,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -815,6 +819,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -922,6 +927,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.query.workspaceId,
|
||||
@ -1001,6 +1007,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretPulled,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.query.workspaceId,
|
||||
@ -1172,6 +1179,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -1361,6 +1369,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -1484,6 +1493,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: 1,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -1667,6 +1677,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -1793,6 +1804,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -1911,6 +1923,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: req.body.workspaceId,
|
||||
@ -2019,6 +2032,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretCreated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: secrets[0].workspace,
|
||||
@ -2174,6 +2188,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretUpdated,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: secrets[0].workspace,
|
||||
@ -2272,6 +2287,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
await server.services.telemetry.sendPostHogEvents({
|
||||
event: PostHogEventTypes.SecretDeleted,
|
||||
distinctId: getTelemetryDistinctId(req),
|
||||
organizationId: req.permission.orgId,
|
||||
properties: {
|
||||
numberOfSecrets: secrets.length,
|
||||
workspaceId: secrets[0].workspace,
|
||||
|
@ -0,0 +1,53 @@
|
||||
import { Knex } from "knex";
|
||||
|
||||
import { TDbClient } from "@app/db";
|
||||
import { TableName } from "@app/db/schemas";
|
||||
import { TPosthogAggregatedEvents } from "@app/db/schemas/posthog-aggregated-events";
|
||||
import { DatabaseError } from "@app/lib/errors";
|
||||
import { buildFindFilter, ormify, selectAllTableCols } from "@app/lib/knex";
|
||||
|
||||
export type TPosthogAggregatedEventsDALFactory = ReturnType<typeof posthogAggregatedEventsDALFactory>;
|
||||
|
||||
export const posthogAggregatedEventsDALFactory = (db: TDbClient) => {
|
||||
const posthogAggregatedEventsOrm = ormify(db, TableName.PosthogAggregatedEvents);
|
||||
|
||||
const getAggregatedEvent = async (
|
||||
batchId: string,
|
||||
distinctId: string,
|
||||
event: string,
|
||||
tx?: Knex
|
||||
): Promise<TPosthogAggregatedEvents | undefined> => {
|
||||
try {
|
||||
const doc = await (tx || db.replicaNode())(TableName.PosthogAggregatedEvents)
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
.where(buildFindFilter({ batchId, distinctId, event }, TableName.PosthogAggregatedEvents))
|
||||
.select(selectAllTableCols(TableName.PosthogAggregatedEvents))
|
||||
.first();
|
||||
return doc;
|
||||
} catch (error) {
|
||||
throw new DatabaseError({ error, name: "Get last hour aggregated events" });
|
||||
}
|
||||
};
|
||||
|
||||
const getAllAggregatedEvent = async (
|
||||
batchId: string,
|
||||
event: string,
|
||||
tx?: Knex
|
||||
): Promise<TPosthogAggregatedEvents[]> => {
|
||||
try {
|
||||
const docs = await (tx || db.replicaNode())(TableName.PosthogAggregatedEvents)
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
.where(buildFindFilter({ batchId, event }, TableName.PosthogAggregatedEvents))
|
||||
.select(selectAllTableCols(TableName.PosthogAggregatedEvents));
|
||||
return docs;
|
||||
} catch (error) {
|
||||
throw new DatabaseError({ error, name: "Get last hour aggregated events" });
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
...posthogAggregatedEventsOrm,
|
||||
getAggregatedEvent,
|
||||
getAllAggregatedEvent
|
||||
};
|
||||
};
|
@ -7,13 +7,18 @@ import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
|
||||
|
||||
import { getServerCfg } from "../super-admin/super-admin-service";
|
||||
import { TTelemetryDALFactory } from "./telemetry-dal";
|
||||
import { TELEMETRY_SECRET_OPERATIONS_KEY, TELEMETRY_SECRET_PROCESSED_KEY } from "./telemetry-service";
|
||||
import {
|
||||
TELEMETRY_SECRET_OPERATIONS_KEY,
|
||||
TELEMETRY_SECRET_PROCESSED_KEY,
|
||||
TTelemetryServiceFactory
|
||||
} from "./telemetry-service";
|
||||
import { PostHogEventTypes } from "./telemetry-types";
|
||||
|
||||
type TTelemetryQueueServiceFactoryDep = {
|
||||
queueService: TQueueServiceFactory;
|
||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "deleteItem">;
|
||||
telemetryDAL: TTelemetryDALFactory;
|
||||
telemetryService: TTelemetryServiceFactory;
|
||||
};
|
||||
|
||||
export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServiceFactory>;
|
||||
@ -21,7 +26,8 @@ export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServ
|
||||
export const telemetryQueueServiceFactory = ({
|
||||
queueService,
|
||||
keyStore,
|
||||
telemetryDAL
|
||||
telemetryDAL,
|
||||
telemetryService
|
||||
}: TTelemetryQueueServiceFactoryDep) => {
|
||||
const appCfg = getConfig();
|
||||
const postHog =
|
||||
@ -48,6 +54,14 @@ export const telemetryQueueServiceFactory = ({
|
||||
await keyStore.deleteItem(TELEMETRY_SECRET_OPERATIONS_KEY);
|
||||
});
|
||||
|
||||
queueService.start(QueueName.TelemetryAggregatedEvents, async () => {
|
||||
try {
|
||||
await telemetryService.processAggregatedEvents();
|
||||
} catch (error) {
|
||||
logger.error(error, "Failed to process aggregated telemetry events");
|
||||
}
|
||||
});
|
||||
|
||||
// every day at midnight a telemetry job executes on self-hosted instances
|
||||
// this sends some telemetry information like instance id secrets operated etc
|
||||
const startTelemetryCheck = async () => {
|
||||
@ -60,11 +74,26 @@ export const telemetryQueueServiceFactory = ({
|
||||
{ pattern: "0 0 * * *", utc: true },
|
||||
QueueName.TelemetryInstanceStats // just a job id
|
||||
);
|
||||
|
||||
// clear previous hourly job
|
||||
await queueService.stopRepeatableJob(
|
||||
QueueName.TelemetryAggregatedEvents,
|
||||
QueueJobs.TelemetryAggregatedEvents,
|
||||
{ pattern: "0 * * * *", utc: true },
|
||||
QueueName.TelemetryAggregatedEvents // just a job id
|
||||
);
|
||||
|
||||
if (postHog) {
|
||||
await queueService.queue(QueueName.TelemetryInstanceStats, QueueJobs.TelemetryInstanceStats, undefined, {
|
||||
jobId: QueueName.TelemetryInstanceStats,
|
||||
repeat: { pattern: "0 0 * * *", utc: true }
|
||||
});
|
||||
|
||||
// Start hourly aggregated events job (runs every hour at minute 0)
|
||||
await queueService.queue(QueueName.TelemetryAggregatedEvents, QueueJobs.TelemetryAggregatedEvents, undefined, {
|
||||
jobId: QueueName.TelemetryAggregatedEvents,
|
||||
repeat: { pattern: "0 * * * *", utc: true }
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@ -72,6 +101,10 @@ export const telemetryQueueServiceFactory = ({
|
||||
logger.error(err?.failedReason, `${QueueName.TelemetryInstanceStats}: failed`);
|
||||
});
|
||||
|
||||
queueService.listen(QueueName.TelemetryAggregatedEvents, "failed", (err) => {
|
||||
logger.error(err?.failedReason, `${QueueName.TelemetryAggregatedEvents}: failed`);
|
||||
});
|
||||
|
||||
return {
|
||||
startTelemetryCheck
|
||||
};
|
||||
|
@ -7,18 +7,41 @@ import { getConfig } from "@app/lib/config/env";
|
||||
import { request } from "@app/lib/config/request";
|
||||
import { logger } from "@app/lib/logger";
|
||||
|
||||
import { TPosthogAggregatedEventsDALFactory } from "./posthog-aggregated-events-dal";
|
||||
import { PostHogEventTypes, TPostHogEvent, TSecretModifiedEvent } from "./telemetry-types";
|
||||
|
||||
export const TELEMETRY_SECRET_PROCESSED_KEY = "telemetry-secret-processed";
|
||||
export const TELEMETRY_SECRET_OPERATIONS_KEY = "telemetry-secret-operations";
|
||||
|
||||
export const HOURLY_AGGREGATED_EVENTS = [PostHogEventTypes.SecretPulled];
|
||||
|
||||
interface AggregatedEventData {
|
||||
[key: string]: string | number | boolean | undefined | Record<string, number>;
|
||||
}
|
||||
|
||||
export type TTelemetryServiceFactory = ReturnType<typeof telemetryServiceFactory>;
|
||||
export type TTelemetryServiceFactoryDep = {
|
||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "incrementBy">;
|
||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "incrementBy" | "setItem" | "deleteItem" | "setItemWithExpiry">;
|
||||
licenseService: Pick<TLicenseServiceFactory, "getInstanceType">;
|
||||
posthogAggregatedEventsDAL: TPosthogAggregatedEventsDALFactory;
|
||||
};
|
||||
|
||||
export const telemetryServiceFactory = ({ keyStore, licenseService }: TTelemetryServiceFactoryDep) => {
|
||||
const getCurrentHourKey = () => {
|
||||
const now = new Date();
|
||||
return `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, "0")}-${String(now.getDate()).padStart(2, "0")}-${String(now.getHours()).padStart(2, "0")}`;
|
||||
};
|
||||
|
||||
const getPreviousHourKey = () => {
|
||||
const now = new Date();
|
||||
now.setHours(now.getHours());
|
||||
return `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, "0")}-${String(now.getDate()).padStart(2, "0")}-${String(now.getHours()).padStart(2, "0")}`;
|
||||
};
|
||||
|
||||
export const telemetryServiceFactory = ({
|
||||
keyStore,
|
||||
licenseService,
|
||||
posthogAggregatedEventsDAL
|
||||
}: TTelemetryServiceFactoryDep) => {
|
||||
const appCfg = getConfig();
|
||||
|
||||
if (appCfg.isProductionMode && !appCfg.TELEMETRY_ENABLED) {
|
||||
@ -59,16 +82,117 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
||||
}
|
||||
};
|
||||
|
||||
const aggregateEventData = async (event: TPostHogEvent, organizationId?: string) => {
|
||||
const currentHour = getCurrentHourKey();
|
||||
|
||||
try {
|
||||
const lastHourAggregatedEvent = await posthogAggregatedEventsDAL.getAggregatedEvent(
|
||||
currentHour,
|
||||
event.distinctId,
|
||||
event.event
|
||||
);
|
||||
const existingData = lastHourAggregatedEvent?.properties;
|
||||
let hourlyData: AggregatedEventData = {};
|
||||
|
||||
if (existingData) {
|
||||
hourlyData = existingData as AggregatedEventData;
|
||||
}
|
||||
|
||||
hourlyData.count = Number(lastHourAggregatedEvent?.eventCount || 0) + 1;
|
||||
|
||||
if (event.properties) {
|
||||
Object.entries(event.properties).forEach(([key, value]: [string, unknown]) => {
|
||||
if (Array.isArray(value)) {
|
||||
// For arrays, count occurrences of each item
|
||||
const existingCounts =
|
||||
hourlyData[key] && typeof hourlyData[key] === "object" && hourlyData[key]?.constructor === Object
|
||||
? (hourlyData[key] as Record<string, number>)
|
||||
: {};
|
||||
|
||||
value.forEach((item) => {
|
||||
const itemKey = typeof item === "object" ? JSON.stringify(item) : String(item);
|
||||
existingCounts[itemKey] = (existingCounts[itemKey] || 0) + 1;
|
||||
});
|
||||
|
||||
hourlyData[key] = existingCounts;
|
||||
} else if (typeof value === "object" && value?.constructor === Object) {
|
||||
// For objects, count occurrences of each field value
|
||||
const existingCounts =
|
||||
hourlyData[key] && typeof hourlyData[key] === "object" && hourlyData[key]?.constructor === Object
|
||||
? (hourlyData[key] as Record<string, number>)
|
||||
: {};
|
||||
|
||||
if (value) {
|
||||
Object.values(value).forEach((fieldValue) => {
|
||||
const valueKey = typeof fieldValue === "object" ? JSON.stringify(fieldValue) : String(fieldValue);
|
||||
existingCounts[valueKey] = (existingCounts[valueKey] || 0) + 1;
|
||||
});
|
||||
}
|
||||
hourlyData[key] = existingCounts;
|
||||
} else if (typeof value === "number") {
|
||||
// For numbers, add to existing sum
|
||||
hourlyData[key] = ((hourlyData[key] as number) || 0) + value;
|
||||
} else if (value !== undefined && value !== null) {
|
||||
// For other types (strings, booleans, etc.), count occurrences
|
||||
const stringValue = String(value);
|
||||
const existingValue = hourlyData[key];
|
||||
|
||||
if (!existingValue) {
|
||||
hourlyData[key] = { [stringValue]: 1 };
|
||||
} else if (existingValue && typeof existingValue === "object" && existingValue.constructor === Object) {
|
||||
const countObject = existingValue;
|
||||
countObject[stringValue] = (countObject[stringValue] || 0) + 1;
|
||||
} else {
|
||||
const oldValue = String(existingValue);
|
||||
hourlyData[key] = {
|
||||
[oldValue]: 1,
|
||||
[stringValue]: 1
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (lastHourAggregatedEvent) {
|
||||
await posthogAggregatedEventsDAL.updateById(lastHourAggregatedEvent.id, {
|
||||
properties: hourlyData,
|
||||
eventCount: Number(lastHourAggregatedEvent.eventCount) + 1,
|
||||
organizationId: lastHourAggregatedEvent?.organizationId || organizationId
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await posthogAggregatedEventsDAL.create({
|
||||
batchId: currentHour,
|
||||
distinctId: event.distinctId,
|
||||
event: event.event,
|
||||
properties: hourlyData,
|
||||
eventCount: 1,
|
||||
organizationId
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to aggregate event data for ${event.event}`);
|
||||
}
|
||||
};
|
||||
|
||||
const sendPostHogEvents = async (event: TPostHogEvent) => {
|
||||
if (postHog) {
|
||||
const instanceType = licenseService.getInstanceType();
|
||||
// capture posthog only when its cloud or signup event happens in self-hosted
|
||||
if (instanceType === InstanceType.Cloud || event.event === PostHogEventTypes.UserSignedUp) {
|
||||
postHog.capture({
|
||||
event: event.event,
|
||||
distinctId: event.distinctId,
|
||||
properties: event.properties
|
||||
});
|
||||
if (event.organizationId) {
|
||||
postHog.groupIdentify({ groupType: "organization", groupKey: event.organizationId });
|
||||
}
|
||||
if (HOURLY_AGGREGATED_EVENTS.includes(event.event)) {
|
||||
await aggregateEventData(event, event.organizationId);
|
||||
} else {
|
||||
postHog.capture({
|
||||
event: event.event,
|
||||
distinctId: event.distinctId,
|
||||
properties: event.properties,
|
||||
groups: event.organizationId ? { organization: event.organizationId } : undefined
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -89,6 +213,43 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
||||
}
|
||||
};
|
||||
|
||||
const processAggregatedEvents = async () => {
|
||||
if (!postHog) return;
|
||||
const previousHourKey = getPreviousHourKey();
|
||||
|
||||
for (const eventType of HOURLY_AGGREGATED_EVENTS) {
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const events = await posthogAggregatedEventsDAL.getAllAggregatedEvent(previousHourKey, eventType);
|
||||
// eslint-disable-next-line no-continue
|
||||
if (events.length === 0) continue;
|
||||
|
||||
const processedEvents: string[] = [];
|
||||
|
||||
for (const event of events) {
|
||||
if (event.organizationId) {
|
||||
postHog.groupIdentify({ groupType: "organization", groupKey: event.organizationId });
|
||||
}
|
||||
postHog.capture({
|
||||
event: `${eventType} aggregated`,
|
||||
distinctId: event.distinctId,
|
||||
properties: event.properties as Record<string, unknown> | undefined,
|
||||
groups: event.organizationId ? { organization: event.organizationId } : undefined
|
||||
});
|
||||
processedEvents.push(event.id);
|
||||
}
|
||||
|
||||
// Clean up processed data
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await posthogAggregatedEventsDAL.delete({ $in: { id: processedEvents } });
|
||||
|
||||
logger.info(`Processed aggregated events for ${eventType} at hour ${previousHourKey}`);
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to process aggregated events for ${eventType} at hour ${previousHourKey}`);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const flushAll = async () => {
|
||||
if (postHog) {
|
||||
await postHog.shutdownAsync();
|
||||
@ -98,6 +259,7 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
||||
return {
|
||||
sendLoopsEvent,
|
||||
sendPostHogEvents,
|
||||
processAggregatedEvents,
|
||||
flushAll
|
||||
};
|
||||
};
|
||||
|
@ -1,3 +1,13 @@
|
||||
import {
|
||||
IdentityActor,
|
||||
KmipClientActor,
|
||||
PlatformActor,
|
||||
ScimClientActor,
|
||||
ServiceActor,
|
||||
UnknownUserActor,
|
||||
UserActor
|
||||
} from "@app/ee/services/audit-log/audit-log-types";
|
||||
|
||||
export enum PostHogEventTypes {
|
||||
SecretPush = "secrets pushed",
|
||||
SecretPulled = "secrets pulled",
|
||||
@ -40,6 +50,14 @@ export type TSecretModifiedEvent = {
|
||||
secretPath: string;
|
||||
channel?: string;
|
||||
userAgent?: string;
|
||||
actor?:
|
||||
| UserActor
|
||||
| IdentityActor
|
||||
| ServiceActor
|
||||
| ScimClientActor
|
||||
| PlatformActor
|
||||
| UnknownUserActor
|
||||
| KmipClientActor;
|
||||
};
|
||||
};
|
||||
|
||||
@ -214,7 +232,7 @@ export type TInvalidateCacheEvent = {
|
||||
};
|
||||
};
|
||||
|
||||
export type TPostHogEvent = { distinctId: string } & (
|
||||
export type TPostHogEvent = { distinctId: string; organizationId?: string } & (
|
||||
| TSecretModifiedEvent
|
||||
| TAdminInitEvent
|
||||
| TUserSignedUpEvent
|
||||
|
@ -11,9 +11,16 @@ class Capturer {
|
||||
}
|
||||
|
||||
capture(item: string) {
|
||||
if (envConfig.ENV === "production" && envConfig.TELEMETRY_CAPTURING_ENABLED === true) {
|
||||
if ((envConfig.ENV === "production" && envConfig.TELEMETRY_CAPTURING_ENABLED === true)) {
|
||||
try {
|
||||
this.api.capture(item);
|
||||
const organizationId = String(localStorage.getItem("orgData.id"));
|
||||
this.api.capture(item, {
|
||||
...(organizationId && {
|
||||
groups: {
|
||||
organization: organizationId
|
||||
}
|
||||
})
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("PostHog", error);
|
||||
}
|
||||
|
Reference in New Issue
Block a user