refactor(socket.io): complete rewrite to manage dynamic rooms and seeding better

This commit is contained in:
2026-06-10 16:25:48 -05:00
parent a2d9a6c127
commit 9440b44f3b
7 changed files with 271 additions and 315 deletions

View File

@@ -1,8 +0,0 @@
import type { RoomId } from "./roomDefinitions.socket.js";
export const MAX_HISTORY = 50;
export const FLUSH_INTERVAL = 100; // 50ms change higher if needed
export const roomHistory = new Map<RoomId, unknown[]>();
export const roomBuffers = new Map<RoomId, any[]>();
export const roomFlushTimers = new Map<RoomId, NodeJS.Timeout>();

View File

@@ -1,122 +0,0 @@
import { desc, eq } from "drizzle-orm";
import { db } from "../db/db.controller.js";
import { dockDoorScans } from "../db/schema/dockdoor.scans.schema.js";
import { logs } from "../db/schema/logs.schema.js";
import { ppoRun } from "../warehousing/warehousing.ppooMonitor.js";
type RoomDefinition<T = unknown> = {
seed: (limit: number) => Promise<T[]>;
};
export type StaticRoomId =
| "logs"
| "labels"
| "admin"
| "admin:build"
| "ppoo"
| "dockDoorLoading:2";
export type DynamicRoomId = `dockDoorLoading:${string}`;
export type RoomId = StaticRoomId | DynamicRoomId;
export type RoomConfig = {
requiresAuth?: boolean;
role?: string[];
seed?: (limit: number, roomId: RoomId) => Promise<unknown[]>;
};
export const protectedRooms: Record<StaticRoomId, RoomConfig> = {
logs: { requiresAuth: true, role: ["admin", "systemAdmin"] },
//admin: { requiresAuth: false, role: ["admin", "systemAdmin"] },
labels: {},
admin: {},
"admin:build": {},
ppoo: {},
"dockDoorLoading:2": {},
};
export function getRoomConfig(roomId: string): RoomConfig | null {
if (roomId in protectedRooms) {
return protectedRooms[roomId as StaticRoomId];
}
if (roomId.startsWith("dockDoorLoading:")) {
const dockId = roomId.split(":")[1];
if (!dockId) return null;
return {
requiresAuth: true,
role: ["admin", "systemAdmin", "dockDoor"],
};
}
return null;
}
export const roomDefinition: Record<RoomId, RoomDefinition> = {
logs: {
seed: async (limit) => {
try {
const rows = await db
.select()
.from(logs)
.orderBy(desc(logs.createdAt))
.limit(limit);
return rows; //.reverse();
} catch (e) {
console.error("Failed to seed logs:", e);
return [];
}
},
},
labels: {
seed: async (limit) => {
console.info(limit);
return [];
},
},
admin: {
seed: async (limit) => {
console.info(limit);
return [];
},
},
"admin:build": {
seed: async (limit) => {
console.info(limit);
return [];
},
},
ppoo: {
seed: async (limit) => {
console.log(limit);
return {
type: "snapshot",
items: await ppoRun(),
createdAt: new Date().toISOString(),
} as any;
},
},
// TODO: add in dynamic room seeding
"dockDoorLoading:2": {
seed: async (limit) => {
console.log(limit);
try {
const rows = await db
.select()
.from(dockDoorScans)
.where(eq(dockDoorScans.status, "active"))
.orderBy(desc(dockDoorScans.upd_date))
.limit(limit);
return rows; //.reverse();
} catch (e) {
console.error("Failed to seed logs:", e);
return [];
}
},
},
};

View File

@@ -1,27 +1,73 @@
// the emitter setup
// TODO: validate if we want to add event back in later..
// let emitFn: ((roomId: string, event: string, payload: unknown) => void) | null =
// null;
import type { RoomId } from "./roomDefinitions.socket.js";
import { createLogger } from "../logger/logger.controller.js";
let addDataToRoom: ((roomId: RoomId, payload: unknown[]) => void) | null = null;
type QueuedPayload = unknown;
let emitFn: ((roomId: string, payload: QueuedPayload[]) => void) | null = null;
const queues = new Map<string, QueuedPayload[]>();
const timers = new Map<string, NodeJS.Timeout>();
const FLUSH_MS = 500;
const MAX_QUEUE_SIZE = 200;
export const registerEmitter = (
fn: (roomId: RoomId, payload: unknown[]) => void,
fn: (roomId: string, payload: QueuedPayload) => void,
) => {
addDataToRoom = fn;
emitFn = fn;
};
export const emitToRoom = (roomId: RoomId, payload: unknown[]) => {
if (!addDataToRoom) {
export const emitToRoom = (roomId: string, payload: QueuedPayload) => {
const log = createLogger({ module: "socket.io", subModule: "emitter" });
if (!emitFn) {
console.error("Socket emitter not initialized");
return;
}
addDataToRoom(roomId, payload);
const queue = queues.get(roomId) ?? [];
if (queue.length > MAX_QUEUE_SIZE) {
log.error(
{ stack: { roomId, size: queue.length }, notify: true },
`Socket queue exceeded max size for ${roomId}`,
);
}
queue.push(payload);
queues.set(roomId, queue);
if (timers.has(roomId)) return;
const timer = setTimeout(() => {
try {
const payloads = queues.get(roomId) ?? [];
if (payloads.length === 0) return;
emitFn?.(roomId, payloads);
queues.delete(roomId);
} catch (e) {
console.error("Socket emit failed", { roomId, e });
} finally {
timers.delete(roomId);
}
}, FLUSH_MS);
timers.set(roomId, timer);
};
/*
import { emitToRoom } from "../socket/socketEmitter.js";
// room name
// its payload
emitToRoom("logs", newLogRow);
example emitToRoom(room, payload)
payload can be anything json serilized example below.
emitToRoom("inventory:ppoo", {
type: "snapshot",
location: "ppoo",
items,
createdAt: new Date().toISOString(),
});
*/

View File

@@ -1,90 +0,0 @@
import type { Server } from "socket.io";
import { createLogger } from "../logger/logger.controller.js";
import {
FLUSH_INTERVAL,
MAX_HISTORY,
roomBuffers,
roomFlushTimers,
roomHistory,
} from "./roomCache.socket.js";
import { type RoomId, roomDefinition } from "./roomDefinitions.socket.js";
// get the db data if not exiting already
const log = createLogger({ module: "socket.io", subModule: "roomService" });
let ioRef: Server | null = null;
export const registerRoomService = (io: Server) => {
ioRef = io;
};
export const hasRoomMembers = (roomId: string): boolean => {
if (!ioRef) return false;
return (ioRef.sockets.adapter.rooms.get(roomId)?.size ?? 0) > 0;
};
export const getRoomMemberCount = (roomId: string): number => {
if (!ioRef) return 0;
return ioRef.sockets.adapter.rooms.get(roomId)?.size ?? 0;
};
export const preseedRoom = async (roomId: RoomId) => {
if (roomHistory.has(roomId)) {
if (!roomId.includes("dock")) {
return roomHistory.get(roomId);
}
}
const roomDef = roomDefinition[roomId] as any;
if (!roomDef) {
log.error({}, `Room ${roomId} is not defined`);
}
const latestData = await roomDef.seed(MAX_HISTORY);
roomHistory.set(roomId, latestData);
return latestData;
};
export const createRoomEmitter = (io: Server) => {
const addDataToRoom = <T>(roomId: RoomId, payload: T[]) => {
if (!roomHistory.has(roomId)) {
roomHistory.set(roomId, []);
}
const history = roomHistory.get(roomId)!;
history?.push(payload);
if (history?.length > MAX_HISTORY) {
history?.shift();
}
if (!roomBuffers.has(roomId)) {
roomBuffers.set(roomId, []);
}
roomBuffers.get(roomId)!.push(payload);
if (!roomFlushTimers.has(roomId)) {
const timer = setTimeout(() => {
const buffered = roomBuffers.get(roomId) || [];
if (buffered.length > 0) {
io.to(roomId).emit("room-update", {
roomId,
payloads: buffered, // ✅ array now
});
}
roomBuffers.set(roomId, []);
roomFlushTimers.delete(roomId);
}, FLUSH_INTERVAL);
roomFlushTimers.set(roomId, timer);
}
};
return { addDataToRoom };
};

View File

@@ -1,33 +1,16 @@
import type { Server as HttpServer } from "node:http";
//import { dirname, join } from "node:path";
//import { fileURLToPath } from "node:url";
import { instrument } from "@socket.io/admin-ui";
import { Server } from "socket.io";
import { createLogger } from "../logger/logger.controller.js";
import { auth } from "../utils/auth.utils.js";
import { allowedOrigins } from "../utils/cors.utils.js";
import { registerEmitter } from "./roomEmitter.socket.js";
import {
createRoomEmitter,
preseedRoom,
registerRoomService,
} from "./roomService.socket.js";
import { registerHasRoomMembers } from "./socket.manager.js";
import { isRoomKey, roomConfigs } from "./socket.roomConfig.js";
//const __filename = fileURLToPath(import.meta.url);
//const __dirname = dirname(__filename);
const log = createLogger({ module: "socket.io", subModule: "setup" });
import { auth } from "../utils/auth.utils.js";
//import type { Session, User } from "better-auth"; // adjust if needed
import { getRoomConfig } from "./roomDefinitions.socket.js";
// declare module "socket.io" {
// interface Socket {
// user?: User | any;
// session?: Session;
// }
// }
export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
const io = new Server(server, {
path: `${baseUrl}/api/socket.io`,
@@ -37,12 +20,16 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
},
});
// manage members of the rooms.
registerRoomService(io);
registerHasRoomMembers((roomId) => {
return (io.sockets.adapter.rooms.get(roomId)?.size ?? 0) > 0;
});
// ✅ Create emitter instance
const { addDataToRoom } = createRoomEmitter(io);
registerEmitter(addDataToRoom);
registerEmitter((roomId, payloads) => {
io.to(roomId).emit("room-update", {
roomId,
payloads,
});
});
io.use(async (socket, next) => {
try {
@@ -85,79 +72,95 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
version: "1.0.0",
});
// s.on("join-room", async (rn) => {
// const config = protectedRooms[rn];
s.on("join-room", async ({ room, params }) => {
if (!isRoomKey(room)) return;
// if (config?.requiresAuth && !s.user) {
// return s.emit("room-error", {
// room: rn,
// message: "Authentication required",
// });
// }
// const roles = Array.isArray(config?.role) ? config?.role : [config?.role];
// //if (config?.role && s.user?.role !== config.role) {
// if (config?.role && !roles.includes(s.user?.role)) {
// return s.emit("room-error", {
// roomId: rn,
// message: `Not authorized to be in room: ${rn}`,
// });
// }
// s.join(rn);
// // get room seeded
// const history = await preseedRoom(rn);
// log.info({}, `User joined ${rn}: ${s.id}`);
// // send the intial data
// s.emit("room-update", {
// roomId: rn,
// payloads: history,
// initial: true,
// });
// });
s.on("join-room", async (rn: string) => {
const config = getRoomConfig(rn);
const config = roomConfigs[room];
if (!config) {
return s.emit("room-error", {
roomId: rn,
message: `Unknown room: ${rn}`,
roomId: room,
message: `Unknown room: ${room}`,
});
}
if (config.requiresAuth && !s.user) {
const actualRoom = config.buildRoom
? config.buildRoom(params)
: (room as any);
const allowed = config.canJoin
? await config.canJoin({
socket: s,
user: s.user,
room,
actualRoom,
params,
})
: true;
if (!allowed) {
return s.emit("room-error", {
roomId: rn,
message: "Authentication required",
roomId: room,
message: `Not authorized to be in room: ${room}`,
});
}
const roles = Array.isArray(config.role) ? config.role : [];
await s.join(actualRoom);
if (roles.length > 0 && !roles.includes(s.user?.role)) {
return s.emit("room-error", {
roomId: rn,
message: `Not authorized to be in room: ${rn}`,
});
}
s.join(rn);
const history = await preseedRoom(rn as any);
log.info({}, `User joined ${rn}: ${s.id}`);
s.emit("room-update", {
roomId: rn,
payloads: history,
initial: true,
s.emit("room-joined", {
room,
roomId: actualRoom,
params,
});
if (config.seed) {
const payloads = await config.seed({
room,
actualRoom,
params,
user: s.user,
});
s.emit("room-update", {
room,
roomId: actualRoom,
type: "snapshot",
payloads,
});
}
log.info(
{ room, actualRoom, params },
`User joined ${actualRoom}: ${s.id}`,
);
});
s.on("leave-room", (room) => {
s.leave(room);
log.info({}, `${s.id} left room: ${room}`);
// s.on("leave-room", (room) => {
// s.leave(room);
// log.info({}, `${s.id} left room: ${JSON.stringify(room)}`);
// });
s.on("leave-room", async ({ room, params }) => {
if (!isRoomKey(room)) return;
const config = roomConfigs[room];
if (!config) return;
const actualRoom = config.buildRoom
? config.buildRoom(params)
: (room as any);
await s.leave(actualRoom);
s.emit("room-left", {
room,
roomId: actualRoom,
params,
});
log.info(
{ room, actualRoom, params },
`${s.id} left room: ${actualRoom}`,
);
});
});

View File

@@ -0,0 +1,10 @@
let hasMembersFn: ((roomId: string) => boolean) | null = null;
export const registerHasRoomMembers = (fn: (roomId: string) => boolean) => {
hasMembersFn = fn;
};
export const hasRoomMembers = (roomId: string) => {
if (!hasMembersFn) return false;
return hasMembersFn(roomId);
};

View File

@@ -0,0 +1,117 @@
import { getRecentLogs } from "../db/db.socketSeed.js";
import { getRecentDockScans } from "../dockdoorScanning/dockdoor.socket.seed.js";
export type RoomKey =
| "logs"
| "labels"
| "admin"
| "inventory"
| "dockDoorLoading";
export type SocketUser = {
id: string;
email?: string;
role?: string;
};
export type CanJoinArgs = {
socket: any;
user?: SocketUser;
room: string;
actualRoom: string;
params?: Record<string, unknown>;
};
type RoomConfig = {
//requiresAuth?: boolean;
//roles?: string[];
canJoin?: (args: CanJoinArgs) => boolean | Promise<boolean>;
buildRoom?: (params?: Record<string, unknown>) => string | null;
seed?: (args: {
room: string;
actualRoom: string;
params?: Record<string, unknown>;
user?: SocketUser;
}) => Promise<unknown[]>;
};
export function isRoomKey(room: string): room is RoomKey {
return room in roomConfigs;
}
export const roomConfigs: Record<RoomKey, RoomConfig> = {
logs: {
canJoin: ({ user, params }) => {
if (!params?.submodule && !params?.module) {
return user?.role === "systemAdmin";
}
return true;
},
buildRoom: (params) => {
const module = String(params?.module ?? "").toLowerCase();
const submodule = String(params?.submodule ?? "").toLowerCase();
if (module && submodule) return `logs:${module}:${submodule}`;
if (submodule) return `logs:${submodule}`;
if (module) return `logs:${module}`;
return "logs";
},
seed: async ({ params }) => {
const module = params?.module ? String(params.module) : undefined;
const submodule = params?.submodule
? String(params.submodule)
: undefined;
return await getRecentLogs({
module,
submodule,
limit: 200,
});
},
},
labels: {
canJoin: () => true,
buildRoom: () => "labels",
},
admin: {
canJoin: ({ user, params }) => {
if (params?.section === "system") {
return user?.role === "systemAdmin";
}
return true;
},
buildRoom: (params) =>
params?.section ? `admin:${params.section}` : "admin",
},
inventory: {
canJoin: () => true,
buildRoom: (params) =>
params?.location ? `inventory:${params.location}` : null,
},
dockDoorLoading: {
canJoin: () => true,
buildRoom: (params) =>
params?.dockId ? `dockDoorLoading:${params.dockId}` : null,
seed: async ({ params }) => {
return await getRecentDockScans({
loadingOrder: params?.loadingOrder as string,
limit: 200,
});
},
},
} satisfies Record<string, RoomConfig>;
/*
socket.emit("join-room", {
room: "dockDoorLoading",
params: { dockId: "2" },
});
*/