refactor(db): added in notifications vs pulling from the db makes it easier on the system
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
import { drizzle } from "drizzle-orm/postgres-js";
|
||||
import postgres from "postgres";
|
||||
import * as dockScans from "./schema/dockdoor.scans.schema.js";
|
||||
import * as logs from "./schema/logs.schema.js";
|
||||
import * as opendockAVCheck from "./schema/opendock_articleSetup.js";
|
||||
import * as scanUserSchema from "./schema/scanUsers.js";
|
||||
import * as settingsSchema from "./schema/settings.schema.js";
|
||||
@@ -23,5 +25,7 @@ export const db = drizzle(queryClient, {
|
||||
...scanUserSchema,
|
||||
...settingsSchema,
|
||||
...opendockAVCheck,
|
||||
...logs,
|
||||
...dockScans,
|
||||
},
|
||||
});
|
||||
|
||||
59
backend/db/db.listener.ts
Normal file
59
backend/db/db.listener.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import postgres from "postgres";
|
||||
import { createLogger } from "../logger/logger.controller.js";
|
||||
import { handleDbNotification } from "./db.router.js";
|
||||
|
||||
const log = createLogger({
|
||||
module: "db",
|
||||
subModule: "notifications",
|
||||
});
|
||||
|
||||
const CHANNELS = [
|
||||
"logs_inserted",
|
||||
// "labels_inserted",
|
||||
// "dock_scans_inserted",
|
||||
] as const;
|
||||
|
||||
type DbNotificationChannel = (typeof CHANNELS)[number];
|
||||
|
||||
type DbNotificationPayload = {
|
||||
table: string;
|
||||
action: "INSERT" | "UPDATE" | "DELETE";
|
||||
id: string;
|
||||
};
|
||||
|
||||
export async function startDbNotificationListener() {
|
||||
const sql = postgres({
|
||||
host: `${process.env.DATABASE_HOST}`,
|
||||
port: Number(process.env.DATABASE_PORT),
|
||||
database: `${process.env.DATABASE_DB}`,
|
||||
username: process.env.DATABASE_USER,
|
||||
password: process.env.DATABASE_PASSWORD,
|
||||
});
|
||||
|
||||
for (const channel of CHANNELS) {
|
||||
await sql.listen(channel, async (rawPayload) => {
|
||||
await processNotification(channel, rawPayload);
|
||||
});
|
||||
|
||||
log.info({ stack: { channel } }, `Listening for ${channel}`);
|
||||
}
|
||||
}
|
||||
|
||||
async function processNotification(
|
||||
channel: DbNotificationChannel,
|
||||
rawPayload: string,
|
||||
) {
|
||||
try {
|
||||
const payload = JSON.parse(rawPayload) as DbNotificationPayload;
|
||||
|
||||
await handleDbNotification({
|
||||
channel,
|
||||
payload,
|
||||
});
|
||||
} catch (e) {
|
||||
log.error(
|
||||
{ stack: { channel, rawPayload, e }, notify: true },
|
||||
"Failed processing DB notification",
|
||||
);
|
||||
}
|
||||
}
|
||||
41
backend/db/db.router.ts
Normal file
41
backend/db/db.router.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { handleDockScanInsertedNotification } from "../dockdoorScanning/dockdoor.socket.notifications.js";
|
||||
import { createLogger } from "../logger/logger.controller.js";
|
||||
import { handleLogInsertedNotification } from "../logger/logger.socket.notifications.js";
|
||||
|
||||
const log = createLogger({
|
||||
module: "db",
|
||||
subModule: "notifications-router",
|
||||
});
|
||||
|
||||
type DbNotification = {
|
||||
channel: string;
|
||||
payload: {
|
||||
table: string;
|
||||
action: "INSERT" | "UPDATE" | "DELETE";
|
||||
id: string;
|
||||
};
|
||||
};
|
||||
|
||||
export async function handleDbNotification(notification: DbNotification) {
|
||||
const { channel, payload } = notification;
|
||||
|
||||
switch (channel) {
|
||||
case "logs_inserted":
|
||||
await handleLogInsertedNotification(payload.id);
|
||||
return;
|
||||
|
||||
// case "labels_inserted":
|
||||
// await handleLabelInsertedNotification(payload.id);
|
||||
// return;
|
||||
|
||||
case "dock_scan_inserted":
|
||||
await handleDockScanInsertedNotification(payload.id);
|
||||
return;
|
||||
|
||||
default:
|
||||
log.warn(
|
||||
{ stack: notification },
|
||||
`Unhandled DB notification channel: ${channel}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
99
backend/db/db.setupNotifications.ts
Normal file
99
backend/db/db.setupNotifications.ts
Normal file
@@ -0,0 +1,99 @@
|
||||
import { sql } from "drizzle-orm";
|
||||
import { db } from "../db/db.controller.js";
|
||||
import { createLogger } from "../logger/logger.controller.js";
|
||||
|
||||
const log = createLogger({
|
||||
module: "db",
|
||||
subModule: "notifications",
|
||||
});
|
||||
|
||||
/**
|
||||
* Creates/updates Postgres notification functions + triggers.
|
||||
*
|
||||
* Safe to run on every app startup.
|
||||
* CREATE OR REPLACE updates the function.
|
||||
* DROP TRIGGER IF EXISTS prevents duplicate triggers.
|
||||
*/
|
||||
export async function setupDbNotifications() {
|
||||
log.info({}, "Setting up DB notifications");
|
||||
|
||||
await setupLogsNotifications();
|
||||
await setupDockScansNotifications();
|
||||
|
||||
log.info({}, "DB notifications setup complete");
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs notification setup.
|
||||
*
|
||||
* Flow:
|
||||
* 1. app inserts into logs table
|
||||
* 2. trigger runs after insert
|
||||
* 3. Postgres sends NOTIFY logs_inserted with the new log id
|
||||
* 4. Node listener receives id and fetches/emits full row
|
||||
*/
|
||||
async function setupLogsNotifications() {
|
||||
await db.execute(sql`
|
||||
CREATE OR REPLACE FUNCTION notify_logs_inserted()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify(
|
||||
'logs_inserted',
|
||||
json_build_object(
|
||||
'table', TG_TABLE_NAME,
|
||||
'action', TG_OP,
|
||||
'id', NEW.id
|
||||
)::text
|
||||
);
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
`);
|
||||
|
||||
await db.execute(sql`
|
||||
DROP TRIGGER IF EXISTS logs_inserted_notify_trigger ON logs;
|
||||
`);
|
||||
|
||||
await db.execute(sql`
|
||||
CREATE TRIGGER logs_inserted_notify_trigger
|
||||
AFTER INSERT ON logs
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION notify_logs_inserted();
|
||||
`);
|
||||
|
||||
log.info({}, "Logs DB notification trigger ready");
|
||||
}
|
||||
|
||||
async function setupDockScansNotifications() {
|
||||
await db.execute(sql`
|
||||
CREATE OR REPLACE FUNCTION notify_dock_scan_inserted()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify(
|
||||
'dock_scan_inserted',
|
||||
json_build_object(
|
||||
'table', TG_TABLE_NAME,
|
||||
'action', TG_OP,
|
||||
'id', NEW.id
|
||||
)::text
|
||||
);
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
`);
|
||||
|
||||
await db.execute(sql`
|
||||
DROP TRIGGER IF EXISTS dock_scan_inserted_notify_trigger ON dock_door_scans;
|
||||
`);
|
||||
|
||||
await db.execute(sql`
|
||||
CREATE TRIGGER dock_scan_inserted_notify_trigger
|
||||
AFTER INSERT ON dock_door_scans
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION notify_dock_scan_inserted();
|
||||
`);
|
||||
|
||||
log.info({}, "Dock scan DB notification trigger ready");
|
||||
}
|
||||
21
backend/db/db.socketSeed.ts
Normal file
21
backend/db/db.socketSeed.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
import { db } from "./db.controller.js";
|
||||
|
||||
export const getRecentLogs = ({
|
||||
module,
|
||||
submodule,
|
||||
limit = 200,
|
||||
}: {
|
||||
module?: string | undefined;
|
||||
submodule?: string | undefined;
|
||||
limit?: number | undefined;
|
||||
}) => {
|
||||
return db.query.logs.findMany({
|
||||
where: (logs, { and, eq }) =>
|
||||
and(
|
||||
module ? eq(logs.module, module) : undefined,
|
||||
submodule ? eq(logs.subModule, submodule) : undefined,
|
||||
),
|
||||
orderBy: (logs, { desc }) => [desc(logs.createdAt)],
|
||||
limit,
|
||||
});
|
||||
};
|
||||
@@ -2,6 +2,8 @@ import { createServer } from "node:http";
|
||||
import os from "node:os";
|
||||
import createApp from "./app.js";
|
||||
import { db } from "./db/db.controller.js";
|
||||
import { startDbNotificationListener } from "./db/db.listener.js";
|
||||
import { setupDbNotifications } from "./db/db.setupNotifications.js";
|
||||
import { dbCleanup } from "./db/dbCleanup.controller.js";
|
||||
import { type Setting, settings } from "./db/schema/settings.schema.js";
|
||||
import { connectGPSql } from "./gpSql/gpSqlConnection.controller.js";
|
||||
@@ -43,6 +45,7 @@ const start = async () => {
|
||||
startTCPServer();
|
||||
connectProdSql();
|
||||
connectGPSql();
|
||||
startDbNotificationListener();
|
||||
|
||||
// trigger startup processes these must run before anything else can run
|
||||
await baseSettingValidationCheck();
|
||||
@@ -89,6 +92,7 @@ const start = async () => {
|
||||
startNotifications();
|
||||
serversChecks();
|
||||
aggregateRouteHitsForBusinessDay();
|
||||
setupDbNotifications();
|
||||
|
||||
// can be removed at a later date
|
||||
sqlJobCleanUp();
|
||||
|
||||
Reference in New Issue
Block a user