misc: finalized structure

This commit is contained in:
Sheen Capadngan
2024-12-04 22:49:28 +08:00
parent 11c96245a7
commit a750f48922
4 changed files with 100 additions and 93 deletions

View File

@ -10,12 +10,15 @@ export const mockQueue = (): TQueueServiceFactory => {
queue: async (name, jobData) => {
job[name] = jobData;
},
queuePg: async () => {},
initialize: async () => {},
shutdown: async () => undefined,
stopRepeatableJob: async () => true,
start: (name, jobFn) => {
queues[name] = jobFn;
workers[name] = jobFn;
},
startPg: async () => {},
listen: (name, event) => {
events[name] = event;
},

View File

@ -34,8 +34,9 @@ export const auditLogQueueServiceFactory = async ({
licenseService,
auditLogStreamDAL
}: TAuditLogQueueServiceFactoryDep) => {
const appCfg = getConfig();
const pushToLog = async (data: TCreateAuditLogDTO) => {
const appCfg = getConfig();
if (appCfg.USE_PG_QUEUE) {
await queueService.queuePg<QueueName.AuditLog>(QueueJobs.AuditLog, data, {
retryLimit: 10,
@ -51,96 +52,98 @@ export const auditLogQueueServiceFactory = async ({
}
};
await queueService.startPg<QueueName.AuditLog>(
QueueJobs.AuditLog,
async ([job]) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
let { orgId } = job.data;
const MS_IN_DAY = 24 * 60 * 60 * 1000;
let project;
if (appCfg.USE_PG_QUEUE) {
await queueService.startPg<QueueName.AuditLog>(
QueueJobs.AuditLog,
async ([job]) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;
let { orgId } = job.data;
const MS_IN_DAY = 24 * 60 * 60 * 1000;
let project;
if (!orgId) {
// it will never be undefined for both org and project id
// TODO(akhilmhdh): use caching here in dal to avoid db calls
project = await projectDAL.findById(projectId as string);
orgId = project.orgId;
}
if (!orgId) {
// it will never be undefined for both org and project id
// TODO(akhilmhdh): use caching here in dal to avoid db calls
project = await projectDAL.findById(projectId as string);
orgId = project.orgId;
}
const plan = await licenseService.getPlan(orgId);
if (plan.auditLogsRetentionDays === 0) {
// skip inserting if audit log retention is 0 meaning its not supported
return;
}
const plan = await licenseService.getPlan(orgId);
if (plan.auditLogsRetentionDays === 0) {
// skip inserting if audit log retention is 0 meaning its not supported
return;
}
// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;
// For project actions, set TTL to project-level audit log retention config
// This condition ensures that the plan's audit log retention days cannot be bypassed
const ttlInDays =
project?.auditLogsRetentionDays && project.auditLogsRetentionDays < plan.auditLogsRetentionDays
? project.auditLogsRetentionDays
: plan.auditLogsRetentionDays;
const ttl = ttlInDays * MS_IN_DAY;
const ttl = ttlInDays * MS_IN_DAY;
const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});
const auditLog = await auditLogDAL.create({
actor: actor.type,
actorMetadata: actor.metadata,
userAgent,
projectId,
projectName: project?.name,
ipAddress,
orgId,
eventType: event.type,
expiresAt: new Date(Date.now() + ttl),
eventMetadata: event.metadata,
userAgentType
});
const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
infisicalSymmetricDecrypt({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];
const logStreams = orgId ? await auditLogStreamDAL.find({ orgId }) : [];
await Promise.allSettled(
logStreams.map(
async ({
url,
encryptedHeadersTag,
encryptedHeadersIV,
encryptedHeadersKeyEncoding,
encryptedHeadersCiphertext
}) => {
const streamHeaders =
encryptedHeadersIV && encryptedHeadersCiphertext && encryptedHeadersTag
? (JSON.parse(
infisicalSymmetricDecrypt({
keyEncoding: encryptedHeadersKeyEncoding as SecretKeyEncoding,
iv: encryptedHeadersIV,
tag: encryptedHeadersTag,
ciphertext: encryptedHeadersCiphertext
})
) as LogStreamHeaders[])
: [];
const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };
const headers: RawAxiosRequestHeaders = { "Content-Type": "application/json" };
if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
if (streamHeaders.length)
streamHeaders.forEach(({ key, value }) => {
headers[key] = value;
});
return request.post(url, auditLog, {
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
});
return request.post(url, auditLog, {
headers,
// request timeout
timeout: AUDIT_LOG_STREAM_TIMEOUT,
// connection timeout
signal: AbortSignal.timeout(AUDIT_LOG_STREAM_TIMEOUT)
});
}
)
);
},
{
batchSize: 1,
workerCount: 30,
pollingIntervalSeconds: 0.5
}
);
}
)
);
},
{
batchSize: 1,
workerCount: 30,
pollingIntervalSeconds: 0.5
}
);
}
queueService.start(QueueName.AuditLog, async (job) => {
const { actor, event, ipAddress, projectId, userAgent, userAgentType } = job.data;

View File

@ -57,11 +57,7 @@ const run = async () => {
const smtp = smtpServiceFactory(formatSmtpConfig());
const queue = queueServiceFactory(appCfg.REDIS_URL, appCfg.DB_CONNECTION_URI);
if (appCfg.USE_PG_QUEUE) {
logger.info("Initializing PG queue...");
await queue.initialize();
}
await queue.initialize();
const keyStore = keyStoreFactory(appCfg.REDIS_URL);

View File

@ -8,6 +8,7 @@ import {
TScanFullRepoEventPayload,
TScanPushEventPayload
} from "@app/ee/services/secret-scanning/secret-scanning-queue/secret-scanning-queue-types";
import { getConfig } from "@app/lib/config/env";
import { logger } from "@app/lib/logger";
import {
TFailedIntegrationSyncEmailsPayload,
@ -195,8 +196,8 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) =
const pgBoss = new PgBoss({
connectionString: dbConnectionUrl,
archiveCompletedAfterSeconds: 30,
maintenanceIntervalSeconds: 30,
archiveCompletedAfterSeconds: 60,
archiveFailedAfterSeconds: 1000, // we want to keep failed jobs for a longer time so that it can be retried
deleteAfterSeconds: 30
});
@ -208,11 +209,15 @@ export const queueServiceFactory = (redisUrl: string, dbConnectionUrl: string) =
>;
const initialize = async () => {
await pgBoss.start();
const appCfg = getConfig();
if (appCfg.USE_PG_QUEUE) {
logger.info("Initializing PG queue...");
await pgBoss.start();
pgBoss.on("error", (error) => {
logger.error(error, "pg-queue error");
});
pgBoss.on("error", (error) => {
logger.error(error, "pg-queue error");
});
}
};
const start = <T extends QueueName>(