diff --git a/backend/db/db.controller.ts b/backend/db/db.controller.ts index df51e78..146e871 100644 --- a/backend/db/db.controller.ts +++ b/backend/db/db.controller.ts @@ -1,5 +1,7 @@ import { drizzle } from "drizzle-orm/postgres-js"; import postgres from "postgres"; +import * as dockScans from "./schema/dockdoor.scans.schema.js"; +import * as logs from "./schema/logs.schema.js"; import * as opendockAVCheck from "./schema/opendock_articleSetup.js"; import * as scanUserSchema from "./schema/scanUsers.js"; import * as settingsSchema from "./schema/settings.schema.js"; @@ -23,5 +25,7 @@ export const db = drizzle(queryClient, { ...scanUserSchema, ...settingsSchema, ...opendockAVCheck, + ...logs, + ...dockScans, }, }); diff --git a/backend/db/db.listener.ts b/backend/db/db.listener.ts new file mode 100644 index 0000000..5b27690 --- /dev/null +++ b/backend/db/db.listener.ts @@ -0,0 +1,59 @@ +import postgres from "postgres"; +import { createLogger } from "../logger/logger.controller.js"; +import { handleDbNotification } from "./db.router.js"; + +const log = createLogger({ + module: "db", + subModule: "notifications", +}); + +const CHANNELS = [ + "logs_inserted", + // "labels_inserted", + // "dock_scans_inserted", +] as const; + +type DbNotificationChannel = (typeof CHANNELS)[number]; + +type DbNotificationPayload = { + table: string; + action: "INSERT" | "UPDATE" | "DELETE"; + id: string; +}; + +export async function startDbNotificationListener() { + const sql = postgres({ + host: `${process.env.DATABASE_HOST}`, + port: Number(process.env.DATABASE_PORT), + database: `${process.env.DATABASE_DB}`, + username: process.env.DATABASE_USER, + password: process.env.DATABASE_PASSWORD, + }); + + for (const channel of CHANNELS) { + await sql.listen(channel, async (rawPayload) => { + await processNotification(channel, rawPayload); + }); + + log.info({ stack: { channel } }, `Listening for ${channel}`); + } +} + +async function processNotification( + channel: DbNotificationChannel, + rawPayload: string, +) { + try { + const payload = JSON.parse(rawPayload) as DbNotificationPayload; + + await handleDbNotification({ + channel, + payload, + }); + } catch (e) { + log.error( + { stack: { channel, rawPayload, e }, notify: true }, + "Failed processing DB notification", + ); + } +} diff --git a/backend/db/db.router.ts b/backend/db/db.router.ts new file mode 100644 index 0000000..17acc77 --- /dev/null +++ b/backend/db/db.router.ts @@ -0,0 +1,41 @@ +import { handleDockScanInsertedNotification } from "../dockdoorScanning/dockdoor.socket.notifications.js"; +import { createLogger } from "../logger/logger.controller.js"; +import { handleLogInsertedNotification } from "../logger/logger.socket.notifications.js"; + +const log = createLogger({ + module: "db", + subModule: "notifications-router", +}); + +type DbNotification = { + channel: string; + payload: { + table: string; + action: "INSERT" | "UPDATE" | "DELETE"; + id: string; + }; +}; + +export async function handleDbNotification(notification: DbNotification) { + const { channel, payload } = notification; + + switch (channel) { + case "logs_inserted": + await handleLogInsertedNotification(payload.id); + return; + + // case "labels_inserted": + // await handleLabelInsertedNotification(payload.id); + // return; + + case "dock_scan_inserted": + await handleDockScanInsertedNotification(payload.id); + return; + + default: + log.warn( + { stack: notification }, + `Unhandled DB notification channel: ${channel}`, + ); + } +} diff --git a/backend/db/db.setupNotifications.ts b/backend/db/db.setupNotifications.ts new file mode 100644 index 0000000..035499e --- /dev/null +++ b/backend/db/db.setupNotifications.ts @@ -0,0 +1,99 @@ +import { sql } from "drizzle-orm"; +import { db } from "../db/db.controller.js"; +import { createLogger } from "../logger/logger.controller.js"; + +const log = createLogger({ + module: "db", + subModule: "notifications", +}); + +/** + * Creates/updates Postgres notification functions + triggers. + * + * Safe to run on every app startup. + * CREATE OR REPLACE updates the function. + * DROP TRIGGER IF EXISTS prevents duplicate triggers. + */ +export async function setupDbNotifications() { + log.info({}, "Setting up DB notifications"); + + await setupLogsNotifications(); + await setupDockScansNotifications(); + + log.info({}, "DB notifications setup complete"); +} + +/** + * Logs notification setup. + * + * Flow: + * 1. app inserts into logs table + * 2. trigger runs after insert + * 3. Postgres sends NOTIFY logs_inserted with the new log id + * 4. Node listener receives id and fetches/emits full row + */ +async function setupLogsNotifications() { + await db.execute(sql` + CREATE OR REPLACE FUNCTION notify_logs_inserted() + RETURNS trigger AS $$ + BEGIN + PERFORM pg_notify( + 'logs_inserted', + json_build_object( + 'table', TG_TABLE_NAME, + 'action', TG_OP, + 'id', NEW.id + )::text + ); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `); + + await db.execute(sql` + DROP TRIGGER IF EXISTS logs_inserted_notify_trigger ON logs; + `); + + await db.execute(sql` + CREATE TRIGGER logs_inserted_notify_trigger + AFTER INSERT ON logs + FOR EACH ROW + EXECUTE FUNCTION notify_logs_inserted(); + `); + + log.info({}, "Logs DB notification trigger ready"); +} + +async function setupDockScansNotifications() { + await db.execute(sql` + CREATE OR REPLACE FUNCTION notify_dock_scan_inserted() + RETURNS trigger AS $$ + BEGIN + PERFORM pg_notify( + 'dock_scan_inserted', + json_build_object( + 'table', TG_TABLE_NAME, + 'action', TG_OP, + 'id', NEW.id + )::text + ); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `); + + await db.execute(sql` + DROP TRIGGER IF EXISTS dock_scan_inserted_notify_trigger ON dock_door_scans; + `); + + await db.execute(sql` + CREATE TRIGGER dock_scan_inserted_notify_trigger + AFTER INSERT ON dock_door_scans + FOR EACH ROW + EXECUTE FUNCTION notify_dock_scan_inserted(); + `); + + log.info({}, "Dock scan DB notification trigger ready"); +} diff --git a/backend/db/db.socketSeed.ts b/backend/db/db.socketSeed.ts new file mode 100644 index 0000000..0be269f --- /dev/null +++ b/backend/db/db.socketSeed.ts @@ -0,0 +1,21 @@ +import { db } from "./db.controller.js"; + +export const getRecentLogs = ({ + module, + submodule, + limit = 200, +}: { + module?: string | undefined; + submodule?: string | undefined; + limit?: number | undefined; +}) => { + return db.query.logs.findMany({ + where: (logs, { and, eq }) => + and( + module ? eq(logs.module, module) : undefined, + submodule ? eq(logs.subModule, submodule) : undefined, + ), + orderBy: (logs, { desc }) => [desc(logs.createdAt)], + limit, + }); +}; diff --git a/backend/server.ts b/backend/server.ts index 30fa3bf..0f7f4ce 100644 --- a/backend/server.ts +++ b/backend/server.ts @@ -2,6 +2,8 @@ import { createServer } from "node:http"; import os from "node:os"; import createApp from "./app.js"; import { db } from "./db/db.controller.js"; +import { startDbNotificationListener } from "./db/db.listener.js"; +import { setupDbNotifications } from "./db/db.setupNotifications.js"; import { dbCleanup } from "./db/dbCleanup.controller.js"; import { type Setting, settings } from "./db/schema/settings.schema.js"; import { connectGPSql } from "./gpSql/gpSqlConnection.controller.js"; @@ -43,12 +45,13 @@ const start = async () => { startTCPServer(); connectProdSql(); connectGPSql(); + startDbNotificationListener(); // trigger startup processes these must run before anything else can run await baseSettingValidationCheck(); systemSettings = await db.select().from(settings); - //when starting up long lived features the name must match the setting name. + // when starting up long lived features the name must match the setting name. // also we always want to have long lived processes inside a setting check. setTimeout(() => { if (systemSettings.filter((n) => n.name === "opendock_sync")[0]?.active) { @@ -89,6 +92,7 @@ const start = async () => { startNotifications(); serversChecks(); aggregateRouteHitsForBusinessDay(); + setupDbNotifications(); // can be removed at a later date sqlJobCleanUp();