7 Commits

27 changed files with 811 additions and 428 deletions

View File

@@ -1,5 +1,7 @@
import { drizzle } from "drizzle-orm/postgres-js"; import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres"; import postgres from "postgres";
import * as dockScans from "./schema/dockdoor.scans.schema.js";
import * as logs from "./schema/logs.schema.js";
import * as opendockAVCheck from "./schema/opendock_articleSetup.js"; import * as opendockAVCheck from "./schema/opendock_articleSetup.js";
import * as scanUserSchema from "./schema/scanUsers.js"; import * as scanUserSchema from "./schema/scanUsers.js";
import * as settingsSchema from "./schema/settings.schema.js"; import * as settingsSchema from "./schema/settings.schema.js";
@@ -23,5 +25,7 @@ export const db = drizzle(queryClient, {
...scanUserSchema, ...scanUserSchema,
...settingsSchema, ...settingsSchema,
...opendockAVCheck, ...opendockAVCheck,
...logs,
...dockScans,
}, },
}); });

59
backend/db/db.listener.ts Normal file
View File

@@ -0,0 +1,59 @@
import postgres from "postgres";
import { createLogger } from "../logger/logger.controller.js";
import { handleDbNotification } from "./db.router.js";
const log = createLogger({
module: "db",
subModule: "notifications",
});
const CHANNELS = [
"logs_inserted",
// "labels_inserted",
// "dock_scans_inserted",
] as const;
type DbNotificationChannel = (typeof CHANNELS)[number];
type DbNotificationPayload = {
table: string;
action: "INSERT" | "UPDATE" | "DELETE";
id: string;
};
export async function startDbNotificationListener() {
const sql = postgres({
host: `${process.env.DATABASE_HOST}`,
port: Number(process.env.DATABASE_PORT),
database: `${process.env.DATABASE_DB}`,
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
});
for (const channel of CHANNELS) {
await sql.listen(channel, async (rawPayload) => {
await processNotification(channel, rawPayload);
});
log.info({ stack: { channel } }, `Listening for ${channel}`);
}
}
async function processNotification(
channel: DbNotificationChannel,
rawPayload: string,
) {
try {
const payload = JSON.parse(rawPayload) as DbNotificationPayload;
await handleDbNotification({
channel,
payload,
});
} catch (e) {
log.error(
{ stack: { channel, rawPayload, e }, notify: true },
"Failed processing DB notification",
);
}
}

41
backend/db/db.router.ts Normal file
View File

@@ -0,0 +1,41 @@
import { handleDockScanInsertedNotification } from "../dockdoorScanning/dockdoor.socket.notifications.js";
import { createLogger } from "../logger/logger.controller.js";
import { handleLogInsertedNotification } from "../logger/logger.socket.notifications.js";
const log = createLogger({
module: "db",
subModule: "notifications-router",
});
type DbNotification = {
channel: string;
payload: {
table: string;
action: "INSERT" | "UPDATE" | "DELETE";
id: string;
};
};
export async function handleDbNotification(notification: DbNotification) {
const { channel, payload } = notification;
switch (channel) {
case "logs_inserted":
await handleLogInsertedNotification(payload.id);
return;
// case "labels_inserted":
// await handleLabelInsertedNotification(payload.id);
// return;
case "dock_scan_inserted":
await handleDockScanInsertedNotification(payload.id);
return;
default:
log.warn(
{ stack: notification },
`Unhandled DB notification channel: ${channel}`,
);
}
}

View File

@@ -0,0 +1,99 @@
import { sql } from "drizzle-orm";
import { db } from "../db/db.controller.js";
import { createLogger } from "../logger/logger.controller.js";
const log = createLogger({
module: "db",
subModule: "notifications",
});
/**
* Creates/updates Postgres notification functions + triggers.
*
* Safe to run on every app startup.
* CREATE OR REPLACE updates the function.
* DROP TRIGGER IF EXISTS prevents duplicate triggers.
*/
export async function setupDbNotifications() {
log.info({}, "Setting up DB notifications");
await setupLogsNotifications();
await setupDockScansNotifications();
log.info({}, "DB notifications setup complete");
}
/**
* Logs notification setup.
*
* Flow:
* 1. app inserts into logs table
* 2. trigger runs after insert
* 3. Postgres sends NOTIFY logs_inserted with the new log id
* 4. Node listener receives id and fetches/emits full row
*/
async function setupLogsNotifications() {
await db.execute(sql`
CREATE OR REPLACE FUNCTION notify_logs_inserted()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(
'logs_inserted',
json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'id', NEW.id
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
`);
await db.execute(sql`
DROP TRIGGER IF EXISTS logs_inserted_notify_trigger ON logs;
`);
await db.execute(sql`
CREATE TRIGGER logs_inserted_notify_trigger
AFTER INSERT ON logs
FOR EACH ROW
EXECUTE FUNCTION notify_logs_inserted();
`);
log.info({}, "Logs DB notification trigger ready");
}
async function setupDockScansNotifications() {
await db.execute(sql`
CREATE OR REPLACE FUNCTION notify_dock_scan_inserted()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(
'dock_scan_inserted',
json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'id', NEW.id
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
`);
await db.execute(sql`
DROP TRIGGER IF EXISTS dock_scan_inserted_notify_trigger ON dock_door_scans;
`);
await db.execute(sql`
CREATE TRIGGER dock_scan_inserted_notify_trigger
AFTER INSERT ON dock_door_scans
FOR EACH ROW
EXECUTE FUNCTION notify_dock_scan_inserted();
`);
log.info({}, "Dock scan DB notification trigger ready");
}

View File

@@ -0,0 +1,21 @@
import { db } from "./db.controller.js";
export const getRecentLogs = ({
module,
submodule,
limit = 200,
}: {
module?: string | undefined;
submodule?: string | undefined;
limit?: number | undefined;
}) => {
return db.query.logs.findMany({
where: (logs, { and, eq }) =>
and(
module ? eq(logs.module, module) : undefined,
submodule ? eq(logs.subModule, submodule) : undefined,
),
orderBy: (logs, { desc }) => [desc(logs.createdAt)],
limit,
});
};

View File

@@ -16,7 +16,58 @@ const endLoading = z.object({
}); });
r.post("/", async (req, res) => { r.post("/", async (req, res) => {
// TODO: setup the emitter to just emit the data when we post to the db if (req.body.clear) {
// just clear the loading order and clear out all the pallets to keep it clean.
await tryCatch(
db
.update(dockDoorScans)
.set({
status: "completed",
upd_date: sql`NOW()`,
upd_user: req.user?.username ?? "lst-dock-system",
})
.where(
req.body.loadingOrder
? eq(dockDoorScanners.currentLoadingOrder, req.body.loadingOrder)
: undefined,
)
.returning(),
);
const { data, error } = await tryCatch(
db
.update(dockDoorScanners)
.set({
currentLoadingOrder: "",
upd_date: sql`NOW()`,
upd_user: req.user?.username ?? "lst-dock-system",
})
.where(eq(dockDoorScanners.dockId, req.body.dockId))
.returning(),
);
if (error) {
return apiReturn(res, {
success: false,
level: "error",
module: "dockdoor",
subModule: "loadingOrder",
message: `Failed to updating the dock.`,
data: (error as any) ?? [],
status: 400,
});
}
return apiReturn(res, {
success: true,
level: "info",
module: "dockdoor",
subModule: "loadingOrder",
message: `Loading order: ${req.body.loadingOrder} was just cleared out do to the process being completed in some other means. \nThis includes any scanned pallets as well.`,
data: data ?? [],
status: 200,
});
}
try { try {
const validated = endLoading.parse(req.body); const validated = endLoading.parse(req.body);
@@ -73,7 +124,9 @@ r.post("/", async (req, res) => {
upd_date: sql`NOW()`, upd_date: sql`NOW()`,
upd_user: req.user?.username ?? "lst-dock-system", upd_user: req.user?.username ?? "lst-dock-system",
}) })
.where(eq(dockDoorScanners.currentLoadingOrder, validated.loadingOrder)) .where(
eq(dockDoorScans.loadingOrder, validated.loadingOrder.toString()),
)
.returning(), .returning(),
); );

View File

@@ -0,0 +1,17 @@
import { eq } from "drizzle-orm";
import { db } from "../db/db.controller.js";
import { logs } from "../db/schema/logs.schema.js";
import { emitToRoom } from "../socket.io/roomEmitter.socket.js";
export async function handleDockScanInsertedNotification(id: string) {
const row = await db.query.dockDoorScans.findFirst({
where: eq(logs.id, id),
});
if (!row) return;
// send only to the current dock door
if (row.dockId) {
emitToRoom(`dockDoorLoading:${row.dockId}`, row);
}
}

View File

@@ -0,0 +1,20 @@
import { db } from "../db/db.controller.js";
export const getRecentDockScans = ({
loadingOrder,
limit = 200,
}: {
loadingOrder: string;
limit?: number | undefined;
}) => {
return db.query.dockDoorScans.findMany({
//where: (scans, { eq }) => eq(scans.status, "active"),
where: (scans, { and, eq }) =>
and(
eq(scans.status, "active"),
loadingOrder ? eq(scans.loadingOrder, loadingOrder) : undefined,
),
orderBy: (scans, { desc }) => [desc(scans.upd_date)],
limit,
});
};

View File

@@ -3,7 +3,6 @@ import { Writable } from "node:stream";
import pino, { type Logger } from "pino"; import pino, { type Logger } from "pino";
import { db } from "../db/db.controller.js"; import { db } from "../db/db.controller.js";
import { logs } from "../db/schema/logs.schema.js"; import { logs } from "../db/schema/logs.schema.js";
import { emitToRoom } from "../socket.io/roomEmitter.socket.js";
import { tryCatch } from "../utils/trycatch.utils.js"; import { tryCatch } from "../utils/trycatch.utils.js";
import { notifySystemIssue } from "./logger.notify.js"; import { notifySystemIssue } from "./logger.notify.js";
//import build from "pino-abstract-transport"; //import build from "pino-abstract-transport";
@@ -50,10 +49,10 @@ const dbStream = new Writable({
notifySystemIssue(obj); notifySystemIssue(obj);
} }
if (obj.room) { // if (obj.room) {
emitToRoom(obj.room, res.data ? res.data[0] : obj); // emitToRoom(obj.room, res.data ? res.data[0] : obj);
} // }
emitToRoom("logs", res.data ? res.data[0] : obj); // emitToRoom("logs", res.data ? res.data[0] : obj);
callback(); callback();
} catch (err) { } catch (err) {
console.error("DB log insert error:", err); console.error("DB log insert error:", err);

View File

@@ -0,0 +1,24 @@
import { eq } from "drizzle-orm";
import { db } from "../db/db.controller.js";
import { logs } from "../db/schema/logs.schema.js";
import { emitToRoom } from "../socket.io/roomEmitter.socket.js";
export async function handleLogInsertedNotification(id: string) {
const row = await db.query.logs.findFirst({
where: eq(logs.id, id),
});
if (!row) return;
// More targeted rooms.
if (row.module) {
emitToRoom(`logs:${row.module}`, row);
}
if (row.subModule) {
emitToRoom(`logs:${row.subModule}`, row);
}
// Everyone listening to all logs.
emitToRoom("logs", row);
}

View File

@@ -12,7 +12,7 @@ export let odToken: ODToken = {
}; };
export const getToken = async () => { export const getToken = async () => {
const log = createLogger({ module: "opendock", subModule: "releaseMonitor" }); const log = createLogger({ module: "opendock", subModule: "auth" });
try { try {
const { status, data } = await axios.post( const { status, data } = await axios.post(
`${process.env.OPENDOCK_URL}/auth/login`, `${process.env.OPENDOCK_URL}/auth/login`,
@@ -29,7 +29,8 @@ export const getToken = async () => {
odToken = { odToken: data.access_token, tokenDate: new Date() }; odToken = { odToken: data.access_token, tokenDate: new Date() };
log.info({ odToken }, "Token added"); log.info({ odToken }, "Token added");
return;
} catch (e) { } catch (e) {
log.error({ error: e }, "Error getting/refreshing token"); log.error({ stack: e }, "Error getting/refreshing token");
} }
}; };

View File

@@ -2,6 +2,8 @@ import { createServer } from "node:http";
import os from "node:os"; import os from "node:os";
import createApp from "./app.js"; import createApp from "./app.js";
import { db } from "./db/db.controller.js"; import { db } from "./db/db.controller.js";
import { startDbNotificationListener } from "./db/db.listener.js";
import { setupDbNotifications } from "./db/db.setupNotifications.js";
import { dbCleanup } from "./db/dbCleanup.controller.js"; import { dbCleanup } from "./db/dbCleanup.controller.js";
import { type Setting, settings } from "./db/schema/settings.schema.js"; import { type Setting, settings } from "./db/schema/settings.schema.js";
import { connectGPSql } from "./gpSql/gpSqlConnection.controller.js"; import { connectGPSql } from "./gpSql/gpSqlConnection.controller.js";
@@ -43,6 +45,7 @@ const start = async () => {
startTCPServer(); startTCPServer();
connectProdSql(); connectProdSql();
connectGPSql(); connectGPSql();
startDbNotificationListener();
// trigger startup processes these must run before anything else can run // trigger startup processes these must run before anything else can run
await baseSettingValidationCheck(); await baseSettingValidationCheck();
@@ -89,6 +92,7 @@ const start = async () => {
startNotifications(); startNotifications();
serversChecks(); serversChecks();
aggregateRouteHitsForBusinessDay(); aggregateRouteHitsForBusinessDay();
setupDbNotifications();
// can be removed at a later date // can be removed at a later date
sqlJobCleanUp(); sqlJobCleanUp();

View File

@@ -1,8 +0,0 @@
import type { RoomId } from "./roomDefinitions.socket.js";
export const MAX_HISTORY = 50;
export const FLUSH_INTERVAL = 100; // 50ms change higher if needed
export const roomHistory = new Map<RoomId, unknown[]>();
export const roomBuffers = new Map<RoomId, any[]>();
export const roomFlushTimers = new Map<RoomId, NodeJS.Timeout>();

View File

@@ -1,122 +0,0 @@
import { desc, eq } from "drizzle-orm";
import { db } from "../db/db.controller.js";
import { dockDoorScans } from "../db/schema/dockdoor.scans.schema.js";
import { logs } from "../db/schema/logs.schema.js";
import { ppoRun } from "../warehousing/warehousing.ppooMonitor.js";
type RoomDefinition<T = unknown> = {
seed: (limit: number) => Promise<T[]>;
};
export type StaticRoomId =
| "logs"
| "labels"
| "admin"
| "admin:build"
| "ppoo"
| "dockDoorLoading:2";
export type DynamicRoomId = `dockDoorLoading:${string}`;
export type RoomId = StaticRoomId | DynamicRoomId;
export type RoomConfig = {
requiresAuth?: boolean;
role?: string[];
seed?: (limit: number, roomId: RoomId) => Promise<unknown[]>;
};
export const protectedRooms: Record<StaticRoomId, RoomConfig> = {
logs: { requiresAuth: true, role: ["admin", "systemAdmin"] },
//admin: { requiresAuth: false, role: ["admin", "systemAdmin"] },
labels: {},
admin: {},
"admin:build": {},
ppoo: {},
"dockDoorLoading:2": {},
};
export function getRoomConfig(roomId: string): RoomConfig | null {
if (roomId in protectedRooms) {
return protectedRooms[roomId as StaticRoomId];
}
if (roomId.startsWith("dockDoorLoading:")) {
const dockId = roomId.split(":")[1];
if (!dockId) return null;
return {
requiresAuth: true,
role: ["admin", "systemAdmin", "dockDoor"],
};
}
return null;
}
export const roomDefinition: Record<RoomId, RoomDefinition> = {
logs: {
seed: async (limit) => {
try {
const rows = await db
.select()
.from(logs)
.orderBy(desc(logs.createdAt))
.limit(limit);
return rows; //.reverse();
} catch (e) {
console.error("Failed to seed logs:", e);
return [];
}
},
},
labels: {
seed: async (limit) => {
console.info(limit);
return [];
},
},
admin: {
seed: async (limit) => {
console.info(limit);
return [];
},
},
"admin:build": {
seed: async (limit) => {
console.info(limit);
return [];
},
},
ppoo: {
seed: async (limit) => {
console.log(limit);
return {
type: "snapshot",
items: await ppoRun(),
createdAt: new Date().toISOString(),
} as any;
},
},
// TODO: add in dynamic room seeding
"dockDoorLoading:2": {
seed: async (limit) => {
console.log(limit);
try {
const rows = await db
.select()
.from(dockDoorScans)
.where(eq(dockDoorScans.status, "active"))
.orderBy(desc(dockDoorScans.upd_date))
.limit(limit);
return rows; //.reverse();
} catch (e) {
console.error("Failed to seed logs:", e);
return [];
}
},
},
};

View File

@@ -1,27 +1,73 @@
// the emitter setup // the emitter setup
// TODO: validate if we want to add event back in later..
// let emitFn: ((roomId: string, event: string, payload: unknown) => void) | null =
// null;
import type { RoomId } from "./roomDefinitions.socket.js"; import { createLogger } from "../logger/logger.controller.js";
let addDataToRoom: ((roomId: RoomId, payload: unknown[]) => void) | null = null; type QueuedPayload = unknown;
let emitFn: ((roomId: string, payload: QueuedPayload[]) => void) | null = null;
const queues = new Map<string, QueuedPayload[]>();
const timers = new Map<string, NodeJS.Timeout>();
const FLUSH_MS = 500;
const MAX_QUEUE_SIZE = 200;
export const registerEmitter = ( export const registerEmitter = (
fn: (roomId: RoomId, payload: unknown[]) => void, fn: (roomId: string, payload: QueuedPayload) => void,
) => { ) => {
addDataToRoom = fn; emitFn = fn;
}; };
export const emitToRoom = (roomId: RoomId, payload: unknown[]) => { export const emitToRoom = (roomId: string, payload: QueuedPayload) => {
if (!addDataToRoom) { const log = createLogger({ module: "socket.io", subModule: "emitter" });
if (!emitFn) {
console.error("Socket emitter not initialized"); console.error("Socket emitter not initialized");
return; return;
} }
addDataToRoom(roomId, payload); const queue = queues.get(roomId) ?? [];
if (queue.length > MAX_QUEUE_SIZE) {
log.error(
{ stack: { roomId, size: queue.length }, notify: true },
`Socket queue exceeded max size for ${roomId}`,
);
}
queue.push(payload);
queues.set(roomId, queue);
if (timers.has(roomId)) return;
const timer = setTimeout(() => {
try {
const payloads = queues.get(roomId) ?? [];
if (payloads.length === 0) return;
emitFn?.(roomId, payloads);
queues.delete(roomId);
} catch (e) {
console.error("Socket emit failed", { roomId, e });
} finally {
timers.delete(roomId);
}
}, FLUSH_MS);
timers.set(roomId, timer);
}; };
/* /*
import { emitToRoom } from "../socket/socketEmitter.js"; example emitToRoom(room, payload)
// room name
// its payload payload can be anything json serilized example below.
emitToRoom("logs", newLogRow);
emitToRoom("inventory:ppoo", {
type: "snapshot",
location: "ppoo",
items,
createdAt: new Date().toISOString(),
});
*/ */

View File

@@ -1,90 +0,0 @@
import type { Server } from "socket.io";
import { createLogger } from "../logger/logger.controller.js";
import {
FLUSH_INTERVAL,
MAX_HISTORY,
roomBuffers,
roomFlushTimers,
roomHistory,
} from "./roomCache.socket.js";
import { type RoomId, roomDefinition } from "./roomDefinitions.socket.js";
// get the db data if not exiting already
const log = createLogger({ module: "socket.io", subModule: "roomService" });
let ioRef: Server | null = null;
export const registerRoomService = (io: Server) => {
ioRef = io;
};
export const hasRoomMembers = (roomId: string): boolean => {
if (!ioRef) return false;
return (ioRef.sockets.adapter.rooms.get(roomId)?.size ?? 0) > 0;
};
export const getRoomMemberCount = (roomId: string): number => {
if (!ioRef) return 0;
return ioRef.sockets.adapter.rooms.get(roomId)?.size ?? 0;
};
export const preseedRoom = async (roomId: RoomId) => {
if (roomHistory.has(roomId)) {
if (!roomId.includes("dock")) {
return roomHistory.get(roomId);
}
}
const roomDef = roomDefinition[roomId] as any;
if (!roomDef) {
log.error({}, `Room ${roomId} is not defined`);
}
const latestData = await roomDef.seed(MAX_HISTORY);
roomHistory.set(roomId, latestData);
return latestData;
};
export const createRoomEmitter = (io: Server) => {
const addDataToRoom = <T>(roomId: RoomId, payload: T[]) => {
if (!roomHistory.has(roomId)) {
roomHistory.set(roomId, []);
}
const history = roomHistory.get(roomId)!;
history?.push(payload);
if (history?.length > MAX_HISTORY) {
history?.shift();
}
if (!roomBuffers.has(roomId)) {
roomBuffers.set(roomId, []);
}
roomBuffers.get(roomId)!.push(payload);
if (!roomFlushTimers.has(roomId)) {
const timer = setTimeout(() => {
const buffered = roomBuffers.get(roomId) || [];
if (buffered.length > 0) {
io.to(roomId).emit("room-update", {
roomId,
payloads: buffered, // ✅ array now
});
}
roomBuffers.set(roomId, []);
roomFlushTimers.delete(roomId);
}, FLUSH_INTERVAL);
roomFlushTimers.set(roomId, timer);
}
};
return { addDataToRoom };
};

View File

@@ -1,33 +1,16 @@
import type { Server as HttpServer } from "node:http"; import type { Server as HttpServer } from "node:http";
//import { dirname, join } from "node:path";
//import { fileURLToPath } from "node:url";
import { instrument } from "@socket.io/admin-ui"; import { instrument } from "@socket.io/admin-ui";
import { Server } from "socket.io"; import { Server } from "socket.io";
import { createLogger } from "../logger/logger.controller.js"; import { createLogger } from "../logger/logger.controller.js";
import { auth } from "../utils/auth.utils.js";
import { allowedOrigins } from "../utils/cors.utils.js"; import { allowedOrigins } from "../utils/cors.utils.js";
import { registerEmitter } from "./roomEmitter.socket.js"; import { registerEmitter } from "./roomEmitter.socket.js";
import { import { registerHasRoomMembers } from "./socket.manager.js";
createRoomEmitter, import { isRoomKey, roomConfigs } from "./socket.roomConfig.js";
preseedRoom,
registerRoomService,
} from "./roomService.socket.js";
//const __filename = fileURLToPath(import.meta.url);
//const __dirname = dirname(__filename);
const log = createLogger({ module: "socket.io", subModule: "setup" }); const log = createLogger({ module: "socket.io", subModule: "setup" });
import { auth } from "../utils/auth.utils.js";
//import type { Session, User } from "better-auth"; // adjust if needed
import { getRoomConfig } from "./roomDefinitions.socket.js";
// declare module "socket.io" {
// interface Socket {
// user?: User | any;
// session?: Session;
// }
// }
export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => { export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
const io = new Server(server, { const io = new Server(server, {
path: `${baseUrl}/api/socket.io`, path: `${baseUrl}/api/socket.io`,
@@ -37,12 +20,16 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
}, },
}); });
// manage members of the rooms. registerHasRoomMembers((roomId) => {
registerRoomService(io); return (io.sockets.adapter.rooms.get(roomId)?.size ?? 0) > 0;
});
// ✅ Create emitter instance registerEmitter((roomId, payloads) => {
const { addDataToRoom } = createRoomEmitter(io); io.to(roomId).emit("room-update", {
registerEmitter(addDataToRoom); roomId,
payloads,
});
});
io.use(async (socket, next) => { io.use(async (socket, next) => {
try { try {
@@ -85,79 +72,95 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
version: "1.0.0", version: "1.0.0",
}); });
// s.on("join-room", async (rn) => { s.on("join-room", async ({ room, params }) => {
// const config = protectedRooms[rn]; if (!isRoomKey(room)) return;
// if (config?.requiresAuth && !s.user) { const config = roomConfigs[room];
// return s.emit("room-error", {
// room: rn,
// message: "Authentication required",
// });
// }
// const roles = Array.isArray(config?.role) ? config?.role : [config?.role];
// //if (config?.role && s.user?.role !== config.role) {
// if (config?.role && !roles.includes(s.user?.role)) {
// return s.emit("room-error", {
// roomId: rn,
// message: `Not authorized to be in room: ${rn}`,
// });
// }
// s.join(rn);
// // get room seeded
// const history = await preseedRoom(rn);
// log.info({}, `User joined ${rn}: ${s.id}`);
// // send the intial data
// s.emit("room-update", {
// roomId: rn,
// payloads: history,
// initial: true,
// });
// });
s.on("join-room", async (rn: string) => {
const config = getRoomConfig(rn);
if (!config) { if (!config) {
return s.emit("room-error", { return s.emit("room-error", {
roomId: rn, roomId: room,
message: `Unknown room: ${rn}`, message: `Unknown room: ${room}`,
}); });
} }
if (config.requiresAuth && !s.user) { const actualRoom = config.buildRoom
? config.buildRoom(params)
: (room as any);
const allowed = config.canJoin
? await config.canJoin({
socket: s,
user: s.user,
room,
actualRoom,
params,
})
: true;
if (!allowed) {
return s.emit("room-error", { return s.emit("room-error", {
roomId: rn, roomId: room,
message: "Authentication required", message: `Not authorized to be in room: ${room}`,
}); });
} }
const roles = Array.isArray(config.role) ? config.role : []; await s.join(actualRoom);
if (roles.length > 0 && !roles.includes(s.user?.role)) { s.emit("room-joined", {
return s.emit("room-error", { room,
roomId: rn, roomId: actualRoom,
message: `Not authorized to be in room: ${rn}`, params,
}); });
}
s.join(rn); if (config.seed) {
const payloads = await config.seed({
const history = await preseedRoom(rn as any); room,
actualRoom,
log.info({}, `User joined ${rn}: ${s.id}`); params,
user: s.user,
});
s.emit("room-update", { s.emit("room-update", {
roomId: rn, room,
payloads: history, roomId: actualRoom,
initial: true, type: "snapshot",
payloads,
}); });
}
log.info(
{ room, actualRoom, params },
`User joined ${actualRoom}: ${s.id}`,
);
}); });
s.on("leave-room", (room) => { // s.on("leave-room", (room) => {
s.leave(room); // s.leave(room);
log.info({}, `${s.id} left room: ${room}`); // log.info({}, `${s.id} left room: ${JSON.stringify(room)}`);
// });
s.on("leave-room", async ({ room, params }) => {
if (!isRoomKey(room)) return;
const config = roomConfigs[room];
if (!config) return;
const actualRoom = config.buildRoom
? config.buildRoom(params)
: (room as any);
await s.leave(actualRoom);
s.emit("room-left", {
room,
roomId: actualRoom,
params,
});
log.info(
{ room, actualRoom, params },
`${s.id} left room: ${actualRoom}`,
);
}); });
}); });

View File

@@ -0,0 +1,10 @@
let hasMembersFn: ((roomId: string) => boolean) | null = null;
export const registerHasRoomMembers = (fn: (roomId: string) => boolean) => {
hasMembersFn = fn;
};
export const hasRoomMembers = (roomId: string) => {
if (!hasMembersFn) return false;
return hasMembersFn(roomId);
};

View File

@@ -0,0 +1,117 @@
import { getRecentLogs } from "../db/db.socketSeed.js";
import { getRecentDockScans } from "../dockdoorScanning/dockdoor.socket.seed.js";
export type RoomKey =
| "logs"
| "labels"
| "admin"
| "inventory"
| "dockDoorLoading";
export type SocketUser = {
id: string;
email?: string;
role?: string;
};
export type CanJoinArgs = {
socket: any;
user?: SocketUser;
room: string;
actualRoom: string;
params?: Record<string, unknown>;
};
type RoomConfig = {
//requiresAuth?: boolean;
//roles?: string[];
canJoin?: (args: CanJoinArgs) => boolean | Promise<boolean>;
buildRoom?: (params?: Record<string, unknown>) => string | null;
seed?: (args: {
room: string;
actualRoom: string;
params?: Record<string, unknown>;
user?: SocketUser;
}) => Promise<unknown[]>;
};
export function isRoomKey(room: string): room is RoomKey {
return room in roomConfigs;
}
export const roomConfigs: Record<RoomKey, RoomConfig> = {
logs: {
canJoin: ({ user, params }) => {
if (!params?.submodule && !params?.module) {
return user?.role === "systemAdmin";
}
return true;
},
buildRoom: (params) => {
const module = String(params?.module ?? "").toLowerCase();
const submodule = String(params?.submodule ?? "").toLowerCase();
if (module && submodule) return `logs:${module}:${submodule}`;
if (submodule) return `logs:${submodule}`;
if (module) return `logs:${module}`;
return "logs";
},
seed: async ({ params }) => {
const module = params?.module ? String(params.module) : undefined;
const submodule = params?.submodule
? String(params.submodule)
: undefined;
return await getRecentLogs({
module,
submodule,
limit: 200,
});
},
},
labels: {
canJoin: () => true,
buildRoom: () => "labels",
},
admin: {
canJoin: ({ user, params }) => {
if (params?.section === "system") {
return user?.role === "systemAdmin";
}
return true;
},
buildRoom: (params) =>
params?.section ? `admin:${params.section}` : "admin",
},
inventory: {
canJoin: () => true,
buildRoom: (params) =>
params?.location ? `inventory:${params.location}` : null,
},
dockDoorLoading: {
canJoin: () => true,
buildRoom: (params) =>
params?.dockId ? `dockDoorLoading:${params.dockId}` : null,
seed: async ({ params }) => {
return await getRecentDockScans({
loadingOrder: params?.loadingOrder as string,
limit: 200,
});
},
},
} satisfies Record<string, RoomConfig>;
/*
socket.emit("join-room", {
room: "dockDoorLoading",
params: { dockId: "2" },
});
*/

View File

@@ -1,5 +1,5 @@
import { emitToRoom } from "../socket.io/roomEmitter.socket.js"; import { emitToRoom } from "../socket.io/roomEmitter.socket.js";
import { hasRoomMembers } from "../socket.io/roomService.socket.js"; import { hasRoomMembers } from "../socket.io/socket.manager.js";
import { runProdApi } from "../utils/prodEndpoint.utils.js"; import { runProdApi } from "../utils/prodEndpoint.utils.js";
export const ppoRun = async () => { export const ppoRun = async () => {
@@ -17,11 +17,13 @@ export const ppoRun = async () => {
}; };
export const ppooMonitoring = async () => { export const ppooMonitoring = async () => {
if (!hasRoomMembers(`ppoo`)) { const roomId = "inventory:ppoo";
if (!hasRoomMembers(roomId)) {
return; return;
} }
emitToRoom("ppoo", { emitToRoom(roomId, {
type: "snapshot", type: "snapshot",
items: await ppoRun(), items: await ppoRun(),
createdAt: new Date().toISOString(), createdAt: new Date().toISOString(),

View File

@@ -1,7 +1,7 @@
import { useQuery } from "@tanstack/react-query"; //import { useQuery } from "@tanstack/react-query";
import { Link } from "@tanstack/react-router"; import { Link } from "@tanstack/react-router";
import { ChevronRight, Link as link } from "lucide-react"; import { ChevronRight, Link as link } from "lucide-react";
import { permissionQuery } from "../../lib/queries/permsCheck"; //import { permissionQuery } from "../../lib/queries/permsCheck";
import { import {
Collapsible, Collapsible,
CollapsibleContent, CollapsibleContent,
@@ -21,11 +21,11 @@ import {
} from "../ui/sidebar"; } from "../ui/sidebar";
export default function WarehouseBar() { export default function WarehouseBar() {
const { data: canCreate = false } = useQuery( // const { data: canCreate = false } = useQuery(
permissionQuery({ // permissionQuery({
warehouse: ["read"], // warehouse: ["read"],
}), // }),
); // );
const { setOpen } = useSidebar(); const { setOpen } = useSidebar();
const items = [ const items = [
@@ -33,7 +33,7 @@ export default function WarehouseBar() {
title: "Dock Door Scanning", title: "Dock Door Scanning",
url: "/warehouse", url: "/warehouse",
//icon, //icon,
isActive: canCreate, isActive: true,
items: [ items: [
{ {
title: "DockDoorScanning", title: "DockDoorScanning",

View File

@@ -24,11 +24,11 @@ export function AppSidebar() {
}), }),
); );
const { data: canReadWarehouse = false } = useQuery( // const { data: canReadWarehouse = false } = useQuery(
permissionQuery({ // permissionQuery({
warehouse: ["read"], // warehouse: ["read"],
}), // }),
); // );
return ( return (
<Sidebar <Sidebar
@@ -53,8 +53,7 @@ export function AppSidebar() {
{!isLoading && {!isLoading &&
settings.filter((n: any) => n.name === "dockDoorScanning")[0] settings.filter((n: any) => n.name === "dockDoorScanning")[0]
?.active && ?.active && <WarehouseBar />}
canReadWarehouse && <WarehouseBar />}
{session && {session &&
(session.user.role === "admin" || (session.user.role === "admin" ||

View File

@@ -1,68 +1,110 @@
import { useCallback, useEffect, useState } from "react"; import { useCallback, useEffect, useMemo, useState } from "react";
import { toast } from "sonner";
import socket from "@/lib/socket.io"; import socket from "@/lib/socket.io";
type RoomParams = Record<string, unknown>;
type JoinRoomPayload = {
room: string;
params?: RoomParams;
};
type RoomUpdatePayload<T> = { type RoomUpdatePayload<T> = {
roomId: string; roomId: string;
payloads: T[]; payloads: T[];
type: string;
};
type RoomJoinedPayload = {
room: string;
roomId: string;
}; };
type RoomErrorPayload = { type RoomErrorPayload = {
room?: string;
roomId?: string; roomId?: string;
message?: string; message?: string;
}; };
type UpdateMode = "append" | "replace"; export function useSocketRoom<T>(room: string, params?: RoomParams) {
const [actualRoomId, setActualRoomId] = useState<string | null>(null);
export function useSocketRoom<T>(
roomId: string,
getKey?: (item: T) => string | number,
updateMode: UpdateMode = "append",
) {
const [data, setData] = useState<T[]>([]); const [data, setData] = useState<T[]>([]);
const [info, setInfo] = useState( const [info, setInfo] = useState(
"No data yet — join the room to start receiving", "No data yet — join the room to start receiving",
); );
const clearRoom = useCallback( // This is the payload we send to the server.
(id?: string | number) => { // Example:
if (id !== undefined && getKey) { // { room: "inventory", params: { location: "ppoo" } }
setData((prev) => prev.filter((item) => getKey(item) !== id)); const joinPayload = useMemo<JoinRoomPayload>(
setInfo(`Removed item ${id}`); () => ({
return; room,
} params,
console.log("cleared data from the room"); }),
setData([]); [room, params],
setInfo("Room data cleared");
},
[getKey],
); );
const clearRoom = useCallback((filterFn?: (item: T) => boolean) => {
if (filterFn) {
setData((prev) => prev.filter((item) => !filterFn(item)));
return;
}
setData([]);
setInfo("Room data cleared");
}, []);
useEffect(() => { useEffect(() => {
function handleConnect() { // Join the logical room.
socket.emit("join-room", roomId); // The server decides the real Socket.IO roomId.
setInfo(`Joined room: ${roomId}`); // Example:
// client sends: { room: "inventory", params: { location: "ppoo" } }
// server joins: "inventory:ppoo"
function joinRoom() {
socket.emit("join-room", joinPayload);
setInfo(`Joining room: ${room}`);
}
// Server should emit this after socket.join(actualRoom).
// This lets the client know the final roomId to filter updates by.
function handleJoined(payload: RoomJoinedPayload) {
//if (payload.room !== room) return;
setActualRoomId(payload.roomId);
setInfo(`Joined room: ${payload.roomId}`);
} }
function handleUpdate(payload: RoomUpdatePayload<T>) { function handleUpdate(payload: RoomUpdatePayload<T>) {
// protects against other room updates hitting this hook // If we know the actual roomId, only accept updates for that room.
if (payload.roomId !== roomId) return; // This protects against other pages/rooms also listening to "room-update".
// resetting room data for rooms that just need updated data. if (!actualRoomId) return;
if (updateMode === "replace") {
if (payload.roomId !== actualRoomId) return;
if (payload.type === "snapshot") {
setData(payload.payloads); setData(payload.payloads);
} else { return;
setData((prev) => [...payload.payloads, ...prev]);
} }
// Append mode is good for logs/scans/events.
setData((prev) => [...payload.payloads, ...prev]);
setInfo(""); setInfo("");
} }
function handleError(err: RoomErrorPayload) { function handleError(err: RoomErrorPayload) {
if (err.roomId && err.roomId !== roomId) return; // Ignore errors for other logical rooms.
if (err.room && err.room !== room) return;
// Ignore errors for other actual rooms.
if (err.roomId && room && err.roomId !== room) return;
toast.error(err.message);
setInfo(err.message ?? "Room error"); setInfo(err.message ?? "Room error");
} }
socket.on("connect", handleConnect); socket.on("connect", joinRoom);
socket.on("room-joined", handleJoined);
socket.on("room-update", handleUpdate); socket.on("room-update", handleUpdate);
socket.on("room-error", handleError); socket.on("room-error", handleError);
@@ -70,31 +112,26 @@ export function useSocketRoom<T>(
socket.connect(); socket.connect();
} }
// If already connected, join immediately // If socket is already connected, join immediately.
if (socket.connected) { if (socket.connected) {
socket.emit("join-room", roomId); joinRoom();
setInfo(`Joined room: ${roomId}`);
} }
return () => { return () => {
socket.emit("leave-room", roomId); // Leave using the same logical room payload.
// Server should rebuild the actual room and call socket.leave(actualRoom).
socket.emit("leave-room", joinPayload);
socket.off("connect", handleConnect); socket.off("connect", joinRoom);
socket.off("room-joined", handleJoined);
socket.off("room-update", handleUpdate); socket.off("room-update", handleUpdate);
socket.off("room-error", handleError); socket.off("room-error", handleError);
}; };
}, [roomId, updateMode]); }, [room, joinPayload, actualRoomId]);
return { data, info, clearRoom }; return {
data,
info,
clearRoom,
};
} }
/*
const isDockDoorPage = location.pathname.startsWith("/dockdoor");
useSocketRoom(
dockId ? `dockdoor:${dockId}` : null,
isDockDoorPage,
);
*/

View File

@@ -1,6 +1,7 @@
import { createFileRoute, redirect } from "@tanstack/react-router"; import { createFileRoute, redirect } from "@tanstack/react-router";
import { createColumnHelper } from "@tanstack/react-table"; import { createColumnHelper } from "@tanstack/react-table";
import { formatInTimeZone } from "date-fns-tz"; import { formatInTimeZone } from "date-fns-tz";
//import { useMemo } from "react";
import { useSocketRoom } from "@/hooks/socket.io.hook"; import { useSocketRoom } from "@/hooks/socket.io.hook";
import { authClient } from "@/lib/auth-client"; import { authClient } from "@/lib/auth-client";
import { Badge } from "../../components/ui/badge"; import { Badge } from "../../components/ui/badge";
@@ -71,9 +72,10 @@ function LevelBadge({ level }: { level: string }) {
} }
function RouteComponent() { function RouteComponent() {
// const logParams = useMemo(() => ({ subModule: "query" }), []);
// const { data: logs } = useSocketRoom<LogEntry>("logs", logParams);
const { data: logs } = useSocketRoom<LogEntry>("logs"); const { data: logs } = useSocketRoom<LogEntry>("logs");
const columnHelper = createColumnHelper<any>(); const columnHelper = createColumnHelper<any>();
const column = [ const column = [
columnHelper.accessor("createdAt", { columnHelper.accessor("createdAt", {
header: ({ column }) => <SearchableHeader column={column} title="Time" />, header: ({ column }) => <SearchableHeader column={column} title="Time" />,

View File

@@ -4,7 +4,7 @@ import { createColumnHelper } from "@tanstack/react-table";
import { format } from "date-fns-tz"; import { format } from "date-fns-tz";
import { CircleFadingArrowUp, Trash } from "lucide-react"; import { CircleFadingArrowUp, Trash } from "lucide-react";
import { Suspense, useState } from "react"; import { Suspense, useMemo, useState } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import { Button } from "../../components/ui/button"; import { Button } from "../../components/ui/button";
import { Spinner } from "../../components/ui/spinner"; import { Spinner } from "../../components/ui/spinner";
@@ -171,7 +171,13 @@ const ServerTable = () => {
}; };
function RouteComponent() { function RouteComponent() {
const { data: logs = [], clearRoom } = useSocketRoom<any>("admin:build"); const params = useMemo(
() => ({
submodule: "builds",
}),
[],
);
const { data: logs = [], clearRoom } = useSocketRoom<any>("logs", params);
const columnHelper = createColumnHelper<any>(); const columnHelper = createColumnHelper<any>();
@@ -181,7 +187,7 @@ function RouteComponent() {
<SearchableHeader column={column} title="Time" searchable={false} /> <SearchableHeader column={column} title="Time" searchable={false} />
), ),
filterFn: "includesString", filterFn: "includesString",
cell: (i) => format(i.getValue(), "M/d/yyyy HH:mm"), cell: (i) => i.getValue(), //format(i.getValue(), "M/d/yyyy HH:mm") ,
}), }),
columnHelper.accessor("message", { columnHelper.accessor("message", {
header: ({ column }) => ( header: ({ column }) => (
@@ -210,7 +216,7 @@ function RouteComponent() {
<Button <Button
size="icon" size="icon"
variant={"destructive"} variant={"destructive"}
onClick={() => clearRoom(x.timestamp)} onClick={() => clearRoom((item) => item.timestamp === x.timestamp)}
> >
<Trash /> <Trash />
</Button> </Button>

View File

@@ -21,6 +21,7 @@ export const finishLoadingOrder = async (
dockId: string, dockId: string,
refetch: any, refetch: any,
refetchActiveLoading: any, refetchActiveLoading: any,
clear?: boolean,
) => { ) => {
try { try {
const res = await api.post( const res = await api.post(
@@ -28,6 +29,7 @@ export const finishLoadingOrder = async (
{ {
loadingOrder: loadingOrder, loadingOrder: loadingOrder,
dockId: dockId, dockId: dockId,
clear,
}, },
{ validateStatus: (status) => status < 500 }, { validateStatus: (status) => status < 500 },
); );
@@ -80,8 +82,8 @@ function RouteComponent() {
(x: any) => x.id === Number(i.currentLoadingOrder), (x: any) => x.id === Number(i.currentLoadingOrder),
) )
: []; : [];
console.log(loadingPlan); // console.log(loadingPlan);
console.log(loadingPlanItems); // console.log(loadingPlanItems);
return ( return (
<Card <Card
key={i.id} key={i.id}
@@ -150,6 +152,7 @@ function RouteComponent() {
i.dockId, i.dockId,
refetch, refetch,
refetchActiveLoading, refetchActiveLoading,
true,
) )
} }
> >

View File

@@ -2,6 +2,7 @@ import { useQuery, useSuspenseQuery } from "@tanstack/react-query";
import { createFileRoute } from "@tanstack/react-router"; import { createFileRoute } from "@tanstack/react-router";
import { createColumnHelper } from "@tanstack/react-table"; import { createColumnHelper } from "@tanstack/react-table";
import { formatInTimeZone } from "date-fns-tz"; import { formatInTimeZone } from "date-fns-tz";
import { useEffect, useMemo } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import { Button } from "../../../../components/ui/button"; import { Button } from "../../../../components/ui/button";
import { useSocketRoom } from "../../../../hooks/socket.io.hook"; import { useSocketRoom } from "../../../../hooks/socket.io.hook";
@@ -21,11 +22,25 @@ export const Route = createFileRoute(
}); });
function RouteComponent() { function RouteComponent() {
const { dockScans } = Route.useParams(); const { data: canSee = false } = useQuery(
const { data: logs, clearRoom } = useSocketRoom<any>( permissionQuery({
`dockDoorLoading:${dockScans}`, warehouse: ["update"],
}),
); );
const { data, refetch } = useSuspenseQuery(getActiveDockScanners()); const { data, refetch } = useSuspenseQuery(getActiveDockScanners());
const { dockScans } = Route.useParams();
const params = useMemo(
() => ({
dockId: dockScans,
loadingOrder: data[0].currentLoadingOrder ?? undefined,
}),
[dockScans, data],
);
const { data: logs, clearRoom } = useSocketRoom<any>(
`dockDoorLoading`,
params,
);
const { data: loadingPlanItems, refetch: refetchActiveLoading } = const { data: loadingPlanItems, refetch: refetchActiveLoading } =
useSuspenseQuery(getActiveLoadingOrders()); useSuspenseQuery(getActiveLoadingOrders());
@@ -36,6 +51,24 @@ function RouteComponent() {
); );
const columnHelper = createColumnHelper<any>(); const columnHelper = createColumnHelper<any>();
const logCount = logs.length;
// TODO: move this to an onMessage: handFunction
/*
const handleLogMessage = useCallback(() => {
refetchActiveLoading();
}, [refetchActiveLoading]);
const { data: logs } = useSocketRoom<LogEntry>("logs", {
onMessage: handleLogMessage,
});
*/
// biome-ignore lint: false
useEffect(() => {
refetchActiveLoading();
}, [logCount, refetchActiveLoading]);
const column = [ const column = [
columnHelper.accessor("loadingOrder", { columnHelper.accessor("loadingOrder", {
header: ({ column }) => ( header: ({ column }) => (
@@ -147,6 +180,7 @@ function RouteComponent() {
</p> </p>
</div> </div>
<div className="flex flex-col"> <div className="flex flex-col">
{canSee && (
<div> <div>
<form <form
onSubmit={(e) => { onSubmit={(e) => {
@@ -174,6 +208,8 @@ function RouteComponent() {
</div> </div>
</form> </form>
</div> </div>
)}
{loadingPlan && loadingPlan.length > 0 && ( {loadingPlan && loadingPlan.length > 0 && (
<div className="flex mb-2 gap-2"> <div className="flex mb-2 gap-2">
<Button <Button