feat: switched to pull all and selectively replicated strategy and simplified logic

This commit is contained in:
=
2024-05-31 17:36:01 +05:30
parent c67642786f
commit cf690e2e16
16 changed files with 259 additions and 338 deletions

View File

@ -32,30 +32,6 @@ export async function up(knex: Knex): Promise<void> {
});
}
const doesSecretIsReplicatedExist = await knex.schema.hasColumn(TableName.Secret, "isReplicated");
if (await knex.schema.hasTable(TableName.Secret)) {
await knex.schema.alterTable(TableName.Secret, (t) => {
if (!doesSecretIsReplicatedExist) t.boolean("isReplicated");
});
}
const doesSecretVersionIsReplicatedExist = await knex.schema.hasColumn(TableName.SecretVersion, "isReplicated");
if (await knex.schema.hasTable(TableName.SecretVersion)) {
await knex.schema.alterTable(TableName.SecretVersion, (t) => {
if (!doesSecretVersionIsReplicatedExist) t.boolean("isReplicated");
});
}
const doesSecretApprovalRequestSecretIsReplicatedExist = await knex.schema.hasColumn(
TableName.SecretApprovalRequestSecret,
"isReplicated"
);
if (await knex.schema.hasTable(TableName.SecretApprovalRequestSecret)) {
await knex.schema.alterTable(TableName.SecretApprovalRequestSecret, (t) => {
if (!doesSecretApprovalRequestSecretIsReplicatedExist) t.boolean("isReplicated");
});
}
const doesSecretApprovalRequestIsReplicatedExist = await knex.schema.hasColumn(
TableName.SecretApprovalRequest,
"isReplicated"
@ -97,30 +73,6 @@ export async function down(knex: Knex): Promise<void> {
});
}
const doesSecretIsReplicatedExist = await knex.schema.hasColumn(TableName.Secret, "isReplicated");
if (await knex.schema.hasTable(TableName.Secret)) {
await knex.schema.alterTable(TableName.Secret, (t) => {
if (doesSecretIsReplicatedExist) t.dropColumns("isReplicated");
});
}
const doesSecretVersionIsReplicatedExist = await knex.schema.hasColumn(TableName.SecretVersion, "isReplicated");
if (await knex.schema.hasTable(TableName.SecretVersion)) {
await knex.schema.alterTable(TableName.SecretVersion, (t) => {
if (doesSecretVersionIsReplicatedExist) t.dropColumns("isReplicated");
});
}
const doesSecretApprovalRequestSecretIsReplicatedExist = await knex.schema.hasColumn(
TableName.SecretApprovalRequestSecret,
"isReplicated"
);
if (await knex.schema.hasTable(TableName.SecretApprovalRequestSecret)) {
await knex.schema.alterTable(TableName.SecretApprovalRequestSecret, (t) => {
if (doesSecretApprovalRequestSecretIsReplicatedExist) t.dropColumns("isReplicated");
});
}
const doesSecretApprovalRequestIsReplicatedExist = await knex.schema.hasColumn(
TableName.SecretApprovalRequest,
"isReplicated"

View File

@ -31,8 +31,7 @@ export const SecretApprovalRequestsSecretsSchema = z.object({
requestId: z.string().uuid(),
op: z.string(),
secretId: z.string().uuid().nullable().optional(),
secretVersion: z.string().uuid().nullable().optional(),
isReplicated: z.boolean().nullable().optional()
secretVersion: z.string().uuid().nullable().optional()
});
export type TSecretApprovalRequestsSecrets = z.infer<typeof SecretApprovalRequestsSecretsSchema>;

View File

@ -32,8 +32,7 @@ export const SecretVersionsSchema = z.object({
folderId: z.string().uuid(),
userId: z.string().uuid().nullable().optional(),
createdAt: z.date(),
updatedAt: z.date(),
isReplicated: z.boolean().nullable().optional()
updatedAt: z.date()
});
export type TSecretVersions = z.infer<typeof SecretVersionsSchema>;

View File

@ -30,8 +30,7 @@ export const SecretsSchema = z.object({
userId: z.string().uuid().nullable().optional(),
folderId: z.string().uuid(),
createdAt: z.date(),
updatedAt: z.date(),
isReplicated: z.boolean().nullable().optional()
updatedAt: z.date()
});
export type TSecrets = z.infer<typeof SecretsSchema>;

View File

@ -380,8 +380,7 @@ export const secretApprovalRequestServiceFactory = ({
"secretReminderRepeatDays",
"algorithm",
"keyEncoding",
"secretBlindIndex",
"isReplicated"
"secretBlindIndex"
]),
tags: el?.tags.map(({ id }) => id),
version: 1,
@ -426,7 +425,6 @@ export const secretApprovalRequestServiceFactory = ({
"secretKeyTag",
"secretKeyIV",
"metadata",
"isReplicated",
"skipMultilineEncoding",
"secretReminderNote",
"secretReminderRepeatDays",
@ -490,28 +488,8 @@ export const secretApprovalRequestServiceFactory = ({
projectId,
secretPath: folder.path,
environmentSlug: folder.environmentSlug,
folderId: folder.id,
actorId,
actor,
environmentId: folder.envId,
secrets: mergeStatus.secrets.created
.map(({ id, version }) => ({
operation: SecretOperations.Create,
version,
id
}))
.concat(
mergeStatus.secrets.updated.map(({ id, version }) => ({
operation: SecretOperations.Update,
version,
id
})),
mergeStatus.secrets.deleted.map(({ id, version }) => ({
operation: SecretOperations.Delete,
version,
id
}))
)
actor
});
return mergeStatus;
};

View File

@ -0,0 +1 @@
export const MAX_REPLICATION_DEPTH = 5;

View File

@ -1,59 +1,10 @@
import { Knex } from "knex";
import { TDbClient } from "@app/db";
import { SecretType, TableName, TSecretVersions } from "@app/db/schemas";
import { ormify, selectAllTableCols } from "@app/lib/knex";
import { TableName } from "@app/db/schemas";
import { ormify } from "@app/lib/knex";
export type TSecretReplicationDALFactory = ReturnType<typeof secretReplicationDALFactory>;
export const secretReplicationDALFactory = (db: TDbClient) => {
const orm = ormify(db, TableName.SecretVersion);
/**
* Retrieves secret versions based on the specified filter criteria.
*
* @param {Object} filter - The filter criteria for querying secret versions.
* @param {string} filter.folderId - The ID of the folder containing the secrets.
* @param {Array<Object>} filter.secrets - An array of secret objects containing the ID and version of each secret.
* @param {Knex} [tx] - An optional Knex transaction object. If provided, the query will be executed within this transaction.
*
* @returns {Promise<Array<Object>>} A promise that resolves to an array of secret version documents that match the filter criteria.
*/
const findSecretVersions = async (
filter: { folderId: string; secrets: { id: string; version: number }[] },
tx?: Knex
) => {
if (!filter.secrets) return [];
const sqlRawDocs = await (tx || db)(TableName.SecretVersion)
.where({ folderId: filter.folderId })
.andWhere((bd) => {
filter.secrets.forEach((el) => {
void bd.orWhere({
[`${TableName.SecretVersion}.secretId` as "secretId"]: el.id,
[`${TableName.SecretVersion}.version` as "version"]: el.version,
[`${TableName.SecretVersion}.type` as "type"]: SecretType.Shared
});
});
})
.leftJoin<TSecretVersions>(
(tx || db)(TableName.SecretVersion)
.where("isReplicated", true)
.groupBy("secretId")
.max("version")
.select("secretId")
.as("latestVersion"),
`${TableName.SecretVersion}.secretId`,
"latestVersion.secretId"
)
.select(db.ref("max").withSchema("latestVersion").as("latestReplicatedVersion"))
.select(selectAllTableCols(TableName.SecretVersion));
return sqlRawDocs;
};
return {
findSecretVersions,
...orm
};
return orm;
};

View File

@ -1,41 +1,45 @@
import { SecretType, TSecrets } from "@app/db/schemas";
import { TSecretApprovalPolicyServiceFactory } from "@app/ee/services/secret-approval-policy/secret-approval-policy-service";
import { TSecretApprovalRequestDALFactory } from "@app/ee/services/secret-approval-request/secret-approval-request-dal";
import { TSecretApprovalRequestSecretDALFactory } from "@app/ee/services/secret-approval-request/secret-approval-request-secret-dal";
import { TSecretSnapshotServiceFactory } from "@app/ee/services/secret-snapshot/secret-snapshot-service";
import { KeyStorePrefixes, TKeyStoreFactory } from "@app/keystore/keystore";
import { decryptSymmetric128BitHexKeyUTF8 } from "@app/lib/crypto";
import { BadRequestError } from "@app/lib/errors";
import { groupBy } from "@app/lib/fn";
import { groupBy, unique } from "@app/lib/fn";
import { logger } from "@app/lib/logger";
import { alphaNumericNanoId } from "@app/lib/nanoid";
import { QueueName, TQueueServiceFactory } from "@app/queue";
import { ActorType } from "@app/services/auth/auth-type";
import { TProjectBotServiceFactory } from "@app/services/project-bot/project-bot-service";
import { TProjectMembershipDALFactory } from "@app/services/project-membership/project-membership-dal";
import { TSecretDALFactory } from "@app/services/secret/secret-dal";
import { fnSecretBulkInsert, fnSecretBulkUpdate } from "@app/services/secret/secret-fns";
import { TSecretQueueFactory } from "@app/services/secret/secret-queue";
import { SecretOperations, TSyncSecretsDTO } from "@app/services/secret/secret-types";
import { TSecretQueueFactory, uniqueSecretQueueKey } from "@app/services/secret/secret-queue";
import { SecretOperations } from "@app/services/secret/secret-types";
import { TSecretVersionDALFactory } from "@app/services/secret/secret-version-dal";
import { TSecretVersionTagDALFactory } from "@app/services/secret/secret-version-tag-dal";
import { TSecretBlindIndexDALFactory } from "@app/services/secret-blind-index/secret-blind-index-dal";
import { TSecretFolderDALFactory } from "@app/services/secret-folder/secret-folder-dal";
import { ReservedFolders } from "@app/services/secret-folder/secret-folder-types";
import { TSecretImportDALFactory } from "@app/services/secret-import/secret-import-dal";
import { fnSecretsFromImports } from "@app/services/secret-import/secret-import-fns";
import { TSecretTagDALFactory } from "@app/services/secret-tag/secret-tag-dal";
import { TSecretReplicationDALFactory } from "./secret-replication-dal";
import { MAX_REPLICATION_DEPTH } from "./secret-replication-constants";
type TSecretReplicationServiceFactoryDep = {
secretReplicationDAL: TSecretReplicationDALFactory;
secretDAL: Pick<
TSecretDALFactory,
"find" | "findByBlindIndexes" | "insertMany" | "bulkUpdate" | "delete" | "upsertSecretReferences"
"find" | "findByBlindIndexes" | "insertMany" | "bulkUpdate" | "delete" | "upsertSecretReferences" | "transaction"
>;
secretVersionDAL: Pick<TSecretVersionDALFactory, "find" | "insertMany" | "update" | "findLatestVersionMany">;
secretImportDAL: Pick<TSecretImportDALFactory, "find" | "updateById">;
folderDAL: Pick<TSecretFolderDALFactory, "findSecretPathByFolderIds" | "findBySecretPath" | "create" | "findOne">;
secretImportDAL: Pick<TSecretImportDALFactory, "find" | "updateById" | "findByFolderIds">;
folderDAL: Pick<
TSecretFolderDALFactory,
"findSecretPathByFolderIds" | "findBySecretPath" | "create" | "findOne" | "findByManySecretPath"
>;
secretVersionTagDAL: Pick<TSecretVersionTagDALFactory, "find" | "insertMany">;
secretQueueService: Pick<TSecretQueueFactory, "syncSecrets">;
snapshotService: Pick<TSecretSnapshotServiceFactory, "performSnapshot">;
secretQueueService: Pick<TSecretQueueFactory, "syncSecrets" | "replicateSecrets">;
queueService: Pick<TQueueServiceFactory, "start" | "listen" | "queue" | "stopJobById">;
secretApprovalPolicyService: Pick<TSecretApprovalPolicyServiceFactory, "getSecretApprovalPolicy">;
keyStore: Pick<TKeyStoreFactory, "acquireLock" | "setItemWithExpiry" | "getItem">;
@ -47,16 +51,35 @@ type TSecretReplicationServiceFactoryDep = {
TSecretApprovalRequestSecretDALFactory,
"insertMany" | "insertApprovalSecretTags"
>;
projectBotService: Pick<TProjectBotServiceFactory, "getBotKey">;
};
export type TSecretReplicationServiceFactory = ReturnType<typeof secretReplicationServiceFactory>;
const SECRET_IMPORT_SUCCESS_LOCK = 10;
const keystoreReplicationSuccessKey = (jobId: string, secretImportId: string) => `${jobId}-${secretImportId}`;
const getReplicationKeyLockPrefix = (keyName: string) => `REPLICATION_SECRET_${keyName}`;
const getReplicationKeyLockPrefix = (projectId: string, environmentSlug: string, secretPath: string) =>
`REPLICATION_SECRET_${projectId}-${environmentSlug}-${secretPath}`;
export const getReplicationFolderName = (importId: string) => `${ReservedFolders.SecretReplication}${importId}`;
const getDecryptedKeyValue = (key: string, secret: TSecrets) => {
const secretKey = decryptSymmetric128BitHexKeyUTF8({
ciphertext: secret.secretKeyCiphertext,
iv: secret.secretKeyIV,
tag: secret.secretKeyTag,
key
});
const secretValue = decryptSymmetric128BitHexKeyUTF8({
ciphertext: secret.secretValueCiphertext,
iv: secret.secretValueIV,
tag: secret.secretValueTag,
key
});
return { key: secretKey, value: secretValue };
};
export const secretReplicationServiceFactory = ({
secretReplicationDAL,
secretDAL,
queueService,
secretVersionDAL,
@ -69,124 +92,208 @@ export const secretReplicationServiceFactory = ({
secretApprovalRequestSecretDAL,
secretApprovalRequestDAL,
secretQueueService,
projectMembershipDAL
projectMembershipDAL,
projectBotService
}: TSecretReplicationServiceFactoryDep) => {
const getReplicatedSecrets = (
botKey: string,
localSecrets: TSecrets[],
importedSecrets: { secrets: TSecrets[] }[]
) => {
const deDupe = new Set<string>();
const secrets = localSecrets
.filter(({ secretBlindIndex }) => Boolean(secretBlindIndex))
.map((el) => {
const decryptedSecret = getDecryptedKeyValue(botKey, el);
deDupe.add(decryptedSecret.key);
return { ...el, secretKey: decryptedSecret.key, secretValue: decryptedSecret.value };
});
for (let i = importedSecrets.length - 1; i >= 0; i = -1) {
importedSecrets[i].secrets.forEach((el) => {
const decryptedSecret = getDecryptedKeyValue(botKey, el);
if (deDupe.has(decryptedSecret.key) || !el.secretBlindIndex) {
return;
}
deDupe.add(decryptedSecret.key);
secrets.push({ ...el, secretKey: decryptedSecret.key, secretValue: decryptedSecret.value });
});
}
return secrets;
};
// IMPORTANT NOTE BEFORE READING THE FUNCTION
// SOURCE - Where secrets are copied from
// DESTINATION - Where the replicated imports that points to SOURCE from Destination
queueService.start(QueueName.SecretReplication, async (job) => {
logger.info(job.data, "Replication started");
const {
secrets,
folderId,
secretPath,
environmentId,
environmentSlug,
projectId,
actorId,
actor,
pickOnlyImportIds,
_deDupeReplicationQueue: deDupeReplicationQueue,
_deDupeQueue: deDupeQueue
} = job.data; // source import details (this is where the secrets are to be synced from)
_deDupeQueue: deDupeQueue,
_depth: depth = 0
} = job.data;
if (depth > MAX_REPLICATION_DEPTH) return;
// filter for initial filling
let secretImports = await secretImportDAL.find({
const folder = await folderDAL.findBySecretPath(projectId, environmentSlug, secretPath);
if (!folder) return;
// the the replicated imports made to the source. These are the destinations
const destinationSecretImports = await secretImportDAL.find({
importPath: secretPath,
importEnv: environmentId,
isReplication: true
importEnv: folder.envId
});
secretImports = pickOnlyImportIds
? secretImports.filter(({ id }) => pickOnlyImportIds?.includes(id))
: secretImports;
if (!secretImports.length || !secrets.length) return;
// unfiltered secrets to be replicated (will fetch the latest versions in case another queue already processed this request)
const toBeReplicatedSecrets = await secretReplicationDAL.findSecretVersions({ folderId, secrets });
// CASE: normal mode <- link import <- replicated import
const nonReplicatedDestinationImports = destinationSecretImports.filter(({ isReplication }) => !isReplication);
if (nonReplicatedDestinationImports.length) {
// keep calling sync secret for all the imports made
const importedFolderIds = unique(nonReplicatedDestinationImports, (i) => i.folderId).map(
({ folderId }) => folderId
);
const importedFolders = await folderDAL.findSecretPathByFolderIds(projectId, importedFolderIds);
const foldersGroupedById = groupBy(importedFolders.filter(Boolean), (i) => i?.id as string);
await Promise.all(
nonReplicatedDestinationImports
.filter(({ folderId }) => Boolean(foldersGroupedById[folderId][0]?.path as string))
// filter out already synced ones
.filter(
({ folderId }) =>
!deDupeQueue?.[
uniqueSecretQueueKey(
foldersGroupedById[folderId][0]?.environmentSlug as string,
foldersGroupedById[folderId][0]?.path as string
)
]
)
.map(({ folderId }) =>
secretQueueService.replicateSecrets({
projectId,
secretPath: foldersGroupedById[folderId][0]?.path as string,
environmentSlug: foldersGroupedById[folderId][0]?.environmentSlug as string,
actorId,
actor,
_depth: depth + 1,
_deDupeReplicationQueue: deDupeReplicationQueue,
_deDupeQueue: deDupeQueue
})
)
);
}
// case: https://www.notion.so/infisical/Secret-Replication-6907fbe3130c4124976f7cba1b9fc4c7
const replicatedSecrets = toBeReplicatedSecrets.filter(
({ version, latestReplicatedVersion, secretBlindIndex }) =>
secretBlindIndex && (version === 1 || latestReplicatedVersion <= version)
let destinationReplicatedSecretImports = destinationSecretImports.filter(({ isReplication }) =>
Boolean(isReplication)
);
const replicatedSecretsGroupBySecretId = groupBy(replicatedSecrets, (i) => i.secretId);
// this is to filter out personal secrets
const sanitizedSecrets = secrets.filter(({ id }) => Object.hasOwn(replicatedSecretsGroupBySecretId, id));
if (!sanitizedSecrets.length) return;
destinationReplicatedSecretImports = pickOnlyImportIds
? destinationReplicatedSecretImports.filter(({ id }) => pickOnlyImportIds?.includes(id))
: destinationReplicatedSecretImports;
if (!destinationReplicatedSecretImports.length) return;
const botKey = await projectBotService.getBotKey(projectId);
// these are the secrets to be added in replicated folders
const sourceLocalSecrets = await secretDAL.find({ folderId: folder.id, type: SecretType.Shared });
const sourceSecretImports = await secretImportDAL.find({ folderId: folder.id });
const sourceImportedSecrets = await fnSecretsFromImports({
allowedImports: sourceSecretImports,
secretDAL,
folderDAL,
secretImportDAL
});
// secrets that gets replicated across imports
const sourceSecrets = getReplicatedSecrets(botKey, sourceLocalSecrets, sourceImportedSecrets);
const sourceSecretsGroupByBlindIndex = groupBy(sourceSecrets, (i) => i.secretBlindIndex as string);
const lock = await keyStore.acquireLock(
replicatedSecrets.map(({ id }) => getReplicationKeyLockPrefix(id)),
[getReplicationKeyLockPrefix(projectId, environmentSlug, secretPath)],
5000
);
try {
/* eslint-disable no-await-in-loop */
for (const secretImport of secretImports) {
for (const destinationSecretImport of destinationReplicatedSecretImports) {
try {
const hasJobCompleted = await keyStore.getItem(
keystoreReplicationSuccessKey(job.id as string, secretImport.id),
keystoreReplicationSuccessKey(job.id as string, destinationSecretImport.id),
KeyStorePrefixes.SecretReplication
);
if (hasJobCompleted) {
logger.info(
{ jobId: job.id, importId: secretImport.id },
{ jobId: job.id, importId: destinationSecretImport.id },
"Skipping this job as this has been successfully replicated."
);
// eslint-disable-next-line
continue;
}
const [importedFolder] = await folderDAL.findSecretPathByFolderIds(projectId, [secretImport.folderId]);
if (!importedFolder) throw new BadRequestError({ message: "Imported folder not found" });
const [destinationFolder] = await folderDAL.findSecretPathByFolderIds(projectId, [
destinationSecretImport.folderId
]);
if (!destinationFolder) throw new BadRequestError({ message: "Imported folder not found" });
let replicationFolder = await folderDAL.findOne({
parentId: importedFolder.id,
name: getReplicationFolderName(secretImport.id),
let destinationReplicationFolder = await folderDAL.findOne({
parentId: destinationFolder.id,
name: getReplicationFolderName(destinationSecretImport.id),
isReserved: true
});
if (!replicationFolder) {
replicationFolder = await folderDAL.create({
parentId: importedFolder.id,
name: getReplicationFolderName(secretImport.id),
envId: importedFolder.envId,
if (!destinationReplicationFolder) {
destinationReplicationFolder = await folderDAL.create({
parentId: destinationFolder.id,
name: getReplicationFolderName(destinationSecretImport.id),
envId: destinationFolder.envId,
isReserved: true
});
}
const replicationFolderId = replicationFolder.id;
const destinationReplicationFolderId = destinationReplicationFolder.id;
const localSecrets = await secretDAL.find({
$in: { secretBlindIndex: replicatedSecrets.map(({ secretBlindIndex }) => secretBlindIndex) },
folderId: replicationFolderId
const destinationLocalSecretsFromDB = await secretDAL.find({
folderId: destinationReplicationFolderId
});
const destinationLocalSecrets = destinationLocalSecretsFromDB.map((el) => {
const decryptedSecret = getDecryptedKeyValue(botKey, el);
return { ...el, secretKey: decryptedSecret.key, secretValue: decryptedSecret.value };
});
const localSecretsGroupedByBlindIndex = groupBy(localSecrets, (i) => i.secretBlindIndex as string);
const locallyCreatedSecrets = sanitizedSecrets
const destinationLocalSecretsGroupedByBlindIndex = groupBy(
destinationLocalSecrets.filter(({ secretBlindIndex }) => Boolean(secretBlindIndex)),
(i) => i.secretBlindIndex as string
);
const locallyCreatedSecrets = sourceSecrets
.filter(
({ operation, id }) =>
// upsert: irrespective of create or update its a create if not found in dashboard
(operation === SecretOperations.Create || operation === SecretOperations.Update) &&
!localSecretsGroupedByBlindIndex[
replicatedSecretsGroupBySecretId[id][0].secretBlindIndex as string
]?.[0]
({ secretBlindIndex }) => !destinationLocalSecretsGroupedByBlindIndex[secretBlindIndex as string]?.[0]
)
.map((el) => ({ ...el, operation: SecretOperations.Create })); // rewrite update ops to create
const locallyUpdatedSecrets = sanitizedSecrets
const locallyUpdatedSecrets = sourceSecrets
.filter(
({ operation, id }) =>
// upsert: irrespective of create or update its an update if not found in dashboard
(operation === SecretOperations.Create || operation === SecretOperations.Update) &&
localSecretsGroupedByBlindIndex[replicatedSecretsGroupBySecretId[id][0].secretBlindIndex as string]?.[0]
({ secretBlindIndex, secretKey, secretValue }) =>
destinationLocalSecretsGroupedByBlindIndex[secretBlindIndex as string]?.[0] &&
// if key or value changed
(destinationLocalSecretsGroupedByBlindIndex[secretBlindIndex as string]?.[0]?.secretKey !== secretKey ||
destinationLocalSecretsGroupedByBlindIndex[secretBlindIndex as string]?.[0]?.secretValue !==
secretValue)
)
.map((el) => ({ ...el, operation: SecretOperations.Update })); // rewrite create ops to update
.map((el) => ({ ...el, operation: SecretOperations.Update })); // rewrite update ops to create
const locallyDeletedSecrets = sanitizedSecrets.filter(
({ operation, id }) =>
operation === SecretOperations.Delete &&
Boolean(replicatedSecretsGroupBySecretId[id]?.[0]?.secretBlindIndex) &&
localSecretsGroupedByBlindIndex[replicatedSecretsGroupBySecretId[id][0].secretBlindIndex as string]?.[0]
);
const locallyDeletedSecrets = destinationLocalSecrets
.filter(({ secretBlindIndex }) => !sourceSecretsGroupByBlindIndex[secretBlindIndex as string]?.[0])
.map((el) => ({ ...el, operation: SecretOperations.Delete }));
const isEmtpy =
locallyCreatedSecrets.length + locallyUpdatedSecrets.length + locallyDeletedSecrets.length === 0;
// eslint-disable-next-line
if (isEmtpy) continue;
const policy = await secretApprovalPolicyService.getSecretApprovalPolicy(
projectId,
importedFolder.environmentSlug,
importedFolder.path
destinationFolder.environmentSlug,
destinationFolder.path
);
// this means it should be a approval request rather than direct replication
if (policy && actor === ActorType.USER) {
@ -196,15 +303,15 @@ export const secretReplicationServiceFactory = ({
return;
}
const localSecretsLatestVersions = localSecrets.map(({ id }) => id);
const localSecretsLatestVersions = destinationLocalSecrets.map(({ id }) => id);
const latestSecretVersions = await secretVersionDAL.findLatestVersionMany(
replicationFolderId,
destinationReplicationFolderId,
localSecretsLatestVersions
);
await secretApprovalRequestDAL.transaction(async (tx) => {
const approvalRequestDoc = await secretApprovalRequestDAL.create(
{
folderId: replicationFolderId,
folderId: destinationReplicationFolderId,
slug: alphaNumericNanoId(),
policyId: policy.id,
status: "open",
@ -217,9 +324,9 @@ export const secretReplicationServiceFactory = ({
const commits = locallyCreatedSecrets
.concat(locallyUpdatedSecrets)
.concat(locallyDeletedSecrets)
.map(({ id, operation }) => {
const doc = replicatedSecretsGroupBySecretId[id][0];
const localSecret = localSecretsGroupedByBlindIndex[doc.secretBlindIndex as string]?.[0];
.map((doc) => {
const { operation } = doc;
const localSecret = destinationLocalSecretsGroupedByBlindIndex[doc.secretBlindIndex as string]?.[0];
return {
op: operation,
@ -237,7 +344,6 @@ export const secretReplicationServiceFactory = ({
secretCommentIV: doc.secretCommentIV,
secretCommentTag: doc.secretCommentTag,
secretCommentCiphertext: doc.secretCommentCiphertext,
isReplicated: true,
skipMultilineEncoding: doc.skipMultilineEncoding,
// except create operation other two needs the secret id and version id
...(operation !== SecretOperations.Create
@ -250,18 +356,16 @@ export const secretReplicationServiceFactory = ({
return { ...approvalRequestDoc, commits: approvalCommits };
});
} else {
let nestedImportSecrets: TSyncSecretsDTO["secrets"] = [];
await secretReplicationDAL.transaction(async (tx) => {
await secretDAL.transaction(async (tx) => {
if (locallyCreatedSecrets.length) {
const newSecrets = await fnSecretBulkInsert({
folderId: replicationFolderId,
await fnSecretBulkInsert({
folderId: destinationReplicationFolderId,
secretVersionDAL,
secretDAL,
tx,
secretTagDAL,
secretVersionTagDAL,
inputSecrets: locallyCreatedSecrets.map(({ id }) => {
const doc = replicatedSecretsGroupBySecretId[id][0];
inputSecrets: locallyCreatedSecrets.map((doc) => {
return {
keyEncoding: doc.keyEncoding,
algorithm: doc.algorithm,
@ -277,30 +381,25 @@ export const secretReplicationServiceFactory = ({
secretCommentIV: doc.secretCommentIV,
secretCommentTag: doc.secretCommentTag,
secretCommentCiphertext: doc.secretCommentCiphertext,
isReplicated: true,
skipMultilineEncoding: doc.skipMultilineEncoding
};
})
});
nestedImportSecrets = nestedImportSecrets.concat(
newSecrets.map(({ id, version }) => ({ operation: SecretOperations.Create, version, id }))
);
}
if (locallyUpdatedSecrets.length) {
const newSecrets = await fnSecretBulkUpdate({
await fnSecretBulkUpdate({
projectId,
folderId: replicationFolderId,
folderId: destinationReplicationFolderId,
secretVersionDAL,
secretDAL,
tx,
secretTagDAL,
secretVersionTagDAL,
inputSecrets: locallyUpdatedSecrets.map(({ id }) => {
const doc = replicatedSecretsGroupBySecretId[id][0];
inputSecrets: locallyUpdatedSecrets.map((doc) => {
return {
filter: {
folderId: replicationFolderId,
id: localSecretsGroupedByBlindIndex[doc.secretBlindIndex as string][0].id
folderId: destinationReplicationFolderId,
id: destinationLocalSecretsGroupedByBlindIndex[doc.secretBlindIndex as string][0].id
},
data: {
keyEncoding: doc.keyEncoding,
@ -317,56 +416,46 @@ export const secretReplicationServiceFactory = ({
secretCommentIV: doc.secretCommentIV,
secretCommentTag: doc.secretCommentTag,
secretCommentCiphertext: doc.secretCommentCiphertext,
isReplicated: true,
skipMultilineEncoding: doc.skipMultilineEncoding
}
};
})
});
nestedImportSecrets = nestedImportSecrets.concat(
newSecrets.map(({ id, version }) => ({ operation: SecretOperations.Update, version, id }))
);
}
if (locallyDeletedSecrets.length) {
const newSecrets = await secretDAL.delete(
await secretDAL.delete(
{
$in: {
id: locallyDeletedSecrets.map(({ id }) => id)
},
isReplicated: true,
folderId: replicationFolderId
folderId: destinationReplicationFolderId
},
tx
);
nestedImportSecrets = nestedImportSecrets.concat(
newSecrets.map(({ id, version }) => ({ operation: SecretOperations.Delete, version, id }))
);
}
});
await secretQueueService.syncSecrets({
projectId,
secretPath: importedFolder.path,
_deDupeReplicationQueue: deDupeReplicationQueue,
_deDupeQueue: deDupeQueue,
environmentSlug: importedFolder.environmentSlug,
secretPath: destinationFolder.path,
environmentSlug: destinationFolder.environmentSlug,
actorId,
actor,
secrets: nestedImportSecrets,
folderId: importedFolder.id,
environmentId: importedFolder.envId
_depth: depth + 1,
_deDupeReplicationQueue: deDupeReplicationQueue,
_deDupeQueue: deDupeQueue
});
}
// this is used to avoid multiple times generating secret approval by failed one
await keyStore.setItemWithExpiry(
keystoreReplicationSuccessKey(job.id as string, secretImport.id),
keystoreReplicationSuccessKey(job.id as string, destinationSecretImport.id),
SECRET_IMPORT_SUCCESS_LOCK,
1,
KeyStorePrefixes.SecretReplication
);
await secretImportDAL.updateById(secretImport.id, {
await secretImportDAL.updateById(destinationSecretImport.id, {
lastReplicated: new Date(),
replicationStatus: null,
isReplicationSuccess: true
@ -374,17 +463,15 @@ export const secretReplicationServiceFactory = ({
} catch (err) {
logger.error(
err,
`Failed to replicate secret with import id=[${secretImport.id}] env=[${secretImport.importEnv.slug}] path=[${secretImport.importPath}]`
`Failed to replicate secret with import id=[${destinationSecretImport.id}] env=[${destinationSecretImport.importEnv.slug}] path=[${destinationSecretImport.importPath}]`
);
await secretImportDAL.updateById(secretImport.id, {
await secretImportDAL.updateById(destinationSecretImport.id, {
lastReplicated: new Date(),
replicationStatus: (err as Error)?.message.slice(0, 500),
isReplicationSuccess: false
});
}
}
await secretVersionDAL.update({ $in: { id: replicatedSecrets.map(({ id }) => id) } }, { isReplicated: true });
/* eslint-enable no-await-in-loop */
} finally {
await lock.release();

View File

@ -123,7 +123,7 @@ export type TQueueJobTypes = {
};
[QueueName.SecretReplication]: {
name: QueueJobs.SecretReplication;
payload: Omit<TSyncSecretsDTO, "environmentSlug">;
payload: TSyncSecretsDTO;
};
[QueueName.SecretSync]: {
name: QueueJobs.SecretSync;

View File

@ -44,7 +44,6 @@ import { secretApprovalRequestDALFactory } from "@app/ee/services/secret-approva
import { secretApprovalRequestReviewerDALFactory } from "@app/ee/services/secret-approval-request/secret-approval-request-reviewer-dal";
import { secretApprovalRequestSecretDALFactory } from "@app/ee/services/secret-approval-request/secret-approval-request-secret-dal";
import { secretApprovalRequestServiceFactory } from "@app/ee/services/secret-approval-request/secret-approval-request-service";
import { secretReplicationDALFactory } from "@app/ee/services/secret-replication/secret-replication-dal";
import { secretReplicationServiceFactory } from "@app/ee/services/secret-replication/secret-replication-service";
import { secretRotationDALFactory } from "@app/ee/services/secret-rotation/secret-rotation-dal";
import { secretRotationQueueFactory } from "@app/ee/services/secret-rotation/secret-rotation-queue";
@ -195,7 +194,6 @@ export const registerRoutes = async (
const projectBotDAL = projectBotDALFactory(db);
const secretDAL = secretDALFactory(db);
const secretReplicationDAL = secretReplicationDALFactory(db);
const secretTagDAL = secretTagDALFactory(db);
const folderDAL = secretFolderDALFactory(db);
const folderVersionDAL = secretFolderVersionDALFactory(db);
@ -673,15 +671,14 @@ export const registerRoutes = async (
secretImportDAL,
keyStore,
queueService,
secretReplicationDAL,
folderDAL,
secretApprovalPolicyService,
secretBlindIndexDAL,
secretApprovalRequestDAL,
secretApprovalRequestSecretDAL,
secretQueueService,
snapshotService,
projectMembershipDAL
projectMembershipDAL,
projectBotService
});
const secretRotationQueue = secretRotationQueueFactory({
telemetryService,

View File

@ -13,7 +13,6 @@ import { TProjectDALFactory } from "../project/project-dal";
import { TProjectEnvDALFactory } from "../project-env/project-env-dal";
import { TSecretDALFactory } from "../secret/secret-dal";
import { TSecretQueueFactory } from "../secret/secret-queue";
import { SecretOperations } from "../secret/secret-types";
import { TSecretFolderDALFactory } from "../secret-folder/secret-folder-dal";
import { TSecretImportDALFactory } from "./secret-import-dal";
import { fnSecretsFromImports } from "./secret-import-fns";
@ -139,24 +138,21 @@ export const secretImportServiceFactory = ({
});
if (secImport.isReplication && sourceFolder) {
const importedSecrets = await secretDAL.find({ folderId: sourceFolder?.id });
await secretQueueService.replicateSecrets({
secretPath: secImport.importPath,
projectId,
environmentSlug: importEnv.slug,
pickOnlyImportIds: [secImport.id],
folderId: sourceFolder.id,
secrets: importedSecrets.map(({ id, version }) => ({ operation: SecretOperations.Create, version, id })),
actorId,
actor,
environmentId: importEnv.id
actor
});
} else {
await secretQueueService.syncSecrets({
secretPath: secImport.importPath,
secretPath,
projectId,
environmentSlug: importEnv.slug,
excludeReplication: true
environmentSlug: environment,
actorId,
actor
});
}
@ -307,7 +303,8 @@ export const secretImportServiceFactory = ({
secretPath,
projectId,
environmentSlug: environment,
excludeReplication: true
actor,
actorId
});
return secImport;
@ -372,18 +369,14 @@ export const secretImportServiceFactory = ({
secretImportDoc.importPath
);
const importedSecrets = await secretDAL.find({ folderId: sourceFolder?.id });
if (membership && sourceFolder) {
await secretQueueService.replicateSecrets({
secretPath: secretImportDoc.importPath,
projectId,
environmentSlug: secretImportDoc.importEnv.slug,
pickOnlyImportIds: [secretImportDoc.id],
folderId: sourceFolder.id,
secrets: importedSecrets.map(({ id, version }) => ({ operation: SecretOperations.Create, version, id })),
actorId,
actor,
environmentId: secretImportDoc.importEnv.id
actor
});
}

View File

@ -75,7 +75,7 @@ export const secretDALFactory = (db: TDbClient) => {
};
const deleteMany = async (
data: Array<{ blindIndex: string; type: SecretType; isReplicated?: boolean }>,
data: Array<{ blindIndex: string; type: SecretType }>,
folderId: string,
userId: string,
tx?: Knex

View File

@ -536,7 +536,7 @@ export const fnSecretBulkInsert = async ({
}))
);
const secretVersions = await secretVersionDAL.insertMany(
inputSecrets.map(({ tags, references, isReplicated, ...el }) => ({
inputSecrets.map(({ tags, references, ...el }) => ({
...el,
folderId,
secretId: newSecretGroupByBlindIndex[el.secretBlindIndex as string][0].id

View File

@ -64,7 +64,7 @@ export type TGetSecrets = {
};
const MAX_SYNC_SECRET_DEPTH = 5;
const uniqueSecretQueueKey = (environment: string, secretPath: string) =>
export const uniqueSecretQueueKey = (environment: string, secretPath: string) =>
`secret-queue-dedupe-${environment}-${secretPath}`;
type TIntegrationSecret = Record<string, { value: string; comment?: string; skipMultilineEncoding?: boolean }>;
@ -325,6 +325,7 @@ export const secretQueueFactory = ({
const syncSecrets = async <T extends boolean = false>({
// seperate de-dupe queue for integration sync and replication sync
_deDupeQueue: deDupeQueue = {},
_depth: depth = 0,
_deDupeReplicationQueue: deDupeReplicationQueue = {},
...dto
}: TSyncSecretsDTO<T>) => {
@ -332,7 +333,11 @@ export const secretQueueFactory = ({
`syncSecrets: syncing project secrets where [projectId=${dto.projectId}] [environment=${dto.environmentSlug}] [path=${dto.secretPath}]`
);
const deDuplicationKey = uniqueSecretQueueKey(dto.environmentSlug, dto.secretPath);
if (!dto.excludeReplication ? deDupeReplicationQueue?.[deDuplicationKey] : deDupeQueue?.[deDuplicationKey]) {
if (
!dto.excludeReplication
? deDupeReplicationQueue?.[deDuplicationKey]
: deDupeQueue?.[deDuplicationKey] || depth > MAX_SYNC_SECRET_DEPTH
) {
return;
}
// eslint-disable-next-line
@ -342,7 +347,12 @@ export const secretQueueFactory = ({
await queueService.queue(
QueueName.SecretSync,
QueueJobs.SecretSync,
{ ...dto, _deDupeQueue: deDupeQueue, _deDupeReplicationQueue: deDupeReplicationQueue } as TSyncSecretsDTO,
{
...dto,
_deDupeQueue: deDupeQueue,
_deDupeReplicationQueue: deDupeReplicationQueue,
_depth: depth
} as TSyncSecretsDTO,
{
removeOnFail: true,
removeOnComplete: true,
@ -360,22 +370,21 @@ export const secretQueueFactory = ({
const {
_deDupeQueue: deDupeQueue,
_deDupeReplicationQueue: deDupeReplicationQueue,
_depth: depth,
secretPath,
environmentId,
projectId,
environmentSlug: environment,
secrets,
folderId,
excludeReplication,
actorId,
actor
} = job.data;
await queueService.queue(
QueueName.SecretWebhook,
QueueJobs.SecWebhook,
{ environment, projectId, secretPath },
{
jobId: `secret-webhook-${environmentId}-${projectId}-${secretPath}`,
jobId: `secret-webhook-${environment}-${projectId}-${secretPath}`,
removeOnFail: { count: 5 },
removeOnComplete: true,
delay: 1000,
@ -390,11 +399,9 @@ export const secretQueueFactory = ({
if (!excludeReplication) {
await replicateSecrets({
_deDupeReplicationQueue: deDupeReplicationQueue,
environmentId,
_depth: depth,
projectId,
secretPath,
folderId,
secrets,
actorId,
actor,
excludeReplication,
@ -405,6 +412,7 @@ export const secretQueueFactory = ({
queueService.start(QueueName.IntegrationSync, async (job) => {
const { environment, projectId, secretPath, depth = 1, deDupeQueue = {} } = job.data;
if (depth > MAX_SYNC_SECRET_DEPTH) return;
const folder = await folderDAL.findBySecretPath(projectId, environment, secretPath);
if (!folder) {
@ -450,6 +458,7 @@ export const secretQueueFactory = ({
secretPath: foldersGroupedById[folderId][0]?.path as string,
environmentSlug: foldersGroupedById[folderId][0]?.environmentSlug as string,
_deDupeQueue: deDupeQueue,
_depth: depth + 1,
excludeReplication: true
})
)
@ -487,6 +496,7 @@ export const secretQueueFactory = ({
secretPath: referencedFoldersGroupedById[folderId][0]?.path as string,
environmentSlug: referencedFoldersGroupedById[folderId][0]?.environmentSlug as string,
_deDupeQueue: deDupeQueue,
_depth: depth + 1,
excludeReplication: true
})
)

View File

@ -44,7 +44,6 @@ import {
} from "./secret-fns";
import { TSecretQueueFactory } from "./secret-queue";
import {
SecretOperations,
TAttachSecretTagsDTO,
TBackFillSecretReferencesDTO,
TCreateBulkSecretDTO,
@ -238,19 +237,10 @@ export const secretServiceFactory = ({
await snapshotService.performSnapshot(folderId);
await secretQueueService.syncSecrets({
secretPath: path,
folderId: folder.id,
actorId,
actor,
projectId,
environmentSlug: folder.environment.slug,
environmentId: folder.envId,
secrets: [
{
operation: SecretOperations.Create,
id: secret[0].id,
version: 1
}
]
environmentSlug: folder.environment.slug
});
return { ...secret[0], environment, workspace: projectId, tags, secretPath: path };
};
@ -362,7 +352,6 @@ export const secretServiceFactory = ({
"secretReminderRepeatDays",
"tags"
]),
isReplicated: false,
secretBlindIndex: newSecretNameBlindIndex || keyName2BlindIndex[secretName],
references: references({
ciphertext: inputSecret.secretValueCiphertext,
@ -385,17 +374,8 @@ export const secretServiceFactory = ({
actor,
actorId,
secretPath: path,
folderId: folder.id,
projectId,
environmentSlug: folder.environment.slug,
environmentId: folder.envId,
secrets: [
{
operation: SecretOperations.Update,
id: updatedSecret[0].id,
version: updatedSecret[0].version
}
]
environmentSlug: folder.environment.slug
});
return { ...updatedSecret[0], workspace: projectId, environment, secretPath: path };
};
@ -469,17 +449,8 @@ export const secretServiceFactory = ({
actor,
actorId,
secretPath: path,
folderId: folder.id,
projectId,
environmentSlug: folder.environment.slug,
environmentId: folder.envId,
secrets: [
{
operation: SecretOperations.Delete,
id: deletedSecret[0].id,
version: deletedSecret[0].version
}
]
environmentSlug: folder.environment.slug
});
// TODO(akhilmhdh-pg): licence check, posthog service and snapshot
return { ...deletedSecret[0], _id: deletedSecret[0].id, workspace: projectId, environment, secretPath: path };
@ -770,11 +741,8 @@ export const secretServiceFactory = ({
actor,
actorId,
secretPath: path,
folderId: folder.id,
projectId,
environmentSlug: folder.environment.slug,
environmentId: folder.envId,
secrets: newSecrets.map(({ id, version }) => ({ id, version, operation: SecretOperations.Create }))
environmentSlug: folder.environment.slug
});
return newSecrets;
@ -851,7 +819,6 @@ export const secretServiceFactory = ({
...el,
folderId,
type: SecretType.Shared,
isReplicated: false,
secretBlindIndex:
newSecretName && newKeyName2BlindIndex[newSecretName]
? newKeyName2BlindIndex[newSecretName]
@ -880,11 +847,8 @@ export const secretServiceFactory = ({
actor,
actorId,
secretPath: path,
folderId: folder.id,
projectId,
environmentSlug: folder.environment.slug,
environmentId: folder.envId,
secrets: secrets.map(({ id, version }) => ({ id, version, operation: SecretOperations.Update }))
environmentSlug: folder.environment.slug
});
return secrets;
@ -953,11 +917,8 @@ export const secretServiceFactory = ({
actor,
actorId,
secretPath: path,
folderId: folder.id,
projectId,
environmentSlug: folder.environment.slug,
environmentId: folder.envId,
secrets: secretsDeleted.map(({ id, version }) => ({ id, version, operation: SecretOperations.Delete }))
environmentSlug: folder.environment.slug
});
return secretsDeleted;

View File

@ -380,6 +380,7 @@ export enum SecretOperations {
export type TSyncSecretsDTO<T extends boolean = false> = {
_deDupeQueue?: Record<string, boolean>;
_deDupeReplicationQueue?: Record<string, boolean>;
_depth?: number;
secretPath: string;
projectId: string;
environmentSlug: string;
@ -388,15 +389,8 @@ export type TSyncSecretsDTO<T extends boolean = false> = {
} & (T extends true
? object
: {
environmentId: string;
folderId: string;
actor: ActorType;
actorId: string;
// used for import creation to trigger replication
pickOnlyImportIds?: string[];
secrets: {
operation: SecretOperations;
id: string;
version: number;
}[];
});