Compare commits

...

3 Commits

Author SHA1 Message Date
2f0a247c11 Describe query 2025-07-14 18:01:35 -04:00
513f942aae Add batching to not lock DB 2025-07-14 00:39:34 -04:00
218408493a Optimize token cleanup job 2025-07-11 22:05:32 -04:00

View File

@ -30,10 +30,17 @@ export const identityAccessTokenDALFactory = (db: TDbClient) => {
const removeExpiredTokens = async (tx?: Knex) => {
logger.info(`${QueueName.DailyResourceCleanUp}: remove expired access token started`);
const BATCH_SIZE = 10000;
const MAX_RETRY_ON_FAILURE = 3;
const QUERY_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes
const MAX_TTL = 315_360_000; // Maximum TTL value in seconds (10 years)
try {
const docs = (tx || db)(TableName.IdentityAccessToken)
let deletedTokenIds: { id: string }[] = [];
let numberOfRetryOnFailure = 0;
let isRetrying = false;
const getExpiredTokensQuery = (dbClient: Knex | Knex.Transaction) =>
dbClient(TableName.IdentityAccessToken)
.where({
isAccessTokenRevoked: true
})
@ -47,34 +54,64 @@ export const identityAccessTokenDALFactory = (db: TDbClient) => {
);
})
.orWhere((qb) => {
void qb.where("accessTokenTTL", ">", 0).andWhere((qb2) => {
void qb2
.where((qb3) => {
void qb3
.whereNotNull("accessTokenLastRenewedAt")
// accessTokenLastRenewedAt + convert_integer_to_seconds(accessTokenTTL) < present_date
.andWhereRaw(
`"${TableName.IdentityAccessToken}"."accessTokenLastRenewedAt" + make_interval(secs => LEAST("${TableName.IdentityAccessToken}"."accessTokenTTL", ?)) < NOW()`,
[MAX_TTL]
);
})
.orWhere((qb3) => {
void qb3
.whereNull("accessTokenLastRenewedAt")
// created + convert_integer_to_seconds(accessTokenTTL) < present_date
.andWhereRaw(
`"${TableName.IdentityAccessToken}"."createdAt" + make_interval(secs => LEAST("${TableName.IdentityAccessToken}"."accessTokenTTL", ?)) < NOW()`,
[MAX_TTL]
);
});
void qb.where("accessTokenTTL", ">", 0).andWhereRaw(
`
-- Check if the token's effective expiration time has passed.
-- The expiration time is calculated by adding its TTL to its last renewal/creation time.
COALESCE(
"${TableName.IdentityAccessToken}"."accessTokenLastRenewedAt", -- Use last renewal time if available
"${TableName.IdentityAccessToken}"."createdAt" -- Otherwise, use creation time
)
+ make_interval(
secs => LEAST(
"${TableName.IdentityAccessToken}"."accessTokenTTL", -- Token's specified TTL
? -- Capped by MAX_TTL (parameterized value)
)
)
< NOW() -- Check if the calculated time is before now
`,
[MAX_TTL]
);
});
do {
try {
const deleteBatch = async (dbClient: Knex | Knex.Transaction) => {
const idsToDeleteQuery = getExpiredTokensQuery(dbClient).select("id").limit(BATCH_SIZE);
return dbClient(TableName.IdentityAccessToken).whereIn("id", idsToDeleteQuery).del().returning("id");
};
if (tx) {
// eslint-disable-next-line no-await-in-loop
deletedTokenIds = await deleteBatch(tx);
} else {
// eslint-disable-next-line no-await-in-loop
deletedTokenIds = await db.transaction(async (trx) => {
await trx.raw(`SET statement_timeout = ${QUERY_TIMEOUT_MS}`);
return deleteBatch(trx);
});
})
.delete();
await docs;
logger.info(`${QueueName.DailyResourceCleanUp}: remove expired access token completed`);
} catch (error) {
throw new DatabaseError({ error, name: "IdentityAccessTokenPrune" });
}
numberOfRetryOnFailure = 0; // reset
} catch (error) {
numberOfRetryOnFailure += 1;
logger.error(error, "Failed to delete a batch of expired identity access tokens on pruning");
} finally {
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, 10); // time to breathe for db
});
}
isRetrying = numberOfRetryOnFailure > 0;
} while (deletedTokenIds.length > 0 || (isRetrying && numberOfRetryOnFailure < MAX_RETRY_ON_FAILURE));
if (numberOfRetryOnFailure >= MAX_RETRY_ON_FAILURE) {
logger.error(
`IdentityAccessTokenPrune: Pruning failed and stopped after ${MAX_RETRY_ON_FAILURE} consecutive retries.`
);
}
logger.info(`${QueueName.DailyResourceCleanUp}: remove expired access token completed`);
};
return { ...identityAccessTokenOrm, findOne, removeExpiredTokens };