mirror of
https://github.com/Infisical/infisical.git
synced 2025-03-31 22:09:57 +00:00
Compare commits
8 Commits
ssh-certs
...
daniel/eve
Author | SHA1 | Date | |
---|---|---|---|
4b3de83c07 | |||
804e4c4609 | |||
b124627288 | |||
5c86212b11 | |||
98fcffe718 | |||
d84d2ec7a6 | |||
6780bfb821 | |||
5c94a44e92 |
31
backend/package-lock.json
generated
31
backend/package-lock.json
generated
@ -62,6 +62,7 @@
|
||||
"tweetnacl": "^1.0.3",
|
||||
"tweetnacl-util": "^0.15.1",
|
||||
"uuid": "^9.0.1",
|
||||
"ws": "^8.16.0",
|
||||
"zod": "^3.22.4",
|
||||
"zod-to-json-schema": "^3.22.4"
|
||||
},
|
||||
@ -81,6 +82,7 @@
|
||||
"@types/prompt-sync": "^4.2.3",
|
||||
"@types/resolve": "^1.20.6",
|
||||
"@types/uuid": "^9.0.7",
|
||||
"@types/ws": "^8.5.10",
|
||||
"@typescript-eslint/eslint-plugin": "^6.20.0",
|
||||
"@typescript-eslint/parser": "^6.20.0",
|
||||
"eslint": "^8.56.0",
|
||||
@ -4211,6 +4213,15 @@
|
||||
"integrity": "sha512-WUtIVRUZ9i5dYXefDEAI7sh9/O7jGvHg7Df/5O/gtH3Yabe5odI3UWopVR1qbPXQtvOxWu3mM4XxlYeZtMWF4g==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/@types/ws": {
|
||||
"version": "8.5.10",
|
||||
"resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.10.tgz",
|
||||
"integrity": "sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"@types/node": "*"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/xml-crypto": {
|
||||
"version": "1.4.6",
|
||||
"resolved": "https://registry.npmjs.org/@types/xml-crypto/-/xml-crypto-1.4.6.tgz",
|
||||
@ -13720,6 +13731,26 @@
|
||||
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
|
||||
"integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="
|
||||
},
|
||||
"node_modules/ws": {
|
||||
"version": "8.16.0",
|
||||
"resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz",
|
||||
"integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==",
|
||||
"engines": {
|
||||
"node": ">=10.0.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"bufferutil": "^4.0.1",
|
||||
"utf-8-validate": ">=5.0.2"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"bufferutil": {
|
||||
"optional": true
|
||||
},
|
||||
"utf-8-validate": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/xml-crypto": {
|
||||
"version": "3.2.0",
|
||||
"resolved": "https://registry.npmjs.org/xml-crypto/-/xml-crypto-3.2.0.tgz",
|
||||
|
@ -46,6 +46,7 @@
|
||||
"@types/prompt-sync": "^4.2.3",
|
||||
"@types/resolve": "^1.20.6",
|
||||
"@types/uuid": "^9.0.7",
|
||||
"@types/ws": "^8.5.10",
|
||||
"@typescript-eslint/eslint-plugin": "^6.20.0",
|
||||
"@typescript-eslint/parser": "^6.20.0",
|
||||
"eslint": "^8.56.0",
|
||||
@ -123,6 +124,7 @@
|
||||
"tweetnacl": "^1.0.3",
|
||||
"tweetnacl-util": "^0.15.1",
|
||||
"uuid": "^9.0.1",
|
||||
"ws": "^8.16.0",
|
||||
"zod": "^3.22.4",
|
||||
"zod-to-json-schema": "^3.22.4"
|
||||
}
|
||||
|
17
backend/src/@types/fastify.d.ts
vendored
17
backend/src/@types/fastify.d.ts
vendored
@ -20,6 +20,7 @@ import { TAuthPasswordFactory } from "@app/services/auth/auth-password-service";
|
||||
import { TAuthSignupFactory } from "@app/services/auth/auth-signup-service";
|
||||
import { ActorType } from "@app/services/auth/auth-type";
|
||||
import { TAuthTokenServiceFactory } from "@app/services/auth-token/auth-token-service";
|
||||
import { TEventServiceFactory } from "@app/services/event/event-service";
|
||||
import { TIdentityServiceFactory } from "@app/services/identity/identity-service";
|
||||
import { TIdentityAccessTokenServiceFactory } from "@app/services/identity-access-token/identity-access-token-service";
|
||||
import { TIdentityProjectServiceFactory } from "@app/services/identity-project/identity-project-service";
|
||||
@ -71,6 +72,21 @@ declare module "fastify" {
|
||||
ssoConfig: Awaited<ReturnType<TSamlConfigServiceFactory["getSaml"]>>;
|
||||
}
|
||||
|
||||
interface FastifyReply {
|
||||
sse: ({
|
||||
data,
|
||||
error
|
||||
}:
|
||||
| {
|
||||
data: string;
|
||||
error?: false;
|
||||
}
|
||||
| {
|
||||
error: true;
|
||||
errorMessage: string;
|
||||
}) => void;
|
||||
}
|
||||
|
||||
interface FastifyInstance {
|
||||
services: {
|
||||
login: TAuthLoginFactory;
|
||||
@ -113,6 +129,7 @@ declare module "fastify" {
|
||||
trustedIp: TTrustedIpServiceFactory;
|
||||
secretBlindIndex: TSecretBlindIndexServiceFactory;
|
||||
telemetry: TTelemetryServiceFactory;
|
||||
event: TEventServiceFactory;
|
||||
};
|
||||
// this is exclusive use for middlewares in which we need to inject data
|
||||
// everywhere else access using service layer
|
||||
|
188
backend/src/event/event.ts
Normal file
188
backend/src/event/event.ts
Normal file
@ -0,0 +1,188 @@
|
||||
import { URL } from "node:url";
|
||||
|
||||
import { ForbiddenError } from "@casl/ability";
|
||||
import jwt from "jsonwebtoken";
|
||||
import { Server as WebSocketServer, ServerOptions, WebSocket } from "ws";
|
||||
|
||||
import { TableName } from "@app/db/schemas";
|
||||
import { TPermissionServiceFactory } from "@app/ee/services/permission/permission-service";
|
||||
import { ProjectPermissionActions, ProjectPermissionSub } from "@app/ee/services/permission/project-permission";
|
||||
import { getConfig } from "@app/lib/config/env";
|
||||
import { checkIPAgainstBlocklist, TIp } from "@app/lib/ip";
|
||||
import { ActorType, AuthTokenType } from "@app/services/auth/auth-type";
|
||||
import { TIdentityAccessTokenDALFactory } from "@app/services/identity-access-token/identity-access-token-dal";
|
||||
import { TIdentityAccessTokenServiceFactory } from "@app/services/identity-access-token/identity-access-token-service";
|
||||
import { TIdentityAccessTokenJwtPayload } from "@app/services/identity-access-token/identity-access-token-types";
|
||||
|
||||
type TEventSubscriptionFactoryDep = {
|
||||
identityAccessTokenDAL: TIdentityAccessTokenDALFactory;
|
||||
identityAccessTokenServiceFactory: TIdentityAccessTokenServiceFactory;
|
||||
permissionService: TPermissionServiceFactory;
|
||||
};
|
||||
|
||||
enum AuthenticationErrors {
|
||||
NO_PROJECT_ID = "Unauthorized. Project ID is missing",
|
||||
NO_MACHINE = "Unauthorized. Machine Identity Access Token is missing",
|
||||
INVALID_TOKEN_TYPE = "Unauthorized. Invalid token type",
|
||||
INVALID_TOKEN = "Unauthorized. Invalid token",
|
||||
NO_PERMISSION = "Unauthorized. No permission to access project"
|
||||
}
|
||||
|
||||
export type TEventSubscriptionFactory = ReturnType<typeof eventSubscriptionFactory>;
|
||||
|
||||
export const eventSubscriptionFactory = ({
|
||||
identityAccessTokenDAL,
|
||||
permissionService,
|
||||
identityAccessTokenServiceFactory
|
||||
}: TEventSubscriptionFactoryDep) => {
|
||||
const config = getConfig();
|
||||
let connection: WebSocketServer | null = null;
|
||||
const clients = new Map<string, WebSocket[]>();
|
||||
|
||||
const verifyConnection: ServerOptions["verifyClient"] = (info, cb) => {
|
||||
void (async () => {
|
||||
const machineIdentityAccessToken = info.req.headers["machine-identity-access-token"];
|
||||
const projectId = info.req.headers["project-id"];
|
||||
|
||||
if (!projectId || typeof projectId !== "string") {
|
||||
cb(false, 401, AuthenticationErrors.NO_PROJECT_ID);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!machineIdentityAccessToken || typeof machineIdentityAccessToken !== "string") {
|
||||
cb(false, 401, AuthenticationErrors.NO_MACHINE);
|
||||
return;
|
||||
}
|
||||
|
||||
const decodedToken = jwt.verify(machineIdentityAccessToken, config.AUTH_SECRET) as TIdentityAccessTokenJwtPayload;
|
||||
|
||||
if (decodedToken.authTokenType !== AuthTokenType.IDENTITY_ACCESS_TOKEN) {
|
||||
cb(false, 401, AuthenticationErrors.INVALID_TOKEN_TYPE);
|
||||
return;
|
||||
}
|
||||
|
||||
await identityAccessTokenServiceFactory.fnValidateIdentityAccessToken(
|
||||
decodedToken,
|
||||
info.req.socket.remoteAddress
|
||||
);
|
||||
|
||||
const identityAccessToken = await identityAccessTokenDAL.findOne({
|
||||
[`${TableName.IdentityAccessToken}.id` as "id"]: decodedToken.identityAccessTokenId,
|
||||
isAccessTokenRevoked: false
|
||||
});
|
||||
|
||||
if (!identityAccessToken) {
|
||||
cb(false, 401, AuthenticationErrors.INVALID_TOKEN);
|
||||
return;
|
||||
}
|
||||
|
||||
const ipAddress = info.req.socket.remoteAddress;
|
||||
|
||||
if (ipAddress) {
|
||||
// This throws, and im not sure if it really should. TODO
|
||||
checkIPAgainstBlocklist({
|
||||
ipAddress,
|
||||
trustedIps: identityAccessToken?.accessTokenTrustedIps as TIp[]
|
||||
});
|
||||
}
|
||||
|
||||
const { permission } = await permissionService.getProjectPermission(
|
||||
ActorType.IDENTITY,
|
||||
identityAccessToken.identityId,
|
||||
projectId
|
||||
);
|
||||
try {
|
||||
ForbiddenError.from(permission).throwUnlessCan(ProjectPermissionActions.Read, ProjectPermissionSub.Secrets);
|
||||
} catch (err) {
|
||||
cb(false, 401, AuthenticationErrors.NO_PERMISSION);
|
||||
return;
|
||||
}
|
||||
|
||||
cb(true);
|
||||
})();
|
||||
};
|
||||
|
||||
const init = () => {
|
||||
if (connection) return;
|
||||
|
||||
connection = new WebSocketServer({
|
||||
port: 8091,
|
||||
verifyClient: verifyConnection
|
||||
});
|
||||
|
||||
// Purely for testing purposes.
|
||||
connection.on("connection", (ws) => {
|
||||
const projectId = new URL(ws.url).searchParams.get("projectId");
|
||||
|
||||
if (!projectId) {
|
||||
ws.send("Unauthorized. Project ID is missing");
|
||||
ws.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!clients.has(projectId)) {
|
||||
clients.set(projectId, []);
|
||||
}
|
||||
clients.get(projectId)?.push(ws);
|
||||
|
||||
ws.on("message", (message) => {
|
||||
console.log("received: %s", message);
|
||||
});
|
||||
|
||||
ws.on("close", () => {
|
||||
const projectClients = clients.get(projectId);
|
||||
|
||||
if (!projectClients) return;
|
||||
|
||||
const index = projectClients.indexOf(ws);
|
||||
|
||||
if (index !== -1) {
|
||||
projectClients.splice(index, 1);
|
||||
}
|
||||
|
||||
if (projectClients.length === 0) {
|
||||
clients.delete(projectId);
|
||||
} else {
|
||||
clients.set(projectId, projectClients);
|
||||
}
|
||||
});
|
||||
|
||||
ws.send("Connected.");
|
||||
});
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
const sendNotification = (projectId: string) => {
|
||||
const MESSAGE = "NEW_CHANGE";
|
||||
|
||||
if (!connection) {
|
||||
throw new Error("Connection not initialized");
|
||||
}
|
||||
|
||||
for (const client of connection.clients) {
|
||||
client.send(MESSAGE);
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
init
|
||||
};
|
||||
};
|
||||
|
||||
// var WebSocketServer = require("ws").Server;
|
||||
// var ws = new WebSocketServer({
|
||||
// verifyClient: function (info, cb) {
|
||||
// var token = info.req.headers.token;
|
||||
// if (!token) cb(false, 401, "Unauthorized");
|
||||
// else {
|
||||
// jwt.verify(token, "secret-key", function (err, decoded) {
|
||||
// if (err) {
|
||||
// cb(false, 401, "Unauthorized");
|
||||
// } else {
|
||||
// info.req.user = decoded; //[1]
|
||||
// cb(true);
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
// });
|
47
backend/src/server/plugins/sse.ts
Normal file
47
backend/src/server/plugins/sse.ts
Normal file
@ -0,0 +1,47 @@
|
||||
import { FastifyPluginAsync, FastifyReply } from "fastify";
|
||||
import fp from "fastify-plugin";
|
||||
// eslint-disable-next-line @typescript-eslint/require-await
|
||||
export const serversideEvents: FastifyPluginAsync = fp(async function serversideEventsPlgin(instance): Promise<void> {
|
||||
instance.decorateReply("sse", function handler(this: FastifyReply, input): void {
|
||||
// if this already set, it's not first event
|
||||
if (!this.raw.headersSent) {
|
||||
console.log("Setting headers");
|
||||
Object.entries(this.getHeaders()).forEach(([key, value]) => {
|
||||
this.raw.setHeader(key, value ?? "");
|
||||
});
|
||||
this.raw.setHeader("Cache-Control", "no-cache");
|
||||
this.raw.setHeader("Content-Type", "text/event-stream");
|
||||
this.raw.setHeader("Access-Control-Allow-Origin", "*");
|
||||
this.raw.setHeader("Connection", "keep-alive");
|
||||
this.raw.flushHeaders(); // flush the headers to establish SSE with client
|
||||
|
||||
// Ngnix will close idle connections even if the connection is keep-alive. So we send a ping every 15 seconds to keep the connection truly alive.
|
||||
const interval = setInterval(() => {
|
||||
console.log("Sending ping");
|
||||
if (!this.raw.writableEnded) {
|
||||
this.raw.write("event: ping\n");
|
||||
this.raw.write("data: Heartbeat\n\n");
|
||||
}
|
||||
}, 15_000);
|
||||
|
||||
this.raw.on("close", () => {
|
||||
console.log("Connection closed");
|
||||
clearInterval(interval);
|
||||
this.raw.end();
|
||||
});
|
||||
}
|
||||
|
||||
if (input.error) {
|
||||
this.raw.write("event: error\n");
|
||||
this.raw.write(
|
||||
`data: ${JSON.stringify({
|
||||
error: input.errorMessage
|
||||
})}\n\n`
|
||||
);
|
||||
this.raw.end();
|
||||
return;
|
||||
}
|
||||
|
||||
this.raw.write(`data: ${input.data}\n\n`); // res.write() instead of res.send()
|
||||
});
|
||||
});
|
@ -36,6 +36,7 @@ import { trustedIpDALFactory } from "@app/ee/services/trusted-ip/trusted-ip-dal"
|
||||
import { trustedIpServiceFactory } from "@app/ee/services/trusted-ip/trusted-ip-service";
|
||||
import { getConfig } from "@app/lib/config/env";
|
||||
import { TQueueServiceFactory } from "@app/queue";
|
||||
import { serversideEvents } from "@app/server/plugins/sse";
|
||||
import { apiKeyDALFactory } from "@app/services/api-key/api-key-dal";
|
||||
import { apiKeyServiceFactory } from "@app/services/api-key/api-key-service";
|
||||
import { authDALFactory } from "@app/services/auth/auth-dal";
|
||||
@ -44,6 +45,7 @@ import { authPaswordServiceFactory } from "@app/services/auth/auth-password-serv
|
||||
import { authSignupServiceFactory } from "@app/services/auth/auth-signup-service";
|
||||
import { tokenDALFactory } from "@app/services/auth-token/auth-token-dal";
|
||||
import { tokenServiceFactory } from "@app/services/auth-token/auth-token-service";
|
||||
import { eventServiceFactory } from "@app/services/event/event-service";
|
||||
import { identityDALFactory } from "@app/services/identity/identity-dal";
|
||||
import { identityOrgDALFactory } from "@app/services/identity/identity-org-dal";
|
||||
import { identityServiceFactory } from "@app/services/identity/identity-service";
|
||||
@ -116,6 +118,8 @@ export const registerRoutes = async (
|
||||
) => {
|
||||
await server.register(registerSecretScannerGhApp, { prefix: "/ss-webhook" });
|
||||
|
||||
const cfg = getConfig();
|
||||
|
||||
// db layers
|
||||
const userDAL = userDALFactory(db);
|
||||
const authDAL = authDALFactory(db);
|
||||
@ -490,6 +494,8 @@ export const registerRoutes = async (
|
||||
licenseService
|
||||
});
|
||||
|
||||
const eventService = eventServiceFactory({ redisUrl: cfg.REDIS_URL });
|
||||
|
||||
await superAdminService.initServerCfg();
|
||||
await auditLogQueue.startAuditLogPruneJob();
|
||||
// setup the communication with license key server
|
||||
@ -535,7 +541,8 @@ export const registerRoutes = async (
|
||||
trustedIp: trustedIpService,
|
||||
scim: scimService,
|
||||
secretBlindIndex: secretBlindIndexService,
|
||||
telemetry: telemetryService
|
||||
telemetry: telemetryService,
|
||||
event: eventService
|
||||
});
|
||||
|
||||
server.decorate<FastifyZodProvider["store"]>("store", {
|
||||
@ -545,6 +552,7 @@ export const registerRoutes = async (
|
||||
await server.register(injectIdentity, { userDAL, serviceTokenDAL });
|
||||
await server.register(injectPermission);
|
||||
await server.register(injectAuditLogInfo);
|
||||
await server.register(serversideEvents, { prefix: "/api/v1/sse" });
|
||||
|
||||
server.route({
|
||||
url: "/api/status",
|
||||
@ -562,7 +570,6 @@ export const registerRoutes = async (
|
||||
}
|
||||
},
|
||||
handler: async () => {
|
||||
const cfg = getConfig();
|
||||
const serverCfg = await getServerCfg();
|
||||
return {
|
||||
date: new Date(),
|
||||
|
@ -16,6 +16,7 @@ import { registerProjectRouter } from "./project-router";
|
||||
import { registerSecretFolderRouter } from "./secret-folder-router";
|
||||
import { registerSecretImportRouter } from "./secret-import-router";
|
||||
import { registerSecretTagRouter } from "./secret-tag-router";
|
||||
import { registerServersideEventsRouter } from "./sse-router";
|
||||
import { registerSsoRouter } from "./sso-router";
|
||||
import { registerUserActionRouter } from "./user-action-router";
|
||||
import { registerUserRouter } from "./user-router";
|
||||
@ -57,4 +58,5 @@ export const registerV1Routes = async (server: FastifyZodProvider) => {
|
||||
await server.register(registerIntegrationAuthRouter, { prefix: "/integration-auth" });
|
||||
await server.register(registerWebhookRouter, { prefix: "/webhooks" });
|
||||
await server.register(registerIdentityRouter, { prefix: "/identities" });
|
||||
await server.register(registerServersideEventsRouter, { prefix: "/sse" });
|
||||
};
|
||||
|
48
backend/src/server/routes/v1/sse-router.ts
Normal file
48
backend/src/server/routes/v1/sse-router.ts
Normal file
@ -0,0 +1,48 @@
|
||||
import { z } from "zod";
|
||||
|
||||
import { logger } from "@app/lib/logger";
|
||||
|
||||
export const registerServersideEventsRouter = async (server: FastifyZodProvider) => {
|
||||
server.route({
|
||||
method: "GET",
|
||||
url: "/events/:projectId",
|
||||
schema: {
|
||||
params: z.object({
|
||||
projectId: z.string().trim()
|
||||
}),
|
||||
response: {
|
||||
200: z.string()
|
||||
}
|
||||
},
|
||||
// onRequest: verifyAuth([AuthMode.IDENTITY_ACCESS_TOKEN]),
|
||||
handler: async (req, res) => {
|
||||
res.sse({
|
||||
data: JSON.stringify({
|
||||
message: "Connected to event stream"
|
||||
})
|
||||
});
|
||||
|
||||
const subscription = await server.services.event.crateSubscription(req.params.projectId);
|
||||
|
||||
// It's OK to create a event listener here, because it's tied to the local subscription instance. So once the function ends, the listener is removed along with the subscription.
|
||||
// No need to worry about memory leaks!
|
||||
subscription
|
||||
.on("message", (channel, message) => {
|
||||
if (channel === req.params.projectId)
|
||||
res.sse({
|
||||
data: JSON.stringify(message)
|
||||
});
|
||||
})
|
||||
.on("error", (error) => {
|
||||
logger.error(error, "Error in subscription");
|
||||
res.sse({
|
||||
error: true,
|
||||
errorMessage: error.message // ? Should we really return the error message to the client?
|
||||
});
|
||||
});
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/return-await, @typescript-eslint/no-misused-promises
|
||||
req.socket.on("close", async () => await subscription.unsubscribe());
|
||||
}
|
||||
});
|
||||
};
|
@ -16,6 +16,7 @@ import { getTelemetryDistinctId } from "@app/server/lib/telemetry";
|
||||
import { getUserAgentType } from "@app/server/plugins/audit-log";
|
||||
import { verifyAuth } from "@app/server/plugins/auth/verify-auth";
|
||||
import { ActorType, AuthMode } from "@app/services/auth/auth-type";
|
||||
import { TEventType } from "@app/services/event/event-types";
|
||||
import { PostHogEventTypes } from "@app/services/telemetry/telemetry-types";
|
||||
|
||||
import { secretRawSchema } from "../sanitizedSchemas";
|
||||
@ -919,6 +920,15 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
|
||||
newSecretName
|
||||
});
|
||||
|
||||
await server.services.event.publish(req.body.workspaceId, {
|
||||
type: TEventType.SECRET_UPDATE,
|
||||
payload: {
|
||||
secretId: secret.id,
|
||||
secretKey: req.params.secretName,
|
||||
secretPath: "test/path"
|
||||
}
|
||||
});
|
||||
|
||||
await server.services.auditLog.createAuditLog({
|
||||
projectId: req.body.workspaceId,
|
||||
...req.auditLogInfo,
|
||||
|
87
backend/src/services/event/event-service.ts
Normal file
87
backend/src/services/event/event-service.ts
Normal file
@ -0,0 +1,87 @@
|
||||
/* eslint-disable no-console */
|
||||
import Redis from "ioredis";
|
||||
|
||||
// import { logger } from "@app/lib/logger";
|
||||
import { TEvent, TEventType } from "./event-types";
|
||||
|
||||
type TEventServiceFactoryDep = {
|
||||
redisUrl: string;
|
||||
};
|
||||
|
||||
export type TEventServiceFactory = ReturnType<typeof eventServiceFactory>;
|
||||
|
||||
export const eventServiceFactory = ({ redisUrl }: TEventServiceFactoryDep) => {
|
||||
const publisher = new Redis(redisUrl, { maxRetriesPerRequest: null });
|
||||
|
||||
// Map key: the channel ID.
|
||||
// connections / total number of connections: We keep track of this to know when to unsubscribe and disconnect the client.
|
||||
// client / the subscription: We store this so we can use the same connection/subscription for the same channel. We don't want to create a new connection for each subscription, because that would be a waste of resources and become hard to scale.
|
||||
const redisClients = new Map<
|
||||
string,
|
||||
{
|
||||
client: Redis;
|
||||
connections: number;
|
||||
}
|
||||
>();
|
||||
// Will this work for vertical scaling? The redisClients
|
||||
|
||||
// channel would be the projectId
|
||||
const publish = async (channel: string, event: TEvent[TEventType]) => {
|
||||
await publisher.publish(channel, JSON.stringify(event));
|
||||
};
|
||||
|
||||
const crateSubscription = async (channel: string) => {
|
||||
let subscriber: Redis | null = null;
|
||||
|
||||
const existingSubscriber = redisClients.get(channel);
|
||||
|
||||
if (existingSubscriber) {
|
||||
redisClients.set(channel, {
|
||||
client: existingSubscriber.client,
|
||||
connections: existingSubscriber.connections + 1
|
||||
});
|
||||
|
||||
subscriber = existingSubscriber.client;
|
||||
} else {
|
||||
subscriber = new Redis(redisUrl, { maxRetriesPerRequest: null });
|
||||
|
||||
redisClients.set(channel, {
|
||||
client: subscriber,
|
||||
connections: 1
|
||||
});
|
||||
}
|
||||
|
||||
await subscriber.subscribe(channel, (msg) => {
|
||||
if (msg instanceof Error) {
|
||||
throw msg;
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
on: subscriber.on.bind(subscriber),
|
||||
unsubscribe: async () => {
|
||||
const subscriberToRemove = redisClients.get(channel);
|
||||
|
||||
if (subscriberToRemove) {
|
||||
// If there's only 1 connection, we can fully unsubscribe and disconnect the client.
|
||||
if (subscriberToRemove.connections === 1) {
|
||||
await subscriberToRemove.client.unsubscribe(`${channel}`);
|
||||
await subscriberToRemove.client.quit();
|
||||
redisClients.delete(channel);
|
||||
} else {
|
||||
// If there's more than 1 connection, we just decrement the connections count, because there are still other listeners.
|
||||
redisClients.set(channel, {
|
||||
client: subscriberToRemove.client,
|
||||
connections: subscriberToRemove.connections - 1
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
return {
|
||||
publish,
|
||||
crateSubscription
|
||||
};
|
||||
};
|
35
backend/src/services/event/event-types.ts
Normal file
35
backend/src/services/event/event-types.ts
Normal file
@ -0,0 +1,35 @@
|
||||
import { z } from "zod";
|
||||
|
||||
export enum TEventType {
|
||||
SECRET_UPDATE = "secret_update",
|
||||
SECRET_DELETE = "secret_delete",
|
||||
SECRET_CREATE = "secret_create"
|
||||
}
|
||||
|
||||
export const EventSchema = z.object({
|
||||
secret_create: z.object({
|
||||
payload: z.object({
|
||||
secretId: z.string(),
|
||||
secretKey: z.string(),
|
||||
secretPath: z.string()
|
||||
}),
|
||||
type: z.literal("secret_create")
|
||||
}),
|
||||
secret_update: z.object({
|
||||
payload: z.object({
|
||||
secretId: z.string(),
|
||||
secretKey: z.string(),
|
||||
secretPath: z.string()
|
||||
}),
|
||||
type: z.literal("secret_update")
|
||||
}),
|
||||
secret_delete: z.object({
|
||||
payload: z.object({
|
||||
secretId: z.string(),
|
||||
secretPath: z.string()
|
||||
}),
|
||||
type: z.literal("secret_delete")
|
||||
})
|
||||
});
|
||||
|
||||
export type TEvent = z.infer<typeof EventSchema>;
|
@ -13,6 +13,31 @@ server {
|
||||
|
||||
proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";
|
||||
}
|
||||
|
||||
location /api/v1/sse {
|
||||
proxy_set_header X-Real-RIP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
|
||||
proxy_set_header Host $http_host;
|
||||
proxy_set_header X-NginX-Proxy true;
|
||||
|
||||
proxy_pass http://backend:4000;
|
||||
proxy_redirect off;
|
||||
|
||||
proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";
|
||||
|
||||
# Without proxy buffering turned off, server side events will not work. The requests won't be sent until the request is closed or the buffer is full.
|
||||
client_max_body_size 0;
|
||||
proxy_http_version 1.1;
|
||||
proxy_buffering off;
|
||||
proxy_request_buffering off;
|
||||
|
||||
proxy_read_timeout 24h;
|
||||
|
||||
proxy_cache off;
|
||||
proxy_set_header Connection '';
|
||||
chunked_transfer_encoding off;
|
||||
}
|
||||
|
||||
location / {
|
||||
include /etc/nginx/mime.types;
|
||||
|
Reference in New Issue
Block a user