mirror of
https://github.com/Infisical/infisical.git
synced 2025-07-11 12:11:38 +00:00
Compare commits
2 Commits
create-pol
...
feat/post-
Author | SHA1 | Date | |
---|---|---|---|
6b58ac3bc0 | |||
7c641d6f5c |
10
backend/src/@types/knex.d.ts
vendored
10
backend/src/@types/knex.d.ts
vendored
@ -496,6 +496,11 @@ import {
|
|||||||
TMicrosoftTeamsIntegrationsInsert,
|
TMicrosoftTeamsIntegrationsInsert,
|
||||||
TMicrosoftTeamsIntegrationsUpdate
|
TMicrosoftTeamsIntegrationsUpdate
|
||||||
} from "@app/db/schemas/microsoft-teams-integrations";
|
} from "@app/db/schemas/microsoft-teams-integrations";
|
||||||
|
import {
|
||||||
|
TPosthogAggregatedEvents,
|
||||||
|
TPosthogAggregatedEventsInsert,
|
||||||
|
TPosthogAggregatedEventsUpdate
|
||||||
|
} from "@app/db/schemas/posthog-aggregated-events";
|
||||||
import {
|
import {
|
||||||
TProjectMicrosoftTeamsConfigs,
|
TProjectMicrosoftTeamsConfigs,
|
||||||
TProjectMicrosoftTeamsConfigsInsert,
|
TProjectMicrosoftTeamsConfigsInsert,
|
||||||
@ -1203,5 +1208,10 @@ declare module "knex/types/tables" {
|
|||||||
TSecretScanningConfigsInsert,
|
TSecretScanningConfigsInsert,
|
||||||
TSecretScanningConfigsUpdate
|
TSecretScanningConfigsUpdate
|
||||||
>;
|
>;
|
||||||
|
[TableName.PosthogAggregatedEvents]: KnexOriginal.CompositeTableType<
|
||||||
|
TPosthogAggregatedEvents,
|
||||||
|
TPosthogAggregatedEventsInsert,
|
||||||
|
TPosthogAggregatedEventsUpdate
|
||||||
|
>;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
import { Knex } from "knex";
|
||||||
|
|
||||||
|
import { TableName } from "../schemas";
|
||||||
|
import { createOnUpdateTrigger, dropOnUpdateTrigger } from "../utils";
|
||||||
|
|
||||||
|
export async function up(knex: Knex): Promise<void> {
|
||||||
|
if (!(await knex.schema.hasTable(TableName.PosthogAggregatedEvents))) {
|
||||||
|
await knex.schema.createTable(TableName.PosthogAggregatedEvents, (t) => {
|
||||||
|
t.uuid("id", { primaryKey: true }).defaultTo(knex.fn.uuid());
|
||||||
|
t.string("distinctId").notNullable();
|
||||||
|
t.string("event").notNullable();
|
||||||
|
t.bigInteger("eventCount").defaultTo(0).notNullable();
|
||||||
|
t.jsonb("properties").notNullable();
|
||||||
|
t.string("batchId").notNullable();
|
||||||
|
t.string("organizationId").nullable();
|
||||||
|
t.timestamps(true, true, true);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
await createOnUpdateTrigger(knex, TableName.PosthogAggregatedEvents);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function down(knex: Knex): Promise<void> {
|
||||||
|
await knex.schema.dropTableIfExists(TableName.PosthogAggregatedEvents);
|
||||||
|
await dropOnUpdateTrigger(knex, TableName.PosthogAggregatedEvents);
|
||||||
|
}
|
@ -171,7 +171,9 @@ export enum TableName {
|
|||||||
SecretScanningResource = "secret_scanning_resources",
|
SecretScanningResource = "secret_scanning_resources",
|
||||||
SecretScanningScan = "secret_scanning_scans",
|
SecretScanningScan = "secret_scanning_scans",
|
||||||
SecretScanningFinding = "secret_scanning_findings",
|
SecretScanningFinding = "secret_scanning_findings",
|
||||||
SecretScanningConfig = "secret_scanning_configs"
|
SecretScanningConfig = "secret_scanning_configs",
|
||||||
|
// Telemetry
|
||||||
|
PosthogAggregatedEvents = "posthog_aggregated_events"
|
||||||
}
|
}
|
||||||
|
|
||||||
export type TImmutableDBKeys = "id" | "createdAt" | "updatedAt" | "commitId";
|
export type TImmutableDBKeys = "id" | "createdAt" | "updatedAt" | "commitId";
|
||||||
|
26
backend/src/db/schemas/posthog-aggregated-events.ts
Normal file
26
backend/src/db/schemas/posthog-aggregated-events.ts
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
// Code generated by automation script, DO NOT EDIT.
|
||||||
|
// Automated by pulling database and generating zod schema
|
||||||
|
// To update. Just run npm run generate:schema
|
||||||
|
// Written by akhilmhdh.
|
||||||
|
|
||||||
|
import { z } from "zod";
|
||||||
|
|
||||||
|
import { TImmutableDBKeys } from "./models";
|
||||||
|
|
||||||
|
export const PosthogAggregatedEventsSchema = z.object({
|
||||||
|
id: z.string().uuid(),
|
||||||
|
distinctId: z.string(),
|
||||||
|
event: z.string(),
|
||||||
|
eventCount: z.coerce.number().default(0),
|
||||||
|
properties: z.unknown(),
|
||||||
|
batchId: z.string(),
|
||||||
|
organizationId: z.string().nullable().optional(),
|
||||||
|
createdAt: z.date(),
|
||||||
|
updatedAt: z.date()
|
||||||
|
});
|
||||||
|
|
||||||
|
export type TPosthogAggregatedEvents = z.infer<typeof PosthogAggregatedEventsSchema>;
|
||||||
|
export type TPosthogAggregatedEventsInsert = Omit<z.input<typeof PosthogAggregatedEventsSchema>, TImmutableDBKeys>;
|
||||||
|
export type TPosthogAggregatedEventsUpdate = Partial<
|
||||||
|
Omit<z.input<typeof PosthogAggregatedEventsSchema>, TImmutableDBKeys>
|
||||||
|
>;
|
@ -80,6 +80,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SignSshKey,
|
event: PostHogEventTypes.SignSshKey,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
certificateTemplateId: req.body.certificateTemplateId,
|
certificateTemplateId: req.body.certificateTemplateId,
|
||||||
principals: req.body.principals,
|
principals: req.body.principals,
|
||||||
@ -171,6 +172,7 @@ export const registerSshCertRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueSshCreds,
|
event: PostHogEventTypes.IssueSshCreds,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
certificateTemplateId: req.body.certificateTemplateId,
|
certificateTemplateId: req.body.certificateTemplateId,
|
||||||
principals: req.body.principals,
|
principals: req.body.principals,
|
||||||
|
@ -358,6 +358,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueSshHostUserCert,
|
event: PostHogEventTypes.IssueSshHostUserCert,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
sshHostId: req.params.sshHostId,
|
sshHostId: req.params.sshHostId,
|
||||||
hostname: host.hostname,
|
hostname: host.hostname,
|
||||||
@ -427,6 +428,7 @@ export const registerSshHostRouter = async (server: FastifyZodProvider) => {
|
|||||||
|
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueSshHostHostCert,
|
event: PostHogEventTypes.IssueSshHostHostCert,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
sshHostId: req.params.sshHostId,
|
sshHostId: req.params.sshHostId,
|
||||||
|
@ -62,7 +62,8 @@ export enum QueueName {
|
|||||||
SecretRotationV2 = "secret-rotation-v2",
|
SecretRotationV2 = "secret-rotation-v2",
|
||||||
FolderTreeCheckpoint = "folder-tree-checkpoint",
|
FolderTreeCheckpoint = "folder-tree-checkpoint",
|
||||||
InvalidateCache = "invalidate-cache",
|
InvalidateCache = "invalidate-cache",
|
||||||
SecretScanningV2 = "secret-scanning-v2"
|
SecretScanningV2 = "secret-scanning-v2",
|
||||||
|
TelemetryAggregatedEvents = "telemetry-aggregated-events"
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum QueueJobs {
|
export enum QueueJobs {
|
||||||
@ -101,7 +102,8 @@ export enum QueueJobs {
|
|||||||
SecretScanningV2DiffScan = "secret-scanning-v2-diff-scan",
|
SecretScanningV2DiffScan = "secret-scanning-v2-diff-scan",
|
||||||
SecretScanningV2SendNotification = "secret-scanning-v2-notification",
|
SecretScanningV2SendNotification = "secret-scanning-v2-notification",
|
||||||
CaOrderCertificateForSubscriber = "ca-order-certificate-for-subscriber",
|
CaOrderCertificateForSubscriber = "ca-order-certificate-for-subscriber",
|
||||||
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal"
|
PkiSubscriberDailyAutoRenewal = "pki-subscriber-daily-auto-renewal",
|
||||||
|
TelemetryAggregatedEvents = "telemetry-aggregated-events"
|
||||||
}
|
}
|
||||||
|
|
||||||
export type TQueueJobTypes = {
|
export type TQueueJobTypes = {
|
||||||
@ -292,6 +294,10 @@ export type TQueueJobTypes = {
|
|||||||
name: QueueJobs.PkiSubscriberDailyAutoRenewal;
|
name: QueueJobs.PkiSubscriberDailyAutoRenewal;
|
||||||
payload: undefined;
|
payload: undefined;
|
||||||
};
|
};
|
||||||
|
[QueueName.TelemetryAggregatedEvents]: {
|
||||||
|
name: QueueJobs.TelemetryAggregatedEvents;
|
||||||
|
payload: undefined;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
const SECRET_SCANNING_JOBS = [
|
const SECRET_SCANNING_JOBS = [
|
||||||
|
@ -278,6 +278,7 @@ import { TSmtpService } from "@app/services/smtp/smtp-service";
|
|||||||
import { invalidateCacheQueueFactory } from "@app/services/super-admin/invalidate-cache-queue";
|
import { invalidateCacheQueueFactory } from "@app/services/super-admin/invalidate-cache-queue";
|
||||||
import { superAdminDALFactory } from "@app/services/super-admin/super-admin-dal";
|
import { superAdminDALFactory } from "@app/services/super-admin/super-admin-dal";
|
||||||
import { getServerCfg, superAdminServiceFactory } from "@app/services/super-admin/super-admin-service";
|
import { getServerCfg, superAdminServiceFactory } from "@app/services/super-admin/super-admin-service";
|
||||||
|
import { posthogAggregatedEventsDALFactory } from "@app/services/telemetry/posthog-aggregated-events-dal";
|
||||||
import { telemetryDALFactory } from "@app/services/telemetry/telemetry-dal";
|
import { telemetryDALFactory } from "@app/services/telemetry/telemetry-dal";
|
||||||
import { telemetryQueueServiceFactory } from "@app/services/telemetry/telemetry-queue";
|
import { telemetryQueueServiceFactory } from "@app/services/telemetry/telemetry-queue";
|
||||||
import { telemetryServiceFactory } from "@app/services/telemetry/telemetry-service";
|
import { telemetryServiceFactory } from "@app/services/telemetry/telemetry-service";
|
||||||
@ -398,6 +399,7 @@ export const registerRoutes = async (
|
|||||||
const auditLogStreamDAL = auditLogStreamDALFactory(db);
|
const auditLogStreamDAL = auditLogStreamDALFactory(db);
|
||||||
const trustedIpDAL = trustedIpDALFactory(db);
|
const trustedIpDAL = trustedIpDALFactory(db);
|
||||||
const telemetryDAL = telemetryDALFactory(db);
|
const telemetryDAL = telemetryDALFactory(db);
|
||||||
|
const posthogAggregatedEventsDAL = posthogAggregatedEventsDALFactory(db);
|
||||||
const appConnectionDAL = appConnectionDALFactory(db);
|
const appConnectionDAL = appConnectionDALFactory(db);
|
||||||
const secretSyncDAL = secretSyncDALFactory(db, folderDAL);
|
const secretSyncDAL = secretSyncDALFactory(db, folderDAL);
|
||||||
|
|
||||||
@ -681,12 +683,14 @@ export const registerRoutes = async (
|
|||||||
|
|
||||||
const telemetryService = telemetryServiceFactory({
|
const telemetryService = telemetryServiceFactory({
|
||||||
keyStore,
|
keyStore,
|
||||||
licenseService
|
licenseService,
|
||||||
|
posthogAggregatedEventsDAL
|
||||||
});
|
});
|
||||||
const telemetryQueue = telemetryQueueServiceFactory({
|
const telemetryQueue = telemetryQueueServiceFactory({
|
||||||
keyStore,
|
keyStore,
|
||||||
telemetryDAL,
|
telemetryDAL,
|
||||||
queueService
|
queueService,
|
||||||
|
telemetryService
|
||||||
});
|
});
|
||||||
|
|
||||||
const invalidateCacheQueue = invalidateCacheQueueFactory({
|
const invalidateCacheQueue = invalidateCacheQueueFactory({
|
||||||
|
@ -722,6 +722,7 @@ export const registerAdminRouter = async (server: FastifyZodProvider) => {
|
|||||||
|
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.InvalidateCache,
|
event: PostHogEventTypes.InvalidateCache,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
...req.auditLogInfo
|
...req.auditLogInfo
|
||||||
|
@ -692,6 +692,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
|
|||||||
|
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueCert,
|
event: PostHogEventTypes.IssueCert,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
caId: ca.id,
|
caId: ca.id,
|
||||||
@ -786,6 +787,7 @@ export const registerCaRouter = async (server: FastifyZodProvider) => {
|
|||||||
|
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SignCert,
|
event: PostHogEventTypes.SignCert,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
caId: ca.id,
|
caId: ca.id,
|
||||||
|
@ -266,6 +266,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueCert,
|
event: PostHogEventTypes.IssueCert,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
caId: req.body.caId,
|
caId: req.body.caId,
|
||||||
certificateTemplateId: req.body.certificateTemplateId,
|
certificateTemplateId: req.body.certificateTemplateId,
|
||||||
@ -442,6 +443,7 @@ export const registerCertRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SignCert,
|
event: PostHogEventTypes.SignCert,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
caId: req.body.caId,
|
caId: req.body.caId,
|
||||||
certificateTemplateId: req.body.certificateTemplateId,
|
certificateTemplateId: req.body.certificateTemplateId,
|
||||||
|
@ -475,6 +475,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secretCountFromEnv,
|
numberOfSecrets: secretCountFromEnv,
|
||||||
workspaceId: projectId,
|
workspaceId: projectId,
|
||||||
@ -979,6 +980,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secretCount,
|
numberOfSecrets: secretCount,
|
||||||
workspaceId: projectId,
|
workspaceId: projectId,
|
||||||
@ -1144,6 +1146,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secretCountForEnv,
|
numberOfSecrets: secretCountForEnv,
|
||||||
workspaceId: projectId,
|
workspaceId: projectId,
|
||||||
@ -1336,6 +1339,7 @@ export const registerDashboardRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: projectId,
|
workspaceId: projectId,
|
||||||
|
@ -85,6 +85,7 @@ export const registerIdentityRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.MachineIdentityCreated,
|
event: PostHogEventTypes.MachineIdentityCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
orgId: req.body.organizationId,
|
orgId: req.body.organizationId,
|
||||||
name: identity.name,
|
name: identity.name,
|
||||||
|
@ -103,6 +103,7 @@ export const registerIntegrationRouter = async (server: FastifyZodProvider) => {
|
|||||||
|
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IntegrationCreated,
|
event: PostHogEventTypes.IntegrationCreated,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
...createIntegrationEventProperty,
|
...createIntegrationEventProperty,
|
||||||
|
@ -64,6 +64,7 @@ export const registerInviteOrgRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.UserOrgInvitation,
|
event: PostHogEventTypes.UserOrgInvitation,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
inviteeEmails: req.body.inviteeEmails,
|
inviteeEmails: req.body.inviteeEmails,
|
||||||
organizationRoleSlug: req.body.organizationRoleSlug,
|
organizationRoleSlug: req.body.organizationRoleSlug,
|
||||||
|
@ -331,6 +331,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
|||||||
|
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueCert,
|
event: PostHogEventTypes.IssueCert,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
subscriberId: subscriber.id,
|
subscriberId: subscriber.id,
|
||||||
@ -399,6 +400,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.IssueCert,
|
event: PostHogEventTypes.IssueCert,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
subscriberId: subscriber.id,
|
subscriberId: subscriber.id,
|
||||||
commonName: subscriber.commonName,
|
commonName: subscriber.commonName,
|
||||||
@ -471,6 +473,7 @@ export const registerPkiSubscriberRouter = async (server: FastifyZodProvider) =>
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SignCert,
|
event: PostHogEventTypes.SignCert,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
subscriberId: subscriber.id,
|
subscriberId: subscriber.id,
|
||||||
commonName: subscriber.commonName,
|
commonName: subscriber.commonName,
|
||||||
|
@ -165,6 +165,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretRequestDeleted,
|
event: PostHogEventTypes.SecretRequestDeleted,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
secretRequestId: req.params.id,
|
secretRequestId: req.params.id,
|
||||||
organizationId: req.permission.orgId,
|
organizationId: req.permission.orgId,
|
||||||
@ -256,6 +257,7 @@ export const registerSecretRequestsRouter = async (server: FastifyZodProvider) =
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretRequestCreated,
|
event: PostHogEventTypes.SecretRequestCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
secretRequestId: shareRequest.id,
|
secretRequestId: shareRequest.id,
|
||||||
organizationId: req.permission.orgId,
|
organizationId: req.permission.orgId,
|
||||||
|
@ -198,6 +198,7 @@ export const registerProjectRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.ProjectCreated,
|
event: PostHogEventTypes.ProjectCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
orgId: project.orgId,
|
orgId: project.orgId,
|
||||||
name: project.name,
|
name: project.name,
|
||||||
|
@ -339,6 +339,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
@ -484,6 +485,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
if (getUserAgentType(req.headers["user-agent"]) !== UserAgentType.K8_OPERATOR) {
|
if (getUserAgentType(req.headers["user-agent"]) !== UserAgentType.K8_OPERATOR) {
|
||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
@ -600,6 +602,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretCreated,
|
event: PostHogEventTypes.SecretCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -725,6 +728,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretUpdated,
|
event: PostHogEventTypes.SecretUpdated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -815,6 +819,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretDeleted,
|
event: PostHogEventTypes.SecretDeleted,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -922,6 +927,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: req.query.workspaceId,
|
workspaceId: req.query.workspaceId,
|
||||||
@ -1001,6 +1007,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretPulled,
|
event: PostHogEventTypes.SecretPulled,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.query.workspaceId,
|
workspaceId: req.query.workspaceId,
|
||||||
@ -1172,6 +1179,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretCreated,
|
event: PostHogEventTypes.SecretCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -1361,6 +1369,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretUpdated,
|
event: PostHogEventTypes.SecretUpdated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -1484,6 +1493,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretDeleted,
|
event: PostHogEventTypes.SecretDeleted,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: 1,
|
numberOfSecrets: 1,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -1667,6 +1677,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretCreated,
|
event: PostHogEventTypes.SecretCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -1793,6 +1804,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretUpdated,
|
event: PostHogEventTypes.SecretUpdated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -1911,6 +1923,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretDeleted,
|
event: PostHogEventTypes.SecretDeleted,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: req.body.workspaceId,
|
workspaceId: req.body.workspaceId,
|
||||||
@ -2019,6 +2032,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretCreated,
|
event: PostHogEventTypes.SecretCreated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: secrets[0].workspace,
|
workspaceId: secrets[0].workspace,
|
||||||
@ -2174,6 +2188,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretUpdated,
|
event: PostHogEventTypes.SecretUpdated,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: secrets[0].workspace,
|
workspaceId: secrets[0].workspace,
|
||||||
@ -2272,6 +2287,7 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
|||||||
await server.services.telemetry.sendPostHogEvents({
|
await server.services.telemetry.sendPostHogEvents({
|
||||||
event: PostHogEventTypes.SecretDeleted,
|
event: PostHogEventTypes.SecretDeleted,
|
||||||
distinctId: getTelemetryDistinctId(req),
|
distinctId: getTelemetryDistinctId(req),
|
||||||
|
organizationId: req.permission.orgId,
|
||||||
properties: {
|
properties: {
|
||||||
numberOfSecrets: secrets.length,
|
numberOfSecrets: secrets.length,
|
||||||
workspaceId: secrets[0].workspace,
|
workspaceId: secrets[0].workspace,
|
||||||
|
@ -0,0 +1,53 @@
|
|||||||
|
import { Knex } from "knex";
|
||||||
|
|
||||||
|
import { TDbClient } from "@app/db";
|
||||||
|
import { TableName } from "@app/db/schemas";
|
||||||
|
import { TPosthogAggregatedEvents } from "@app/db/schemas/posthog-aggregated-events";
|
||||||
|
import { DatabaseError } from "@app/lib/errors";
|
||||||
|
import { buildFindFilter, ormify, selectAllTableCols } from "@app/lib/knex";
|
||||||
|
|
||||||
|
export type TPosthogAggregatedEventsDALFactory = ReturnType<typeof posthogAggregatedEventsDALFactory>;
|
||||||
|
|
||||||
|
export const posthogAggregatedEventsDALFactory = (db: TDbClient) => {
|
||||||
|
const posthogAggregatedEventsOrm = ormify(db, TableName.PosthogAggregatedEvents);
|
||||||
|
|
||||||
|
const getAggregatedEvent = async (
|
||||||
|
batchId: string,
|
||||||
|
distinctId: string,
|
||||||
|
event: string,
|
||||||
|
tx?: Knex
|
||||||
|
): Promise<TPosthogAggregatedEvents | undefined> => {
|
||||||
|
try {
|
||||||
|
const doc = await (tx || db.replicaNode())(TableName.PosthogAggregatedEvents)
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||||
|
.where(buildFindFilter({ batchId, distinctId, event }, TableName.PosthogAggregatedEvents))
|
||||||
|
.select(selectAllTableCols(TableName.PosthogAggregatedEvents))
|
||||||
|
.first();
|
||||||
|
return doc;
|
||||||
|
} catch (error) {
|
||||||
|
throw new DatabaseError({ error, name: "Get last hour aggregated events" });
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const getAllAggregatedEvent = async (
|
||||||
|
batchId: string,
|
||||||
|
event: string,
|
||||||
|
tx?: Knex
|
||||||
|
): Promise<TPosthogAggregatedEvents[]> => {
|
||||||
|
try {
|
||||||
|
const docs = await (tx || db.replicaNode())(TableName.PosthogAggregatedEvents)
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||||
|
.where(buildFindFilter({ batchId, event }, TableName.PosthogAggregatedEvents))
|
||||||
|
.select(selectAllTableCols(TableName.PosthogAggregatedEvents));
|
||||||
|
return docs;
|
||||||
|
} catch (error) {
|
||||||
|
throw new DatabaseError({ error, name: "Get last hour aggregated events" });
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
...posthogAggregatedEventsOrm,
|
||||||
|
getAggregatedEvent,
|
||||||
|
getAllAggregatedEvent
|
||||||
|
};
|
||||||
|
};
|
@ -7,13 +7,18 @@ import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
|
|||||||
|
|
||||||
import { getServerCfg } from "../super-admin/super-admin-service";
|
import { getServerCfg } from "../super-admin/super-admin-service";
|
||||||
import { TTelemetryDALFactory } from "./telemetry-dal";
|
import { TTelemetryDALFactory } from "./telemetry-dal";
|
||||||
import { TELEMETRY_SECRET_OPERATIONS_KEY, TELEMETRY_SECRET_PROCESSED_KEY } from "./telemetry-service";
|
import {
|
||||||
|
TELEMETRY_SECRET_OPERATIONS_KEY,
|
||||||
|
TELEMETRY_SECRET_PROCESSED_KEY,
|
||||||
|
TTelemetryServiceFactory
|
||||||
|
} from "./telemetry-service";
|
||||||
import { PostHogEventTypes } from "./telemetry-types";
|
import { PostHogEventTypes } from "./telemetry-types";
|
||||||
|
|
||||||
type TTelemetryQueueServiceFactoryDep = {
|
type TTelemetryQueueServiceFactoryDep = {
|
||||||
queueService: TQueueServiceFactory;
|
queueService: TQueueServiceFactory;
|
||||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "deleteItem">;
|
keyStore: Pick<TKeyStoreFactory, "getItem" | "deleteItem">;
|
||||||
telemetryDAL: TTelemetryDALFactory;
|
telemetryDAL: TTelemetryDALFactory;
|
||||||
|
telemetryService: TTelemetryServiceFactory;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServiceFactory>;
|
export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServiceFactory>;
|
||||||
@ -21,7 +26,8 @@ export type TTelemetryQueueServiceFactory = ReturnType<typeof telemetryQueueServ
|
|||||||
export const telemetryQueueServiceFactory = ({
|
export const telemetryQueueServiceFactory = ({
|
||||||
queueService,
|
queueService,
|
||||||
keyStore,
|
keyStore,
|
||||||
telemetryDAL
|
telemetryDAL,
|
||||||
|
telemetryService
|
||||||
}: TTelemetryQueueServiceFactoryDep) => {
|
}: TTelemetryQueueServiceFactoryDep) => {
|
||||||
const appCfg = getConfig();
|
const appCfg = getConfig();
|
||||||
const postHog =
|
const postHog =
|
||||||
@ -48,6 +54,14 @@ export const telemetryQueueServiceFactory = ({
|
|||||||
await keyStore.deleteItem(TELEMETRY_SECRET_OPERATIONS_KEY);
|
await keyStore.deleteItem(TELEMETRY_SECRET_OPERATIONS_KEY);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
queueService.start(QueueName.TelemetryAggregatedEvents, async () => {
|
||||||
|
try {
|
||||||
|
await telemetryService.processAggregatedEvents();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error, "Failed to process aggregated telemetry events");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// every day at midnight a telemetry job executes on self-hosted instances
|
// every day at midnight a telemetry job executes on self-hosted instances
|
||||||
// this sends some telemetry information like instance id secrets operated etc
|
// this sends some telemetry information like instance id secrets operated etc
|
||||||
const startTelemetryCheck = async () => {
|
const startTelemetryCheck = async () => {
|
||||||
@ -60,11 +74,26 @@ export const telemetryQueueServiceFactory = ({
|
|||||||
{ pattern: "0 0 * * *", utc: true },
|
{ pattern: "0 0 * * *", utc: true },
|
||||||
QueueName.TelemetryInstanceStats // just a job id
|
QueueName.TelemetryInstanceStats // just a job id
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// clear previous hourly job
|
||||||
|
await queueService.stopRepeatableJob(
|
||||||
|
QueueName.TelemetryAggregatedEvents,
|
||||||
|
QueueJobs.TelemetryAggregatedEvents,
|
||||||
|
{ pattern: "0 * * * *", utc: true },
|
||||||
|
QueueName.TelemetryAggregatedEvents // just a job id
|
||||||
|
);
|
||||||
|
|
||||||
if (postHog) {
|
if (postHog) {
|
||||||
await queueService.queue(QueueName.TelemetryInstanceStats, QueueJobs.TelemetryInstanceStats, undefined, {
|
await queueService.queue(QueueName.TelemetryInstanceStats, QueueJobs.TelemetryInstanceStats, undefined, {
|
||||||
jobId: QueueName.TelemetryInstanceStats,
|
jobId: QueueName.TelemetryInstanceStats,
|
||||||
repeat: { pattern: "0 0 * * *", utc: true }
|
repeat: { pattern: "0 0 * * *", utc: true }
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Start hourly aggregated events job (runs every hour at minute 0)
|
||||||
|
await queueService.queue(QueueName.TelemetryAggregatedEvents, QueueJobs.TelemetryAggregatedEvents, undefined, {
|
||||||
|
jobId: QueueName.TelemetryAggregatedEvents,
|
||||||
|
repeat: { pattern: "0 * * * *", utc: true }
|
||||||
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -72,6 +101,10 @@ export const telemetryQueueServiceFactory = ({
|
|||||||
logger.error(err?.failedReason, `${QueueName.TelemetryInstanceStats}: failed`);
|
logger.error(err?.failedReason, `${QueueName.TelemetryInstanceStats}: failed`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
queueService.listen(QueueName.TelemetryAggregatedEvents, "failed", (err) => {
|
||||||
|
logger.error(err?.failedReason, `${QueueName.TelemetryAggregatedEvents}: failed`);
|
||||||
|
});
|
||||||
|
|
||||||
return {
|
return {
|
||||||
startTelemetryCheck
|
startTelemetryCheck
|
||||||
};
|
};
|
||||||
|
@ -7,18 +7,41 @@ import { getConfig } from "@app/lib/config/env";
|
|||||||
import { request } from "@app/lib/config/request";
|
import { request } from "@app/lib/config/request";
|
||||||
import { logger } from "@app/lib/logger";
|
import { logger } from "@app/lib/logger";
|
||||||
|
|
||||||
|
import { TPosthogAggregatedEventsDALFactory } from "./posthog-aggregated-events-dal";
|
||||||
import { PostHogEventTypes, TPostHogEvent, TSecretModifiedEvent } from "./telemetry-types";
|
import { PostHogEventTypes, TPostHogEvent, TSecretModifiedEvent } from "./telemetry-types";
|
||||||
|
|
||||||
export const TELEMETRY_SECRET_PROCESSED_KEY = "telemetry-secret-processed";
|
export const TELEMETRY_SECRET_PROCESSED_KEY = "telemetry-secret-processed";
|
||||||
export const TELEMETRY_SECRET_OPERATIONS_KEY = "telemetry-secret-operations";
|
export const TELEMETRY_SECRET_OPERATIONS_KEY = "telemetry-secret-operations";
|
||||||
|
|
||||||
|
export const HOURLY_AGGREGATED_EVENTS = [PostHogEventTypes.SecretPulled];
|
||||||
|
|
||||||
|
interface AggregatedEventData {
|
||||||
|
[key: string]: string | number | boolean | undefined | Record<string, number>;
|
||||||
|
}
|
||||||
|
|
||||||
export type TTelemetryServiceFactory = ReturnType<typeof telemetryServiceFactory>;
|
export type TTelemetryServiceFactory = ReturnType<typeof telemetryServiceFactory>;
|
||||||
export type TTelemetryServiceFactoryDep = {
|
export type TTelemetryServiceFactoryDep = {
|
||||||
keyStore: Pick<TKeyStoreFactory, "getItem" | "incrementBy">;
|
keyStore: Pick<TKeyStoreFactory, "getItem" | "incrementBy" | "setItem" | "deleteItem" | "setItemWithExpiry">;
|
||||||
licenseService: Pick<TLicenseServiceFactory, "getInstanceType">;
|
licenseService: Pick<TLicenseServiceFactory, "getInstanceType">;
|
||||||
|
posthogAggregatedEventsDAL: TPosthogAggregatedEventsDALFactory;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const telemetryServiceFactory = ({ keyStore, licenseService }: TTelemetryServiceFactoryDep) => {
|
const getCurrentHourKey = () => {
|
||||||
|
const now = new Date();
|
||||||
|
return `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, "0")}-${String(now.getDate()).padStart(2, "0")}-${String(now.getHours()).padStart(2, "0")}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
const getPreviousHourKey = () => {
|
||||||
|
const now = new Date();
|
||||||
|
now.setHours(now.getHours());
|
||||||
|
return `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, "0")}-${String(now.getDate()).padStart(2, "0")}-${String(now.getHours()).padStart(2, "0")}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const telemetryServiceFactory = ({
|
||||||
|
keyStore,
|
||||||
|
licenseService,
|
||||||
|
posthogAggregatedEventsDAL
|
||||||
|
}: TTelemetryServiceFactoryDep) => {
|
||||||
const appCfg = getConfig();
|
const appCfg = getConfig();
|
||||||
|
|
||||||
if (appCfg.isProductionMode && !appCfg.TELEMETRY_ENABLED) {
|
if (appCfg.isProductionMode && !appCfg.TELEMETRY_ENABLED) {
|
||||||
@ -59,16 +82,117 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const aggregateEventData = async (event: TPostHogEvent, organizationId?: string) => {
|
||||||
|
const currentHour = getCurrentHourKey();
|
||||||
|
|
||||||
|
try {
|
||||||
|
const lastHourAggregatedEvent = await posthogAggregatedEventsDAL.getAggregatedEvent(
|
||||||
|
currentHour,
|
||||||
|
event.distinctId,
|
||||||
|
event.event
|
||||||
|
);
|
||||||
|
const existingData = lastHourAggregatedEvent?.properties;
|
||||||
|
let hourlyData: AggregatedEventData = {};
|
||||||
|
|
||||||
|
if (existingData) {
|
||||||
|
hourlyData = existingData as AggregatedEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
hourlyData.count = Number(lastHourAggregatedEvent?.eventCount || 0) + 1;
|
||||||
|
|
||||||
|
if (event.properties) {
|
||||||
|
Object.entries(event.properties).forEach(([key, value]: [string, unknown]) => {
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
// For arrays, count occurrences of each item
|
||||||
|
const existingCounts =
|
||||||
|
hourlyData[key] && typeof hourlyData[key] === "object" && hourlyData[key]?.constructor === Object
|
||||||
|
? (hourlyData[key] as Record<string, number>)
|
||||||
|
: {};
|
||||||
|
|
||||||
|
value.forEach((item) => {
|
||||||
|
const itemKey = typeof item === "object" ? JSON.stringify(item) : String(item);
|
||||||
|
existingCounts[itemKey] = (existingCounts[itemKey] || 0) + 1;
|
||||||
|
});
|
||||||
|
|
||||||
|
hourlyData[key] = existingCounts;
|
||||||
|
} else if (typeof value === "object" && value?.constructor === Object) {
|
||||||
|
// For objects, count occurrences of each field value
|
||||||
|
const existingCounts =
|
||||||
|
hourlyData[key] && typeof hourlyData[key] === "object" && hourlyData[key]?.constructor === Object
|
||||||
|
? (hourlyData[key] as Record<string, number>)
|
||||||
|
: {};
|
||||||
|
|
||||||
|
if (value) {
|
||||||
|
Object.values(value).forEach((fieldValue) => {
|
||||||
|
const valueKey = typeof fieldValue === "object" ? JSON.stringify(fieldValue) : String(fieldValue);
|
||||||
|
existingCounts[valueKey] = (existingCounts[valueKey] || 0) + 1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
hourlyData[key] = existingCounts;
|
||||||
|
} else if (typeof value === "number") {
|
||||||
|
// For numbers, add to existing sum
|
||||||
|
hourlyData[key] = ((hourlyData[key] as number) || 0) + value;
|
||||||
|
} else if (value !== undefined && value !== null) {
|
||||||
|
// For other types (strings, booleans, etc.), count occurrences
|
||||||
|
const stringValue = String(value);
|
||||||
|
const existingValue = hourlyData[key];
|
||||||
|
|
||||||
|
if (!existingValue) {
|
||||||
|
hourlyData[key] = { [stringValue]: 1 };
|
||||||
|
} else if (existingValue && typeof existingValue === "object" && existingValue.constructor === Object) {
|
||||||
|
const countObject = existingValue;
|
||||||
|
countObject[stringValue] = (countObject[stringValue] || 0) + 1;
|
||||||
|
} else {
|
||||||
|
const oldValue = String(existingValue);
|
||||||
|
hourlyData[key] = {
|
||||||
|
[oldValue]: 1,
|
||||||
|
[stringValue]: 1
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastHourAggregatedEvent) {
|
||||||
|
await posthogAggregatedEventsDAL.updateById(lastHourAggregatedEvent.id, {
|
||||||
|
properties: hourlyData,
|
||||||
|
eventCount: Number(lastHourAggregatedEvent.eventCount) + 1,
|
||||||
|
organizationId: lastHourAggregatedEvent?.organizationId || organizationId
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await posthogAggregatedEventsDAL.create({
|
||||||
|
batchId: currentHour,
|
||||||
|
distinctId: event.distinctId,
|
||||||
|
event: event.event,
|
||||||
|
properties: hourlyData,
|
||||||
|
eventCount: 1,
|
||||||
|
organizationId
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error, `Failed to aggregate event data for ${event.event}`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const sendPostHogEvents = async (event: TPostHogEvent) => {
|
const sendPostHogEvents = async (event: TPostHogEvent) => {
|
||||||
if (postHog) {
|
if (postHog) {
|
||||||
const instanceType = licenseService.getInstanceType();
|
const instanceType = licenseService.getInstanceType();
|
||||||
// capture posthog only when its cloud or signup event happens in self-hosted
|
// capture posthog only when its cloud or signup event happens in self-hosted
|
||||||
if (instanceType === InstanceType.Cloud || event.event === PostHogEventTypes.UserSignedUp) {
|
if (instanceType === InstanceType.Cloud || event.event === PostHogEventTypes.UserSignedUp) {
|
||||||
postHog.capture({
|
if (event.organizationId) {
|
||||||
event: event.event,
|
postHog.groupIdentify({ groupType: "organization", groupKey: event.organizationId });
|
||||||
distinctId: event.distinctId,
|
}
|
||||||
properties: event.properties
|
if (HOURLY_AGGREGATED_EVENTS.includes(event.event)) {
|
||||||
});
|
await aggregateEventData(event, event.organizationId);
|
||||||
|
} else {
|
||||||
|
postHog.capture({
|
||||||
|
event: event.event,
|
||||||
|
distinctId: event.distinctId,
|
||||||
|
properties: event.properties,
|
||||||
|
groups: event.organizationId ? { organization: event.organizationId } : undefined
|
||||||
|
});
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -89,6 +213,43 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const processAggregatedEvents = async () => {
|
||||||
|
if (!postHog) return;
|
||||||
|
const previousHourKey = getPreviousHourKey();
|
||||||
|
|
||||||
|
for (const eventType of HOURLY_AGGREGATED_EVENTS) {
|
||||||
|
try {
|
||||||
|
// eslint-disable-next-line no-await-in-loop
|
||||||
|
const events = await posthogAggregatedEventsDAL.getAllAggregatedEvent(previousHourKey, eventType);
|
||||||
|
// eslint-disable-next-line no-continue
|
||||||
|
if (events.length === 0) continue;
|
||||||
|
|
||||||
|
const processedEvents: string[] = [];
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
if (event.organizationId) {
|
||||||
|
postHog.groupIdentify({ groupType: "organization", groupKey: event.organizationId });
|
||||||
|
}
|
||||||
|
postHog.capture({
|
||||||
|
event: `${eventType} aggregated`,
|
||||||
|
distinctId: event.distinctId,
|
||||||
|
properties: event.properties as Record<string, unknown> | undefined,
|
||||||
|
groups: event.organizationId ? { organization: event.organizationId } : undefined
|
||||||
|
});
|
||||||
|
processedEvents.push(event.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up processed data
|
||||||
|
// eslint-disable-next-line no-await-in-loop
|
||||||
|
await posthogAggregatedEventsDAL.delete({ $in: { id: processedEvents } });
|
||||||
|
|
||||||
|
logger.info(`Processed aggregated events for ${eventType} at hour ${previousHourKey}`);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(error, `Failed to process aggregated events for ${eventType} at hour ${previousHourKey}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const flushAll = async () => {
|
const flushAll = async () => {
|
||||||
if (postHog) {
|
if (postHog) {
|
||||||
await postHog.shutdownAsync();
|
await postHog.shutdownAsync();
|
||||||
@ -98,6 +259,7 @@ To opt into telemetry, you can set "TELEMETRY_ENABLED=true" within the environme
|
|||||||
return {
|
return {
|
||||||
sendLoopsEvent,
|
sendLoopsEvent,
|
||||||
sendPostHogEvents,
|
sendPostHogEvents,
|
||||||
|
processAggregatedEvents,
|
||||||
flushAll
|
flushAll
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
@ -1,3 +1,13 @@
|
|||||||
|
import {
|
||||||
|
IdentityActor,
|
||||||
|
KmipClientActor,
|
||||||
|
PlatformActor,
|
||||||
|
ScimClientActor,
|
||||||
|
ServiceActor,
|
||||||
|
UnknownUserActor,
|
||||||
|
UserActor
|
||||||
|
} from "@app/ee/services/audit-log/audit-log-types";
|
||||||
|
|
||||||
export enum PostHogEventTypes {
|
export enum PostHogEventTypes {
|
||||||
SecretPush = "secrets pushed",
|
SecretPush = "secrets pushed",
|
||||||
SecretPulled = "secrets pulled",
|
SecretPulled = "secrets pulled",
|
||||||
@ -40,6 +50,14 @@ export type TSecretModifiedEvent = {
|
|||||||
secretPath: string;
|
secretPath: string;
|
||||||
channel?: string;
|
channel?: string;
|
||||||
userAgent?: string;
|
userAgent?: string;
|
||||||
|
actor?:
|
||||||
|
| UserActor
|
||||||
|
| IdentityActor
|
||||||
|
| ServiceActor
|
||||||
|
| ScimClientActor
|
||||||
|
| PlatformActor
|
||||||
|
| UnknownUserActor
|
||||||
|
| KmipClientActor;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -214,7 +232,7 @@ export type TInvalidateCacheEvent = {
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
export type TPostHogEvent = { distinctId: string } & (
|
export type TPostHogEvent = { distinctId: string; organizationId?: string } & (
|
||||||
| TSecretModifiedEvent
|
| TSecretModifiedEvent
|
||||||
| TAdminInitEvent
|
| TAdminInitEvent
|
||||||
| TUserSignedUpEvent
|
| TUserSignedUpEvent
|
||||||
|
@ -11,9 +11,16 @@ class Capturer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
capture(item: string) {
|
capture(item: string) {
|
||||||
if (envConfig.ENV === "production" && envConfig.TELEMETRY_CAPTURING_ENABLED === true) {
|
if ((envConfig.ENV === "production" && envConfig.TELEMETRY_CAPTURING_ENABLED === true)) {
|
||||||
try {
|
try {
|
||||||
this.api.capture(item);
|
const organizationId = String(localStorage.getItem("orgData.id"));
|
||||||
|
this.api.capture(item, {
|
||||||
|
...(organizationId && {
|
||||||
|
groups: {
|
||||||
|
organization: organizationId
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("PostHog", error);
|
console.error("PostHog", error);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user