1
0
mirror of https://github.com/Infisical/infisical.git synced 2025-03-31 22:09:57 +00:00

Compare commits

...

8 Commits

Author SHA1 Message Date
4b3de83c07 Feat: SSE Router 2024-02-29 00:18:13 +01:00
804e4c4609 Draft: Test event 2024-02-29 00:17:56 +01:00
b124627288 Fix: SSE connection buffer issue 2024-02-29 00:17:35 +01:00
5c86212b11 Feat: Event service 2024-02-29 00:17:11 +01:00
98fcffe718 Feat: SSE plugin 2024-02-29 00:16:42 +01:00
d84d2ec7a6 Feat: Server-side events (testing) 2024-02-28 06:53:03 +01:00
6780bfb821 Feat: Server-side events 2024-02-28 05:32:37 +01:00
5c94a44e92 Feat: Events 2024-02-27 05:32:48 +01:00
12 changed files with 501 additions and 2 deletions

@ -62,6 +62,7 @@
"tweetnacl": "^1.0.3",
"tweetnacl-util": "^0.15.1",
"uuid": "^9.0.1",
"ws": "^8.16.0",
"zod": "^3.22.4",
"zod-to-json-schema": "^3.22.4"
},
@ -81,6 +82,7 @@
"@types/prompt-sync": "^4.2.3",
"@types/resolve": "^1.20.6",
"@types/uuid": "^9.0.7",
"@types/ws": "^8.5.10",
"@typescript-eslint/eslint-plugin": "^6.20.0",
"@typescript-eslint/parser": "^6.20.0",
"eslint": "^8.56.0",
@ -4211,6 +4213,15 @@
"integrity": "sha512-WUtIVRUZ9i5dYXefDEAI7sh9/O7jGvHg7Df/5O/gtH3Yabe5odI3UWopVR1qbPXQtvOxWu3mM4XxlYeZtMWF4g==",
"dev": true
},
"node_modules/@types/ws": {
"version": "8.5.10",
"resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.10.tgz",
"integrity": "sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==",
"dev": true,
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/xml-crypto": {
"version": "1.4.6",
"resolved": "https://registry.npmjs.org/@types/xml-crypto/-/xml-crypto-1.4.6.tgz",
@ -13720,6 +13731,26 @@
"resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
"integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ=="
},
"node_modules/ws": {
"version": "8.16.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz",
"integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/xml-crypto": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/xml-crypto/-/xml-crypto-3.2.0.tgz",

@ -46,6 +46,7 @@
"@types/prompt-sync": "^4.2.3",
"@types/resolve": "^1.20.6",
"@types/uuid": "^9.0.7",
"@types/ws": "^8.5.10",
"@typescript-eslint/eslint-plugin": "^6.20.0",
"@typescript-eslint/parser": "^6.20.0",
"eslint": "^8.56.0",
@ -123,6 +124,7 @@
"tweetnacl": "^1.0.3",
"tweetnacl-util": "^0.15.1",
"uuid": "^9.0.1",
"ws": "^8.16.0",
"zod": "^3.22.4",
"zod-to-json-schema": "^3.22.4"
}

@ -20,6 +20,7 @@ import { TAuthPasswordFactory } from "@app/services/auth/auth-password-service";
import { TAuthSignupFactory } from "@app/services/auth/auth-signup-service";
import { ActorType } from "@app/services/auth/auth-type";
import { TAuthTokenServiceFactory } from "@app/services/auth-token/auth-token-service";
import { TEventServiceFactory } from "@app/services/event/event-service";
import { TIdentityServiceFactory } from "@app/services/identity/identity-service";
import { TIdentityAccessTokenServiceFactory } from "@app/services/identity-access-token/identity-access-token-service";
import { TIdentityProjectServiceFactory } from "@app/services/identity-project/identity-project-service";
@ -71,6 +72,21 @@ declare module "fastify" {
ssoConfig: Awaited<ReturnType<TSamlConfigServiceFactory["getSaml"]>>;
}
interface FastifyReply {
sse: ({
data,
error
}:
| {
data: string;
error?: false;
}
| {
error: true;
errorMessage: string;
}) => void;
}
interface FastifyInstance {
services: {
login: TAuthLoginFactory;
@ -113,6 +129,7 @@ declare module "fastify" {
trustedIp: TTrustedIpServiceFactory;
secretBlindIndex: TSecretBlindIndexServiceFactory;
telemetry: TTelemetryServiceFactory;
event: TEventServiceFactory;
};
// this is exclusive use for middlewares in which we need to inject data
// everywhere else access using service layer

188
backend/src/event/event.ts Normal file

@ -0,0 +1,188 @@
import { URL } from "node:url";
import { ForbiddenError } from "@casl/ability";
import jwt from "jsonwebtoken";
import { Server as WebSocketServer, ServerOptions, WebSocket } from "ws";
import { TableName } from "@app/db/schemas";
import { TPermissionServiceFactory } from "@app/ee/services/permission/permission-service";
import { ProjectPermissionActions, ProjectPermissionSub } from "@app/ee/services/permission/project-permission";
import { getConfig } from "@app/lib/config/env";
import { checkIPAgainstBlocklist, TIp } from "@app/lib/ip";
import { ActorType, AuthTokenType } from "@app/services/auth/auth-type";
import { TIdentityAccessTokenDALFactory } from "@app/services/identity-access-token/identity-access-token-dal";
import { TIdentityAccessTokenServiceFactory } from "@app/services/identity-access-token/identity-access-token-service";
import { TIdentityAccessTokenJwtPayload } from "@app/services/identity-access-token/identity-access-token-types";
type TEventSubscriptionFactoryDep = {
identityAccessTokenDAL: TIdentityAccessTokenDALFactory;
identityAccessTokenServiceFactory: TIdentityAccessTokenServiceFactory;
permissionService: TPermissionServiceFactory;
};
enum AuthenticationErrors {
NO_PROJECT_ID = "Unauthorized. Project ID is missing",
NO_MACHINE = "Unauthorized. Machine Identity Access Token is missing",
INVALID_TOKEN_TYPE = "Unauthorized. Invalid token type",
INVALID_TOKEN = "Unauthorized. Invalid token",
NO_PERMISSION = "Unauthorized. No permission to access project"
}
export type TEventSubscriptionFactory = ReturnType<typeof eventSubscriptionFactory>;
export const eventSubscriptionFactory = ({
identityAccessTokenDAL,
permissionService,
identityAccessTokenServiceFactory
}: TEventSubscriptionFactoryDep) => {
const config = getConfig();
let connection: WebSocketServer | null = null;
const clients = new Map<string, WebSocket[]>();
const verifyConnection: ServerOptions["verifyClient"] = (info, cb) => {
void (async () => {
const machineIdentityAccessToken = info.req.headers["machine-identity-access-token"];
const projectId = info.req.headers["project-id"];
if (!projectId || typeof projectId !== "string") {
cb(false, 401, AuthenticationErrors.NO_PROJECT_ID);
return;
}
if (!machineIdentityAccessToken || typeof machineIdentityAccessToken !== "string") {
cb(false, 401, AuthenticationErrors.NO_MACHINE);
return;
}
const decodedToken = jwt.verify(machineIdentityAccessToken, config.AUTH_SECRET) as TIdentityAccessTokenJwtPayload;
if (decodedToken.authTokenType !== AuthTokenType.IDENTITY_ACCESS_TOKEN) {
cb(false, 401, AuthenticationErrors.INVALID_TOKEN_TYPE);
return;
}
await identityAccessTokenServiceFactory.fnValidateIdentityAccessToken(
decodedToken,
info.req.socket.remoteAddress
);
const identityAccessToken = await identityAccessTokenDAL.findOne({
[`${TableName.IdentityAccessToken}.id` as "id"]: decodedToken.identityAccessTokenId,
isAccessTokenRevoked: false
});
if (!identityAccessToken) {
cb(false, 401, AuthenticationErrors.INVALID_TOKEN);
return;
}
const ipAddress = info.req.socket.remoteAddress;
if (ipAddress) {
// This throws, and im not sure if it really should. TODO
checkIPAgainstBlocklist({
ipAddress,
trustedIps: identityAccessToken?.accessTokenTrustedIps as TIp[]
});
}
const { permission } = await permissionService.getProjectPermission(
ActorType.IDENTITY,
identityAccessToken.identityId,
projectId
);
try {
ForbiddenError.from(permission).throwUnlessCan(ProjectPermissionActions.Read, ProjectPermissionSub.Secrets);
} catch (err) {
cb(false, 401, AuthenticationErrors.NO_PERMISSION);
return;
}
cb(true);
})();
};
const init = () => {
if (connection) return;
connection = new WebSocketServer({
port: 8091,
verifyClient: verifyConnection
});
// Purely for testing purposes.
connection.on("connection", (ws) => {
const projectId = new URL(ws.url).searchParams.get("projectId");
if (!projectId) {
ws.send("Unauthorized. Project ID is missing");
ws.close();
return;
}
if (!clients.has(projectId)) {
clients.set(projectId, []);
}
clients.get(projectId)?.push(ws);
ws.on("message", (message) => {
console.log("received: %s", message);
});
ws.on("close", () => {
const projectClients = clients.get(projectId);
if (!projectClients) return;
const index = projectClients.indexOf(ws);
if (index !== -1) {
projectClients.splice(index, 1);
}
if (projectClients.length === 0) {
clients.delete(projectId);
} else {
clients.set(projectId, projectClients);
}
});
ws.send("Connected.");
});
};
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const sendNotification = (projectId: string) => {
const MESSAGE = "NEW_CHANGE";
if (!connection) {
throw new Error("Connection not initialized");
}
for (const client of connection.clients) {
client.send(MESSAGE);
}
};
return {
init
};
};
// var WebSocketServer = require("ws").Server;
// var ws = new WebSocketServer({
// verifyClient: function (info, cb) {
// var token = info.req.headers.token;
// if (!token) cb(false, 401, "Unauthorized");
// else {
// jwt.verify(token, "secret-key", function (err, decoded) {
// if (err) {
// cb(false, 401, "Unauthorized");
// } else {
// info.req.user = decoded; //[1]
// cb(true);
// }
// });
// }
// }
// });

@ -0,0 +1,47 @@
import { FastifyPluginAsync, FastifyReply } from "fastify";
import fp from "fastify-plugin";
// eslint-disable-next-line @typescript-eslint/require-await
export const serversideEvents: FastifyPluginAsync = fp(async function serversideEventsPlgin(instance): Promise<void> {
instance.decorateReply("sse", function handler(this: FastifyReply, input): void {
// if this already set, it's not first event
if (!this.raw.headersSent) {
console.log("Setting headers");
Object.entries(this.getHeaders()).forEach(([key, value]) => {
this.raw.setHeader(key, value ?? "");
});
this.raw.setHeader("Cache-Control", "no-cache");
this.raw.setHeader("Content-Type", "text/event-stream");
this.raw.setHeader("Access-Control-Allow-Origin", "*");
this.raw.setHeader("Connection", "keep-alive");
this.raw.flushHeaders(); // flush the headers to establish SSE with client
// Ngnix will close idle connections even if the connection is keep-alive. So we send a ping every 15 seconds to keep the connection truly alive.
const interval = setInterval(() => {
console.log("Sending ping");
if (!this.raw.writableEnded) {
this.raw.write("event: ping\n");
this.raw.write("data: Heartbeat\n\n");
}
}, 15_000);
this.raw.on("close", () => {
console.log("Connection closed");
clearInterval(interval);
this.raw.end();
});
}
if (input.error) {
this.raw.write("event: error\n");
this.raw.write(
`data: ${JSON.stringify({
error: input.errorMessage
})}\n\n`
);
this.raw.end();
return;
}
this.raw.write(`data: ${input.data}\n\n`); // res.write() instead of res.send()
});
});

@ -36,6 +36,7 @@ import { trustedIpDALFactory } from "@app/ee/services/trusted-ip/trusted-ip-dal"
import { trustedIpServiceFactory } from "@app/ee/services/trusted-ip/trusted-ip-service";
import { getConfig } from "@app/lib/config/env";
import { TQueueServiceFactory } from "@app/queue";
import { serversideEvents } from "@app/server/plugins/sse";
import { apiKeyDALFactory } from "@app/services/api-key/api-key-dal";
import { apiKeyServiceFactory } from "@app/services/api-key/api-key-service";
import { authDALFactory } from "@app/services/auth/auth-dal";
@ -44,6 +45,7 @@ import { authPaswordServiceFactory } from "@app/services/auth/auth-password-serv
import { authSignupServiceFactory } from "@app/services/auth/auth-signup-service";
import { tokenDALFactory } from "@app/services/auth-token/auth-token-dal";
import { tokenServiceFactory } from "@app/services/auth-token/auth-token-service";
import { eventServiceFactory } from "@app/services/event/event-service";
import { identityDALFactory } from "@app/services/identity/identity-dal";
import { identityOrgDALFactory } from "@app/services/identity/identity-org-dal";
import { identityServiceFactory } from "@app/services/identity/identity-service";
@ -116,6 +118,8 @@ export const registerRoutes = async (
) => {
await server.register(registerSecretScannerGhApp, { prefix: "/ss-webhook" });
const cfg = getConfig();
// db layers
const userDAL = userDALFactory(db);
const authDAL = authDALFactory(db);
@ -490,6 +494,8 @@ export const registerRoutes = async (
licenseService
});
const eventService = eventServiceFactory({ redisUrl: cfg.REDIS_URL });
await superAdminService.initServerCfg();
await auditLogQueue.startAuditLogPruneJob();
// setup the communication with license key server
@ -535,7 +541,8 @@ export const registerRoutes = async (
trustedIp: trustedIpService,
scim: scimService,
secretBlindIndex: secretBlindIndexService,
telemetry: telemetryService
telemetry: telemetryService,
event: eventService
});
server.decorate<FastifyZodProvider["store"]>("store", {
@ -545,6 +552,7 @@ export const registerRoutes = async (
await server.register(injectIdentity, { userDAL, serviceTokenDAL });
await server.register(injectPermission);
await server.register(injectAuditLogInfo);
await server.register(serversideEvents, { prefix: "/api/v1/sse" });
server.route({
url: "/api/status",
@ -562,7 +570,6 @@ export const registerRoutes = async (
}
},
handler: async () => {
const cfg = getConfig();
const serverCfg = await getServerCfg();
return {
date: new Date(),

@ -16,6 +16,7 @@ import { registerProjectRouter } from "./project-router";
import { registerSecretFolderRouter } from "./secret-folder-router";
import { registerSecretImportRouter } from "./secret-import-router";
import { registerSecretTagRouter } from "./secret-tag-router";
import { registerServersideEventsRouter } from "./sse-router";
import { registerSsoRouter } from "./sso-router";
import { registerUserActionRouter } from "./user-action-router";
import { registerUserRouter } from "./user-router";
@ -57,4 +58,5 @@ export const registerV1Routes = async (server: FastifyZodProvider) => {
await server.register(registerIntegrationAuthRouter, { prefix: "/integration-auth" });
await server.register(registerWebhookRouter, { prefix: "/webhooks" });
await server.register(registerIdentityRouter, { prefix: "/identities" });
await server.register(registerServersideEventsRouter, { prefix: "/sse" });
};

@ -0,0 +1,48 @@
import { z } from "zod";
import { logger } from "@app/lib/logger";
export const registerServersideEventsRouter = async (server: FastifyZodProvider) => {
server.route({
method: "GET",
url: "/events/:projectId",
schema: {
params: z.object({
projectId: z.string().trim()
}),
response: {
200: z.string()
}
},
// onRequest: verifyAuth([AuthMode.IDENTITY_ACCESS_TOKEN]),
handler: async (req, res) => {
res.sse({
data: JSON.stringify({
message: "Connected to event stream"
})
});
const subscription = await server.services.event.crateSubscription(req.params.projectId);
// It's OK to create a event listener here, because it's tied to the local subscription instance. So once the function ends, the listener is removed along with the subscription.
// No need to worry about memory leaks!
subscription
.on("message", (channel, message) => {
if (channel === req.params.projectId)
res.sse({
data: JSON.stringify(message)
});
})
.on("error", (error) => {
logger.error(error, "Error in subscription");
res.sse({
error: true,
errorMessage: error.message // ? Should we really return the error message to the client?
});
});
// eslint-disable-next-line @typescript-eslint/return-await, @typescript-eslint/no-misused-promises
req.socket.on("close", async () => await subscription.unsubscribe());
}
});
};

@ -16,6 +16,7 @@ import { getTelemetryDistinctId } from "@app/server/lib/telemetry";
import { getUserAgentType } from "@app/server/plugins/audit-log";
import { verifyAuth } from "@app/server/plugins/auth/verify-auth";
import { ActorType, AuthMode } from "@app/services/auth/auth-type";
import { TEventType } from "@app/services/event/event-types";
import { PostHogEventTypes } from "@app/services/telemetry/telemetry-types";
import { secretRawSchema } from "../sanitizedSchemas";
@ -919,6 +920,15 @@ export const registerSecretRouter = async (server: FastifyZodProvider) => {
newSecretName
});
await server.services.event.publish(req.body.workspaceId, {
type: TEventType.SECRET_UPDATE,
payload: {
secretId: secret.id,
secretKey: req.params.secretName,
secretPath: "test/path"
}
});
await server.services.auditLog.createAuditLog({
projectId: req.body.workspaceId,
...req.auditLogInfo,

@ -0,0 +1,87 @@
/* eslint-disable no-console */
import Redis from "ioredis";
// import { logger } from "@app/lib/logger";
import { TEvent, TEventType } from "./event-types";
type TEventServiceFactoryDep = {
redisUrl: string;
};
export type TEventServiceFactory = ReturnType<typeof eventServiceFactory>;
export const eventServiceFactory = ({ redisUrl }: TEventServiceFactoryDep) => {
const publisher = new Redis(redisUrl, { maxRetriesPerRequest: null });
// Map key: the channel ID.
// connections / total number of connections: We keep track of this to know when to unsubscribe and disconnect the client.
// client / the subscription: We store this so we can use the same connection/subscription for the same channel. We don't want to create a new connection for each subscription, because that would be a waste of resources and become hard to scale.
const redisClients = new Map<
string,
{
client: Redis;
connections: number;
}
>();
// Will this work for vertical scaling? The redisClients
// channel would be the projectId
const publish = async (channel: string, event: TEvent[TEventType]) => {
await publisher.publish(channel, JSON.stringify(event));
};
const crateSubscription = async (channel: string) => {
let subscriber: Redis | null = null;
const existingSubscriber = redisClients.get(channel);
if (existingSubscriber) {
redisClients.set(channel, {
client: existingSubscriber.client,
connections: existingSubscriber.connections + 1
});
subscriber = existingSubscriber.client;
} else {
subscriber = new Redis(redisUrl, { maxRetriesPerRequest: null });
redisClients.set(channel, {
client: subscriber,
connections: 1
});
}
await subscriber.subscribe(channel, (msg) => {
if (msg instanceof Error) {
throw msg;
}
});
return {
on: subscriber.on.bind(subscriber),
unsubscribe: async () => {
const subscriberToRemove = redisClients.get(channel);
if (subscriberToRemove) {
// If there's only 1 connection, we can fully unsubscribe and disconnect the client.
if (subscriberToRemove.connections === 1) {
await subscriberToRemove.client.unsubscribe(`${channel}`);
await subscriberToRemove.client.quit();
redisClients.delete(channel);
} else {
// If there's more than 1 connection, we just decrement the connections count, because there are still other listeners.
redisClients.set(channel, {
client: subscriberToRemove.client,
connections: subscriberToRemove.connections - 1
});
}
}
}
};
};
return {
publish,
crateSubscription
};
};

@ -0,0 +1,35 @@
import { z } from "zod";
export enum TEventType {
SECRET_UPDATE = "secret_update",
SECRET_DELETE = "secret_delete",
SECRET_CREATE = "secret_create"
}
export const EventSchema = z.object({
secret_create: z.object({
payload: z.object({
secretId: z.string(),
secretKey: z.string(),
secretPath: z.string()
}),
type: z.literal("secret_create")
}),
secret_update: z.object({
payload: z.object({
secretId: z.string(),
secretKey: z.string(),
secretPath: z.string()
}),
type: z.literal("secret_update")
}),
secret_delete: z.object({
payload: z.object({
secretId: z.string(),
secretPath: z.string()
}),
type: z.literal("secret_delete")
})
});
export type TEvent = z.infer<typeof EventSchema>;

@ -13,6 +13,31 @@ server {
proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";
}
location /api/v1/sse {
proxy_set_header X-Real-RIP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
proxy_pass http://backend:4000;
proxy_redirect off;
proxy_cookie_path / "/; secure; HttpOnly; SameSite=strict";
# Without proxy buffering turned off, server side events will not work. The requests won't be sent until the request is closed or the buffer is full.
client_max_body_size 0;
proxy_http_version 1.1;
proxy_buffering off;
proxy_request_buffering off;
proxy_read_timeout 24h;
proxy_cache off;
proxy_set_header Connection '';
chunked_transfer_encoding off;
}
location / {
include /etc/nginx/mime.types;