Compare commits

...

4 Commits

Author SHA1 Message Date
a1e6c6f7d5 Merge pull request #2295 from akhilmhdh/feat/replication-test
fix: switched sync integration to have redis lock
2024-08-15 15:18:42 -04:00
=
cc94a3366a feat: made requested changes for integration sync 2024-08-15 23:38:16 +05:30
=
6cab7504fc fix: switched sync integration to have redis lock 2024-08-15 22:04:32 +05:30
fa31f87479 Merge pull request #2292 from Infisical/doc/add-dynamic-secrets-to-api-reference
doc: add dynamic secrets to api references
2024-08-15 15:37:42 +08:00
4 changed files with 178 additions and 114 deletions

View File

@ -5,17 +5,26 @@ import { Redlock, Settings } from "@app/lib/red-lock";
export type TKeyStoreFactory = ReturnType<typeof keyStoreFactory>;
// all the key prefixes used must be set here to avoid conflict
export enum KeyStorePrefixes {
SecretReplication = "secret-replication-import-lock",
KmsProjectDataKeyCreation = "kms-project-data-key-creation-lock",
KmsProjectKeyCreation = "kms-project-key-creation-lock",
WaitUntilReadyKmsProjectDataKeyCreation = "wait-until-ready-kms-project-data-key-creation-",
WaitUntilReadyKmsProjectKeyCreation = "wait-until-ready-kms-project-key-creation-",
KmsOrgKeyCreation = "kms-org-key-creation-lock",
KmsOrgDataKeyCreation = "kms-org-data-key-creation-lock",
WaitUntilReadyKmsOrgKeyCreation = "wait-until-ready-kms-org-key-creation-",
WaitUntilReadyKmsOrgDataKeyCreation = "wait-until-ready-kms-org-data-key-creation-"
}
export const KeyStorePrefixes = {
SecretReplication: "secret-replication-import-lock",
KmsProjectDataKeyCreation: "kms-project-data-key-creation-lock",
KmsProjectKeyCreation: "kms-project-key-creation-lock",
WaitUntilReadyKmsProjectDataKeyCreation: "wait-until-ready-kms-project-data-key-creation-",
WaitUntilReadyKmsProjectKeyCreation: "wait-until-ready-kms-project-key-creation-",
KmsOrgKeyCreation: "kms-org-key-creation-lock",
KmsOrgDataKeyCreation: "kms-org-data-key-creation-lock",
WaitUntilReadyKmsOrgKeyCreation: "wait-until-ready-kms-org-key-creation-",
WaitUntilReadyKmsOrgDataKeyCreation: "wait-until-ready-kms-org-data-key-creation-",
SyncSecretIntegrationLock: (projectId: string, environmentSlug: string, secretPath: string) =>
`sync-integration-mutex-${projectId}-${environmentSlug}-${secretPath}` as const,
SyncSecretIntegrationLastRunTimestamp: (projectId: string, environmentSlug: string, secretPath: string) =>
`sync-integration-last-run-${projectId}-${environmentSlug}-${secretPath}` as const
};
export const KeyStoreTtls = {
SetSyncSecretIntegrationLastRunTimestampInSeconds: 10
};
type TWaitTillReady = {
key: string;
@ -37,10 +46,10 @@ export const keyStoreFactory = (redisUrl: string) => {
const setItemWithExpiry = async (
key: string,
exp: number | string,
expiryInSeconds: number | string,
value: string | number | Buffer,
prefix?: string
) => redis.set(prefix ? `${prefix}:${key}` : key, value, "EX", exp);
) => redis.set(prefix ? `${prefix}:${key}` : key, value, "EX", expiryInSeconds);
const deleteItem = async (key: string) => redis.del(key);

View File

@ -1,2 +1,8 @@
export const getLastMidnightDateISO = (last = 1) =>
`${new Date(new Date().setDate(new Date().getDate() - last)).toISOString().slice(0, 10)}T00:00:00Z`;
export const getTimeDifferenceInSeconds = (lhsTimestamp: string, rhsTimestamp: string) => {
const lhs = new Date(lhsTimestamp);
const rhs = new Date(rhsTimestamp);
return Math.floor((Number(lhs) - Number(rhs)) / 1000);
};

View File

@ -711,6 +711,7 @@ export const registerRoutes = async (
kmsService
});
const secretQueueService = secretQueueFactory({
keyStore,
queueService,
secretDAL,
folderDAL,

View File

@ -6,11 +6,12 @@ import { TSecretApprovalRequestDALFactory } from "@app/ee/services/secret-approv
import { TSecretRotationDALFactory } from "@app/ee/services/secret-rotation/secret-rotation-dal";
import { TSnapshotDALFactory } from "@app/ee/services/secret-snapshot/snapshot-dal";
import { TSnapshotSecretV2DALFactory } from "@app/ee/services/secret-snapshot/snapshot-secret-v2-dal";
import { KeyStorePrefixes, KeyStoreTtls, TKeyStoreFactory } from "@app/keystore/keystore";
import { getConfig } from "@app/lib/config/env";
import { decryptSymmetric128BitHexKeyUTF8 } from "@app/lib/crypto";
import { daysToMillisecond, secondsToMillis } from "@app/lib/dates";
import { BadRequestError } from "@app/lib/errors";
import { groupBy, isSamePath, unique } from "@app/lib/fn";
import { getTimeDifferenceInSeconds, groupBy, isSamePath, unique } from "@app/lib/fn";
import { logger } from "@app/lib/logger";
import { QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
import { TProjectBotDALFactory } from "@app/services/project-bot/project-bot-dal";
@ -79,6 +80,7 @@ type TSecretQueueFactoryDep = {
secretApprovalRequestDAL: Pick<TSecretApprovalRequestDALFactory, "deleteByProjectId">;
snapshotDAL: Pick<TSnapshotDALFactory, "findNSecretV1SnapshotByFolderId" | "deleteSnapshotsAboveLimit">;
snapshotSecretV2BridgeDAL: Pick<TSnapshotSecretV2DALFactory, "insertMany" | "batchInsert">;
keyStore: Pick<TKeyStoreFactory, "acquireLock" | "setItemWithExpiry" | "getItem">;
};
export type TGetSecrets = {
@ -122,7 +124,8 @@ export const secretQueueFactory = ({
secretRotationDAL,
snapshotDAL,
snapshotSecretV2BridgeDAL,
secretApprovalRequestDAL
secretApprovalRequestDAL,
keyStore
}: TSecretQueueFactoryDep) => {
const removeSecretReminder = async (dto: TRemoveSecretReminderDTO) => {
const appCfg = getConfig();
@ -576,7 +579,6 @@ export const secretQueueFactory = ({
)
);
}
const { shouldUseSecretV2Bridge, botKey } = await projectBotService.getBotKey(projectId);
const { decryptor: secretManagerDecryptor } = await kmsService.createCipherPairWithDataKey({
type: KmsDataKey.SecretManager,
@ -641,111 +643,157 @@ export const secretQueueFactory = ({
`getIntegrationSecrets: secret integration sync started [jobId=${job.id}] [jobId=${job.id}] [projectId=${job.data.projectId}] [environment=${job.data.environment}] [secretPath=${job.data.secretPath}] [depth=${job.data.depth}]`
);
const secrets = shouldUseSecretV2Bridge
? await getIntegrationSecretsV2({
environment,
projectId,
folderId: folder.id,
depth: 1,
decryptor: (value) => (value ? secretManagerDecryptor({ cipherTextBlob: value }).toString() : "")
})
: await getIntegrationSecrets({
environment,
projectId,
folderId: folder.id,
key: botKey as string,
depth: 1
});
for (const integration of toBeSyncedIntegrations) {
const integrationAuth = {
...integration.integrationAuth,
createdAt: new Date(),
updatedAt: new Date(),
projectId: integration.projectId
};
const { accessToken, accessId } = await integrationAuthService.getIntegrationAccessToken(
integrationAuth,
shouldUseSecretV2Bridge,
botKey
);
let awsAssumeRoleArn = null;
if (shouldUseSecretV2Bridge) {
if (integrationAuth.encryptedAwsAssumeIamRoleArn) {
awsAssumeRoleArn = secretManagerDecryptor({
cipherTextBlob: Buffer.from(integrationAuth.encryptedAwsAssumeIamRoleArn)
}).toString();
}
} else if (
integrationAuth.awsAssumeIamRoleArnTag &&
integrationAuth.awsAssumeIamRoleArnIV &&
integrationAuth.awsAssumeIamRoleArnCipherText
) {
awsAssumeRoleArn = decryptSymmetric128BitHexKeyUTF8({
ciphertext: integrationAuth.awsAssumeIamRoleArnCipherText,
iv: integrationAuth.awsAssumeIamRoleArnIV,
tag: integrationAuth.awsAssumeIamRoleArnTag,
key: botKey as string
});
const lock = await keyStore.acquireLock(
[KeyStorePrefixes.SyncSecretIntegrationLock(projectId, environment, secretPath)],
10000,
{
retryCount: 3,
retryDelay: 2000
}
);
const lockAcquiredTime = new Date();
const suffixedSecrets: typeof secrets = {};
const metadata = integration.metadata as Record<string, string>;
if (metadata) {
Object.keys(secrets).forEach((key) => {
const prefix = metadata?.secretPrefix || "";
const suffix = metadata?.secretSuffix || "";
const newKey = prefix + key + suffix;
suffixedSecrets[newKey] = secrets[key];
});
}
const lastRunSyncIntegrationTimestamp = await keyStore.getItem(
KeyStorePrefixes.SyncSecretIntegrationLastRunTimestamp(projectId, environment, secretPath)
);
try {
// akhilmhdh: this needs to changed later to be more easier to use
// at present this is not at all extendable like to add a new parameter for just one integration need to modify multiple places
const response = await syncIntegrationSecrets({
createManySecretsRawFn,
updateManySecretsRawFn,
integrationDAL,
integration,
integrationAuth,
secrets: Object.keys(suffixedSecrets).length !== 0 ? suffixedSecrets : secrets,
accessId: accessId as string,
awsAssumeRoleArn,
accessToken,
projectId,
appendices: {
prefix: metadata?.secretPrefix || "",
suffix: metadata?.secretSuffix || ""
}
});
await integrationDAL.updateById(integration.id, {
lastSyncJobId: job.id,
lastUsed: new Date(),
syncMessage: response?.syncMessage ?? "",
isSynced: response?.isSynced ?? true
});
} catch (err) {
logger.error(
err,
`Secret integration sync error [projectId=${job.data.projectId}] [environment=${job.data.environment}] [secretPath=${job.data.secretPath}]`
// check whether the integration should wait or not
if (lastRunSyncIntegrationTimestamp) {
const INTEGRATION_INTERVAL = 2000;
const isStaleSyncIntegration = new Date(job.timestamp) < new Date(lastRunSyncIntegrationTimestamp);
if (isStaleSyncIntegration) {
logger.info(
`getIntegrationSecrets: secret integration sync stale [jobId=${job.id}] [jobId=${job.id}] [projectId=${job.data.projectId}] [environment=${job.data.environment}] [secretPath=${job.data.secretPath}] [depth=${job.data.depth}]`
);
const message =
(err instanceof AxiosError ? JSON.stringify(err?.response?.data) : (err as Error)?.message) ||
"Unknown error occurred.";
await integrationDAL.updateById(integration.id, {
lastSyncJobId: job.id,
lastUsed: new Date(),
syncMessage: message,
isSynced: false
});
return;
}
const timeDifferenceWithLastIntegration = getTimeDifferenceInSeconds(
lockAcquiredTime.toISOString(),
lastRunSyncIntegrationTimestamp
);
if (timeDifferenceWithLastIntegration < INTEGRATION_INTERVAL && timeDifferenceWithLastIntegration > 0)
await new Promise((resolve) => {
setTimeout(resolve, 2000 - timeDifferenceWithLastIntegration * 1000);
});
}
// akhilmhdh: this try catch is for lock release
try {
const secrets = shouldUseSecretV2Bridge
? await getIntegrationSecretsV2({
environment,
projectId,
folderId: folder.id,
depth: 1,
decryptor: (value) => (value ? secretManagerDecryptor({ cipherTextBlob: value }).toString() : "")
})
: await getIntegrationSecrets({
environment,
projectId,
folderId: folder.id,
key: botKey as string,
depth: 1
});
for (const integration of toBeSyncedIntegrations) {
const integrationAuth = {
...integration.integrationAuth,
createdAt: new Date(),
updatedAt: new Date(),
projectId: integration.projectId
};
const { accessToken, accessId } = await integrationAuthService.getIntegrationAccessToken(
integrationAuth,
shouldUseSecretV2Bridge,
botKey
);
let awsAssumeRoleArn = null;
if (shouldUseSecretV2Bridge) {
if (integrationAuth.encryptedAwsAssumeIamRoleArn) {
awsAssumeRoleArn = secretManagerDecryptor({
cipherTextBlob: Buffer.from(integrationAuth.encryptedAwsAssumeIamRoleArn)
}).toString();
}
} else if (
integrationAuth.awsAssumeIamRoleArnTag &&
integrationAuth.awsAssumeIamRoleArnIV &&
integrationAuth.awsAssumeIamRoleArnCipherText
) {
awsAssumeRoleArn = decryptSymmetric128BitHexKeyUTF8({
ciphertext: integrationAuth.awsAssumeIamRoleArnCipherText,
iv: integrationAuth.awsAssumeIamRoleArnIV,
tag: integrationAuth.awsAssumeIamRoleArnTag,
key: botKey as string
});
}
const suffixedSecrets: typeof secrets = {};
const metadata = integration.metadata as Record<string, string>;
if (metadata) {
Object.keys(secrets).forEach((key) => {
const prefix = metadata?.secretPrefix || "";
const suffix = metadata?.secretSuffix || "";
const newKey = prefix + key + suffix;
suffixedSecrets[newKey] = secrets[key];
});
}
// akhilmhdh: this try catch is for catching integration error and saving it in db
try {
// akhilmhdh: this needs to changed later to be more easier to use
// at present this is not at all extendable like to add a new parameter for just one integration need to modify multiple places
const response = await syncIntegrationSecrets({
createManySecretsRawFn,
updateManySecretsRawFn,
integrationDAL,
integration,
integrationAuth,
secrets: Object.keys(suffixedSecrets).length !== 0 ? suffixedSecrets : secrets,
accessId: accessId as string,
awsAssumeRoleArn,
accessToken,
projectId,
appendices: {
prefix: metadata?.secretPrefix || "",
suffix: metadata?.secretSuffix || ""
}
});
await integrationDAL.updateById(integration.id, {
lastSyncJobId: job.id,
lastUsed: new Date(),
syncMessage: response?.syncMessage ?? "",
isSynced: response?.isSynced ?? true
});
} catch (err) {
logger.error(
err,
`Secret integration sync error [projectId=${job.data.projectId}] [environment=${job.data.environment}] [secretPath=${job.data.secretPath}]`
);
const message =
(err instanceof AxiosError ? JSON.stringify(err?.response?.data) : (err as Error)?.message) ||
"Unknown error occurred.";
await integrationDAL.updateById(integration.id, {
lastSyncJobId: job.id,
lastUsed: new Date(),
syncMessage: message,
isSynced: false
});
}
}
} finally {
await lock.release();
}
await keyStore.setItemWithExpiry(
KeyStorePrefixes.SyncSecretIntegrationLastRunTimestamp(projectId, environment, secretPath),
KeyStoreTtls.SetSyncSecretIntegrationLastRunTimestampInSeconds,
lockAcquiredTime.toISOString()
);
logger.info("Secret integration sync ended: %s", job.id);
});