Compare commits

...

1 Commits

Author SHA1 Message Date
Scott Wilson
fbb243b2a0 improvement: limit app connection concurrent syncs 2025-09-04 16:52:28 -07:00
5 changed files with 104 additions and 39 deletions

View File

@@ -38,6 +38,7 @@ export const KeyStorePrefixes = {
SyncSecretIntegrationLastRunTimestamp: (projectId: string, environmentSlug: string, secretPath: string) =>
`sync-integration-last-run-${projectId}-${environmentSlug}-${secretPath}` as const,
SecretSyncLock: (syncId: string) => `secret-sync-mutex-${syncId}` as const,
AppConnectionConcurrentJobs: (connectionId: string) => `app-connection-concurrency-${connectionId}` as const,
SecretRotationLock: (rotationId: string) => `secret-rotation-v2-mutex-${rotationId}` as const,
SecretScanningLock: (dataSourceId: string, resourceExternalId: string) =>
`secret-scanning-v2-mutex-${dataSourceId}-${resourceExternalId}` as const,

View File

@@ -12,7 +12,7 @@ type TAWSParameterStoreRecord = Record<string, AWS.SSM.Parameter>;
type TAWSParameterStoreMetadataRecord = Record<string, AWS.SSM.ParameterMetadata>;
type TAWSParameterStoreTagsRecord = Record<string, Record<string, string>>;
const MAX_RETRIES = 5;
const MAX_RETRIES = 10;
const BATCH_SIZE = 10;
const getSSM = async (secretSync: TAwsParameterStoreSyncWithCredentials) => {

View File

@@ -38,7 +38,7 @@ type TAwsSecretsRecord = Record<string, SecretListEntry>;
type TAwsSecretValuesRecord = Record<string, SecretValueEntry>;
type TAwsSecretDescriptionsRecord = Record<string, DescribeSecretResponse>;
const MAX_RETRIES = 5;
const MAX_RETRIES = 10;
const BATCH_SIZE = 20;
const getSecretsManagerClient = async (secretSync: TAwsSecretsManagerSyncWithCredentials) => {

View File

@@ -81,7 +81,7 @@ type TSecretSyncQueueFactoryDep = {
| "invalidateSecretCacheByProjectId"
>;
secretImportDAL: Pick<TSecretImportDALFactory, "find" | "findByFolderIds">;
secretSyncDAL: Pick<TSecretSyncDALFactory, "findById" | "find" | "updateById" | "deleteById">;
secretSyncDAL: Pick<TSecretSyncDALFactory, "findById" | "find" | "updateById" | "deleteById" | "update">;
auditLogService: Pick<TAuditLogServiceFactory, "createAuditLog">;
projectMembershipDAL: Pick<TProjectMembershipDALFactory, "findAllProjectMembers">;
projectDAL: TProjectDALFactory;
@@ -104,17 +104,15 @@ type SecretSyncActionJob = Job<
TQueueSecretSyncSyncSecretsByIdDTO | TQueueSecretSyncImportSecretsByIdDTO | TQueueSecretSyncRemoveSecretsByIdDTO
>;
const JITTER_MS = 10 * 1000;
const REQUEUE_MS = 30 * 1000;
const REQUEUE_LIMIT = 30;
const CONNECTION_CONCURRENCY_LIMIT = 3;
const getRequeueDelay = (failureCount?: number) => {
if (!failureCount) return 0;
const baseDelay = 1000;
const maxDelay = 30000;
const delay = Math.min(baseDelay * 2 ** failureCount, maxDelay);
const jitter = delay * (0.5 + Math.random() * 0.5);
return jitter;
const jitter = Math.random() * JITTER_MS;
if (!failureCount) return jitter;
return REQUEUE_MS + jitter;
};
export const secretSyncQueueFactory = ({
@@ -193,6 +191,46 @@ export const secretSyncQueueFactory = ({
folderCommitService
});
const $isConnectionConcurrencyLimitReached = async (connectionId: string) => {
const concurrencyCount = await keyStore.getItem(KeyStorePrefixes.AppConnectionConcurrentJobs(connectionId));
if (!concurrencyCount) return false;
const count = Number.parseInt(concurrencyCount, 10);
if (Number.isNaN(count)) return false;
return count >= CONNECTION_CONCURRENCY_LIMIT;
};
const $incrementConnectionConcurrencyCount = async (connectionId: string) => {
const concurrencyCount = await keyStore.getItem(KeyStorePrefixes.AppConnectionConcurrentJobs(connectionId));
const currentCount = Number.parseInt(concurrencyCount || "0", 10);
const incrementedCount = Number.isNaN(currentCount) ? 1 : currentCount + 1;
await keyStore.setItemWithExpiry(
KeyStorePrefixes.AppConnectionConcurrentJobs(connectionId),
(REQUEUE_MS * REQUEUE_LIMIT) / 1000, // in seconds
incrementedCount
);
};
const $decrementConnectionConcurrencyCount = async (connectionId: string) => {
const concurrencyCount = await keyStore.getItem(KeyStorePrefixes.AppConnectionConcurrentJobs(connectionId));
const currentCount = Number.parseInt(concurrencyCount || "0", 10);
const decrementedCount = Math.max(0, Number.isNaN(currentCount) ? 0 : currentCount - 1);
await keyStore.setItemWithExpiry(
KeyStorePrefixes.AppConnectionConcurrentJobs(connectionId),
(REQUEUE_MS * REQUEUE_LIMIT) / 1000, // in seconds
decrementedCount
);
};
const $getInfisicalSecrets = async (
secretSync: TSecretSyncRaw | TSecretSyncWithCredentials,
includeImports = true
@@ -416,15 +454,11 @@ export const secretSyncQueueFactory = ({
return importedSecretMap;
};
const $handleSyncSecretsJob = async (job: TSecretSyncSyncSecretsDTO) => {
const $handleSyncSecretsJob = async (job: TSecretSyncSyncSecretsDTO, secretSync: TSecretSyncRaw) => {
const {
data: { syncId, auditLogInfo }
} = job;
const secretSync = await secretSyncDAL.findById(syncId);
if (!secretSync) throw new Error(`Cannot find secret sync with ID ${syncId}`);
await enterpriseSyncCheck(
licenseService,
secretSync.destination as SecretSync,
@@ -566,15 +600,11 @@ export const secretSyncQueueFactory = ({
logger.info("SecretSync Sync Job with ID %s Completed", job.id);
};
const $handleImportSecretsJob = async (job: TSecretSyncImportSecretsDTO) => {
const $handleImportSecretsJob = async (job: TSecretSyncImportSecretsDTO, secretSync: TSecretSyncRaw) => {
const {
data: { syncId, auditLogInfo, importBehavior }
} = job;
const secretSync = await secretSyncDAL.findById(syncId);
if (!secretSync) throw new Error(`Cannot find secret sync with ID ${syncId}`);
await secretSyncDAL.updateById(syncId, {
importStatus: SecretSyncStatus.Running
});
@@ -683,15 +713,11 @@ export const secretSyncQueueFactory = ({
logger.info("SecretSync Import Job with ID %s Completed", job.id);
};
const $handleRemoveSecretsJob = async (job: TSecretSyncRemoveSecretsDTO) => {
const $handleRemoveSecretsJob = async (job: TSecretSyncRemoveSecretsDTO, secretSync: TSecretSyncRaw) => {
const {
data: { syncId, auditLogInfo, deleteSyncOnComplete }
} = job;
const secretSync = await secretSyncDAL.findById(syncId);
if (!secretSync) throw new Error(`Cannot find secret sync with ID ${syncId}`);
await enterpriseSyncCheck(
licenseService,
secretSync.destination as SecretSync,
@@ -894,6 +920,17 @@ export const secretSyncQueueFactory = ({
const secretSyncs = await secretSyncDAL.find({ folderId: folder.id, isAutoSyncEnabled: true });
await secretSyncDAL.update(
{
$in: {
id: secretSyncs.map((sync) => sync.id)
}
},
{
syncStatus: SecretSyncStatus.Pending
}
);
await Promise.all(secretSyncs.map((secretSync) => queueSecretSyncSyncSecretsById({ syncId: secretSync.id })));
};
@@ -904,7 +941,7 @@ export const secretSyncQueueFactory = ({
case QueueJobs.SecretSyncSyncSecrets: {
const { failedToAcquireLockCount = 0, ...rest } = job.data as TQueueSecretSyncSyncSecretsByIdDTO;
if (failedToAcquireLockCount < 10) {
if (failedToAcquireLockCount < REQUEUE_LIMIT) {
await queueSecretSyncSyncSecretsById({ ...rest, failedToAcquireLockCount: failedToAcquireLockCount + 1 });
return;
}
@@ -974,6 +1011,26 @@ export const secretSyncQueueFactory = ({
| TQueueSecretSyncImportSecretsByIdDTO
| TQueueSecretSyncRemoveSecretsByIdDTO;
const secretSync = await secretSyncDAL.findById(syncId);
if (!secretSync) throw new Error(`Cannot find secret sync with ID ${syncId}`);
const { connectionId } = secretSync;
if (job.name === QueueJobs.SecretSyncSyncSecrets) {
const isConcurrentLimitReached = await $isConnectionConcurrencyLimitReached(connectionId);
if (isConcurrentLimitReached) {
logger.info(
`SecretSync Concurrency limit reached [syncId=${syncId}] [job=${job.name}] [connectionId=${connectionId}]`
);
await $handleAcquireLockFailure(job as SecretSyncActionJob);
return;
}
}
let lock: Awaited<ReturnType<typeof keyStore.acquireLock>>;
try {
@@ -993,20 +1050,26 @@ export const secretSyncQueueFactory = ({
try {
switch (job.name) {
case QueueJobs.SecretSyncSyncSecrets:
await $handleSyncSecretsJob(job as TSecretSyncSyncSecretsDTO);
case QueueJobs.SecretSyncSyncSecrets: {
await $incrementConnectionConcurrencyCount(connectionId);
await $handleSyncSecretsJob(job as TSecretSyncSyncSecretsDTO, secretSync);
break;
}
case QueueJobs.SecretSyncImportSecrets:
await $handleImportSecretsJob(job as TSecretSyncImportSecretsDTO);
await $handleImportSecretsJob(job as TSecretSyncImportSecretsDTO, secretSync);
break;
case QueueJobs.SecretSyncRemoveSecrets:
await $handleRemoveSecretsJob(job as TSecretSyncRemoveSecretsDTO);
await $handleRemoveSecretsJob(job as TSecretSyncRemoveSecretsDTO, secretSync);
break;
default:
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
throw new Error(`Unhandled Secret Sync Job ${job.name}`);
}
} finally {
if (job.name === QueueJobs.SecretSyncSyncSecrets) {
await $decrementConnectionConcurrencyCount(connectionId);
}
await lock.release();
}
});

View File

@@ -1,6 +1,7 @@
import {
faCheck,
faExclamationTriangle,
faHourglass,
faRotate,
IconDefinition
} from "@fortawesome/free-solid-svg-icons";
@@ -29,7 +30,11 @@ export const SecretSyncStatusBadge = ({ status }: Props) => {
text = "Synced";
icon = faCheck;
break;
case SecretSyncStatus.Pending: // no need to differentiate from user perspective
case SecretSyncStatus.Pending:
variant = "primary";
text = "Queued";
icon = faHourglass;
break;
case SecretSyncStatus.Running:
default:
variant = "primary";
@@ -42,11 +47,7 @@ export const SecretSyncStatusBadge = ({ status }: Props) => {
<Badge className="flex h-5 w-min items-center gap-1.5 whitespace-nowrap" variant={variant}>
<FontAwesomeIcon
icon={icon}
className={
[SecretSyncStatus.Pending, SecretSyncStatus.Running].includes(status)
? "animate-spin"
: ""
}
className={[SecretSyncStatus.Running].includes(status) ? "animate-spin" : ""}
/>
<span>{text}</span>
</Badge>