From 9440b44f3bb1842d0e6ffc479d340ae9c5b84656 Mon Sep 17 00:00:00 2001 From: Blake Matthes Date: Wed, 10 Jun 2026 16:25:48 -0500 Subject: [PATCH] refactor(socket.io): complete rewrite to manage dynamic rooms and seeding better --- backend/socket.io/roomCache.socket.ts | 8 - backend/socket.io/roomDefinitions.socket.ts | 122 -------------- backend/socket.io/roomEmitter.socket.ts | 68 ++++++-- backend/socket.io/roomService.socket.ts | 90 ----------- backend/socket.io/serverSetup.ts | 171 ++++++++++---------- backend/socket.io/socket.manager.ts | 10 ++ backend/socket.io/socket.roomConfig.ts | 117 ++++++++++++++ 7 files changed, 271 insertions(+), 315 deletions(-) delete mode 100644 backend/socket.io/roomCache.socket.ts delete mode 100644 backend/socket.io/roomDefinitions.socket.ts delete mode 100644 backend/socket.io/roomService.socket.ts create mode 100644 backend/socket.io/socket.manager.ts create mode 100644 backend/socket.io/socket.roomConfig.ts diff --git a/backend/socket.io/roomCache.socket.ts b/backend/socket.io/roomCache.socket.ts deleted file mode 100644 index e3da9b0..0000000 --- a/backend/socket.io/roomCache.socket.ts +++ /dev/null @@ -1,8 +0,0 @@ -import type { RoomId } from "./roomDefinitions.socket.js"; - -export const MAX_HISTORY = 50; -export const FLUSH_INTERVAL = 100; // 50ms change higher if needed - -export const roomHistory = new Map(); -export const roomBuffers = new Map(); -export const roomFlushTimers = new Map(); diff --git a/backend/socket.io/roomDefinitions.socket.ts b/backend/socket.io/roomDefinitions.socket.ts deleted file mode 100644 index 9eae9e5..0000000 --- a/backend/socket.io/roomDefinitions.socket.ts +++ /dev/null @@ -1,122 +0,0 @@ -import { desc, eq } from "drizzle-orm"; -import { db } from "../db/db.controller.js"; -import { dockDoorScans } from "../db/schema/dockdoor.scans.schema.js"; -import { logs } from "../db/schema/logs.schema.js"; -import { ppoRun } from "../warehousing/warehousing.ppooMonitor.js"; - -type RoomDefinition = { - seed: (limit: number) => Promise; -}; - -export type StaticRoomId = - | "logs" - | "labels" - | "admin" - | "admin:build" - | "ppoo" - | "dockDoorLoading:2"; -export type DynamicRoomId = `dockDoorLoading:${string}`; -export type RoomId = StaticRoomId | DynamicRoomId; - -export type RoomConfig = { - requiresAuth?: boolean; - role?: string[]; - seed?: (limit: number, roomId: RoomId) => Promise; -}; - -export const protectedRooms: Record = { - logs: { requiresAuth: true, role: ["admin", "systemAdmin"] }, - //admin: { requiresAuth: false, role: ["admin", "systemAdmin"] }, - labels: {}, - admin: {}, - "admin:build": {}, - ppoo: {}, - "dockDoorLoading:2": {}, -}; - -export function getRoomConfig(roomId: string): RoomConfig | null { - if (roomId in protectedRooms) { - return protectedRooms[roomId as StaticRoomId]; - } - - if (roomId.startsWith("dockDoorLoading:")) { - const dockId = roomId.split(":")[1]; - - if (!dockId) return null; - - return { - requiresAuth: true, - role: ["admin", "systemAdmin", "dockDoor"], - }; - } - - return null; -} - -export const roomDefinition: Record = { - logs: { - seed: async (limit) => { - try { - const rows = await db - .select() - .from(logs) - .orderBy(desc(logs.createdAt)) - .limit(limit); - - return rows; //.reverse(); - } catch (e) { - console.error("Failed to seed logs:", e); - return []; - } - }, - }, - - labels: { - seed: async (limit) => { - console.info(limit); - return []; - }, - }, - admin: { - seed: async (limit) => { - console.info(limit); - return []; - }, - }, - "admin:build": { - seed: async (limit) => { - console.info(limit); - return []; - }, - }, - ppoo: { - seed: async (limit) => { - console.log(limit); - return { - type: "snapshot", - items: await ppoRun(), - createdAt: new Date().toISOString(), - } as any; - }, - }, - - // TODO: add in dynamic room seeding - "dockDoorLoading:2": { - seed: async (limit) => { - console.log(limit); - try { - const rows = await db - .select() - .from(dockDoorScans) - .where(eq(dockDoorScans.status, "active")) - .orderBy(desc(dockDoorScans.upd_date)) - .limit(limit); - - return rows; //.reverse(); - } catch (e) { - console.error("Failed to seed logs:", e); - return []; - } - }, - }, -}; diff --git a/backend/socket.io/roomEmitter.socket.ts b/backend/socket.io/roomEmitter.socket.ts index afdf38a..e80a1da 100644 --- a/backend/socket.io/roomEmitter.socket.ts +++ b/backend/socket.io/roomEmitter.socket.ts @@ -1,27 +1,73 @@ // the emitter setup +// TODO: validate if we want to add event back in later.. +// let emitFn: ((roomId: string, event: string, payload: unknown) => void) | null = +// null; -import type { RoomId } from "./roomDefinitions.socket.js"; +import { createLogger } from "../logger/logger.controller.js"; -let addDataToRoom: ((roomId: RoomId, payload: unknown[]) => void) | null = null; +type QueuedPayload = unknown; + +let emitFn: ((roomId: string, payload: QueuedPayload[]) => void) | null = null; + +const queues = new Map(); +const timers = new Map(); + +const FLUSH_MS = 500; +const MAX_QUEUE_SIZE = 200; export const registerEmitter = ( - fn: (roomId: RoomId, payload: unknown[]) => void, + fn: (roomId: string, payload: QueuedPayload) => void, ) => { - addDataToRoom = fn; + emitFn = fn; }; -export const emitToRoom = (roomId: RoomId, payload: unknown[]) => { - if (!addDataToRoom) { +export const emitToRoom = (roomId: string, payload: QueuedPayload) => { + const log = createLogger({ module: "socket.io", subModule: "emitter" }); + if (!emitFn) { console.error("Socket emitter not initialized"); return; } - addDataToRoom(roomId, payload); + const queue = queues.get(roomId) ?? []; + + if (queue.length > MAX_QUEUE_SIZE) { + log.error( + { stack: { roomId, size: queue.length }, notify: true }, + `Socket queue exceeded max size for ${roomId}`, + ); + } + queue.push(payload); + queues.set(roomId, queue); + + if (timers.has(roomId)) return; + + const timer = setTimeout(() => { + try { + const payloads = queues.get(roomId) ?? []; + + if (payloads.length === 0) return; + emitFn?.(roomId, payloads); + + queues.delete(roomId); + } catch (e) { + console.error("Socket emit failed", { roomId, e }); + } finally { + timers.delete(roomId); + } + }, FLUSH_MS); + + timers.set(roomId, timer); }; /* -import { emitToRoom } from "../socket/socketEmitter.js"; -// room name -// its payload -emitToRoom("logs", newLogRow); +example emitToRoom(room, payload) + +payload can be anything json serilized example below. + +emitToRoom("inventory:ppoo", { + type: "snapshot", + location: "ppoo", + items, + createdAt: new Date().toISOString(), +}); */ diff --git a/backend/socket.io/roomService.socket.ts b/backend/socket.io/roomService.socket.ts deleted file mode 100644 index 46b7f73..0000000 --- a/backend/socket.io/roomService.socket.ts +++ /dev/null @@ -1,90 +0,0 @@ -import type { Server } from "socket.io"; -import { createLogger } from "../logger/logger.controller.js"; -import { - FLUSH_INTERVAL, - MAX_HISTORY, - roomBuffers, - roomFlushTimers, - roomHistory, -} from "./roomCache.socket.js"; -import { type RoomId, roomDefinition } from "./roomDefinitions.socket.js"; - -// get the db data if not exiting already -const log = createLogger({ module: "socket.io", subModule: "roomService" }); -let ioRef: Server | null = null; - -export const registerRoomService = (io: Server) => { - ioRef = io; -}; - -export const hasRoomMembers = (roomId: string): boolean => { - if (!ioRef) return false; - - return (ioRef.sockets.adapter.rooms.get(roomId)?.size ?? 0) > 0; -}; - -export const getRoomMemberCount = (roomId: string): number => { - if (!ioRef) return 0; - - return ioRef.sockets.adapter.rooms.get(roomId)?.size ?? 0; -}; -export const preseedRoom = async (roomId: RoomId) => { - if (roomHistory.has(roomId)) { - if (!roomId.includes("dock")) { - return roomHistory.get(roomId); - } - } - - const roomDef = roomDefinition[roomId] as any; - - if (!roomDef) { - log.error({}, `Room ${roomId} is not defined`); - } - - const latestData = await roomDef.seed(MAX_HISTORY); - - roomHistory.set(roomId, latestData); - - return latestData; -}; - -export const createRoomEmitter = (io: Server) => { - const addDataToRoom = (roomId: RoomId, payload: T[]) => { - if (!roomHistory.has(roomId)) { - roomHistory.set(roomId, []); - } - - const history = roomHistory.get(roomId)!; - history?.push(payload); - - if (history?.length > MAX_HISTORY) { - history?.shift(); - } - - if (!roomBuffers.has(roomId)) { - roomBuffers.set(roomId, []); - } - - roomBuffers.get(roomId)!.push(payload); - - if (!roomFlushTimers.has(roomId)) { - const timer = setTimeout(() => { - const buffered = roomBuffers.get(roomId) || []; - - if (buffered.length > 0) { - io.to(roomId).emit("room-update", { - roomId, - payloads: buffered, // ✅ array now - }); - } - - roomBuffers.set(roomId, []); - roomFlushTimers.delete(roomId); - }, FLUSH_INTERVAL); - - roomFlushTimers.set(roomId, timer); - } - }; - - return { addDataToRoom }; -}; diff --git a/backend/socket.io/serverSetup.ts b/backend/socket.io/serverSetup.ts index 164f2b7..fef56e0 100644 --- a/backend/socket.io/serverSetup.ts +++ b/backend/socket.io/serverSetup.ts @@ -1,33 +1,16 @@ import type { Server as HttpServer } from "node:http"; -//import { dirname, join } from "node:path"; -//import { fileURLToPath } from "node:url"; import { instrument } from "@socket.io/admin-ui"; import { Server } from "socket.io"; import { createLogger } from "../logger/logger.controller.js"; +import { auth } from "../utils/auth.utils.js"; import { allowedOrigins } from "../utils/cors.utils.js"; import { registerEmitter } from "./roomEmitter.socket.js"; -import { - createRoomEmitter, - preseedRoom, - registerRoomService, -} from "./roomService.socket.js"; +import { registerHasRoomMembers } from "./socket.manager.js"; +import { isRoomKey, roomConfigs } from "./socket.roomConfig.js"; -//const __filename = fileURLToPath(import.meta.url); -//const __dirname = dirname(__filename); const log = createLogger({ module: "socket.io", subModule: "setup" }); -import { auth } from "../utils/auth.utils.js"; -//import type { Session, User } from "better-auth"; // adjust if needed -import { getRoomConfig } from "./roomDefinitions.socket.js"; - -// declare module "socket.io" { -// interface Socket { -// user?: User | any; -// session?: Session; -// } -// } - export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => { const io = new Server(server, { path: `${baseUrl}/api/socket.io`, @@ -37,12 +20,16 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => { }, }); - // manage members of the rooms. - registerRoomService(io); + registerHasRoomMembers((roomId) => { + return (io.sockets.adapter.rooms.get(roomId)?.size ?? 0) > 0; + }); - // ✅ Create emitter instance - const { addDataToRoom } = createRoomEmitter(io); - registerEmitter(addDataToRoom); + registerEmitter((roomId, payloads) => { + io.to(roomId).emit("room-update", { + roomId, + payloads, + }); + }); io.use(async (socket, next) => { try { @@ -85,79 +72,95 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => { version: "1.0.0", }); - // s.on("join-room", async (rn) => { - // const config = protectedRooms[rn]; + s.on("join-room", async ({ room, params }) => { + if (!isRoomKey(room)) return; - // if (config?.requiresAuth && !s.user) { - // return s.emit("room-error", { - // room: rn, - // message: "Authentication required", - // }); - // } - - // const roles = Array.isArray(config?.role) ? config?.role : [config?.role]; - - // //if (config?.role && s.user?.role !== config.role) { - // if (config?.role && !roles.includes(s.user?.role)) { - // return s.emit("room-error", { - // roomId: rn, - // message: `Not authorized to be in room: ${rn}`, - // }); - // } - // s.join(rn); - - // // get room seeded - // const history = await preseedRoom(rn); - // log.info({}, `User joined ${rn}: ${s.id}`); - // // send the intial data - // s.emit("room-update", { - // roomId: rn, - // payloads: history, - // initial: true, - // }); - // }); - - s.on("join-room", async (rn: string) => { - const config = getRoomConfig(rn); + const config = roomConfigs[room]; if (!config) { return s.emit("room-error", { - roomId: rn, - message: `Unknown room: ${rn}`, + roomId: room, + message: `Unknown room: ${room}`, }); } - if (config.requiresAuth && !s.user) { + const actualRoom = config.buildRoom + ? config.buildRoom(params) + : (room as any); + + const allowed = config.canJoin + ? await config.canJoin({ + socket: s, + user: s.user, + room, + actualRoom, + params, + }) + : true; + + if (!allowed) { return s.emit("room-error", { - roomId: rn, - message: "Authentication required", + roomId: room, + message: `Not authorized to be in room: ${room}`, }); } - const roles = Array.isArray(config.role) ? config.role : []; + await s.join(actualRoom); - if (roles.length > 0 && !roles.includes(s.user?.role)) { - return s.emit("room-error", { - roomId: rn, - message: `Not authorized to be in room: ${rn}`, - }); - } - - s.join(rn); - - const history = await preseedRoom(rn as any); - - log.info({}, `User joined ${rn}: ${s.id}`); - - s.emit("room-update", { - roomId: rn, - payloads: history, - initial: true, + s.emit("room-joined", { + room, + roomId: actualRoom, + params, }); + + if (config.seed) { + const payloads = await config.seed({ + room, + actualRoom, + params, + user: s.user, + }); + + s.emit("room-update", { + room, + roomId: actualRoom, + type: "snapshot", + payloads, + }); + } + + log.info( + { room, actualRoom, params }, + `User joined ${actualRoom}: ${s.id}`, + ); }); - s.on("leave-room", (room) => { - s.leave(room); - log.info({}, `${s.id} left room: ${room}`); + // s.on("leave-room", (room) => { + // s.leave(room); + // log.info({}, `${s.id} left room: ${JSON.stringify(room)}`); + // }); + s.on("leave-room", async ({ room, params }) => { + if (!isRoomKey(room)) return; + + const config = roomConfigs[room]; + + if (!config) return; + + const actualRoom = config.buildRoom + ? config.buildRoom(params) + : (room as any); + + await s.leave(actualRoom); + + s.emit("room-left", { + room, + roomId: actualRoom, + params, + }); + + log.info( + { room, actualRoom, params }, + `${s.id} left room: ${actualRoom}`, + ); }); }); diff --git a/backend/socket.io/socket.manager.ts b/backend/socket.io/socket.manager.ts new file mode 100644 index 0000000..d7889d5 --- /dev/null +++ b/backend/socket.io/socket.manager.ts @@ -0,0 +1,10 @@ +let hasMembersFn: ((roomId: string) => boolean) | null = null; + +export const registerHasRoomMembers = (fn: (roomId: string) => boolean) => { + hasMembersFn = fn; +}; + +export const hasRoomMembers = (roomId: string) => { + if (!hasMembersFn) return false; + return hasMembersFn(roomId); +}; diff --git a/backend/socket.io/socket.roomConfig.ts b/backend/socket.io/socket.roomConfig.ts new file mode 100644 index 0000000..8ba5094 --- /dev/null +++ b/backend/socket.io/socket.roomConfig.ts @@ -0,0 +1,117 @@ +import { getRecentLogs } from "../db/db.socketSeed.js"; +import { getRecentDockScans } from "../dockdoorScanning/dockdoor.socket.seed.js"; + +export type RoomKey = + | "logs" + | "labels" + | "admin" + | "inventory" + | "dockDoorLoading"; + +export type SocketUser = { + id: string; + email?: string; + role?: string; +}; + +export type CanJoinArgs = { + socket: any; + user?: SocketUser; + room: string; + actualRoom: string; + params?: Record; +}; + +type RoomConfig = { + //requiresAuth?: boolean; + //roles?: string[]; + canJoin?: (args: CanJoinArgs) => boolean | Promise; + buildRoom?: (params?: Record) => string | null; + seed?: (args: { + room: string; + actualRoom: string; + params?: Record; + user?: SocketUser; + }) => Promise; +}; + +export function isRoomKey(room: string): room is RoomKey { + return room in roomConfigs; +} + +export const roomConfigs: Record = { + logs: { + canJoin: ({ user, params }) => { + if (!params?.submodule && !params?.module) { + return user?.role === "systemAdmin"; + } + + return true; + }, + buildRoom: (params) => { + const module = String(params?.module ?? "").toLowerCase(); + const submodule = String(params?.submodule ?? "").toLowerCase(); + + if (module && submodule) return `logs:${module}:${submodule}`; + if (submodule) return `logs:${submodule}`; + if (module) return `logs:${module}`; + + return "logs"; + }, + seed: async ({ params }) => { + const module = params?.module ? String(params.module) : undefined; + const submodule = params?.submodule + ? String(params.submodule) + : undefined; + + return await getRecentLogs({ + module, + submodule, + limit: 200, + }); + }, + }, + + labels: { + canJoin: () => true, + buildRoom: () => "labels", + }, + + admin: { + canJoin: ({ user, params }) => { + if (params?.section === "system") { + return user?.role === "systemAdmin"; + } + + return true; + }, + buildRoom: (params) => + params?.section ? `admin:${params.section}` : "admin", + }, + inventory: { + canJoin: () => true, + buildRoom: (params) => + params?.location ? `inventory:${params.location}` : null, + }, + + dockDoorLoading: { + canJoin: () => true, + buildRoom: (params) => + params?.dockId ? `dockDoorLoading:${params.dockId}` : null, + seed: async ({ params }) => { + return await getRecentDockScans({ + loadingOrder: params?.loadingOrder as string, + limit: 200, + }); + }, + }, +} satisfies Record; + +/* + +socket.emit("join-room", { + room: "dockDoorLoading", + params: { dockId: "2" }, +}); + +*/