socket io stuff entered
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
import { Writable } from "node:stream";
|
||||
|
||||
import pino, { type Logger } from "pino";
|
||||
import { db } from "../db/db.controller.js";
|
||||
import { logs } from "../db/schema/logs.schema.js";
|
||||
import { emitToRoom } from "../socket.io/roomEmitter.socket.js";
|
||||
import { tryCatch } from "../utils/trycatch.utils.js";
|
||||
//import build from "pino-abstract-transport";
|
||||
|
||||
@@ -40,6 +42,10 @@ const dbStream = new Writable({
|
||||
console.error(res.error);
|
||||
}
|
||||
|
||||
if (obj.room) {
|
||||
emitToRoom(obj.room, obj);
|
||||
}
|
||||
emitToRoom("logs", obj);
|
||||
callback();
|
||||
} catch (err) {
|
||||
console.error("DB log insert error:", err);
|
||||
@@ -48,31 +54,34 @@ const dbStream = new Writable({
|
||||
},
|
||||
});
|
||||
|
||||
// ✅ Multistream setup
|
||||
const streams = [
|
||||
{
|
||||
stream: pino.transport({
|
||||
target: "pino-pretty",
|
||||
options: {
|
||||
colorize: true,
|
||||
singleLine: true,
|
||||
},
|
||||
}),
|
||||
},
|
||||
{
|
||||
level: "info",
|
||||
stream: dbStream,
|
||||
},
|
||||
];
|
||||
|
||||
const rootLogger: Logger = pino(
|
||||
{
|
||||
level: logLevel,
|
||||
redact: { paths: ["email", "password"], remove: true },
|
||||
},
|
||||
pino.multistream(streams),
|
||||
pino.multistream([
|
||||
{
|
||||
level: logLevel,
|
||||
stream: pino.transport({
|
||||
target: "pino-pretty",
|
||||
options: {
|
||||
colorize: true,
|
||||
singleLine: true,
|
||||
},
|
||||
}),
|
||||
},
|
||||
{
|
||||
level: logLevel,
|
||||
stream: dbStream,
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* example data to put in as a reference
|
||||
* rooms logs | labels | etc
|
||||
*/
|
||||
export const createLogger = (bindings: Record<string, unknown>): Logger => {
|
||||
return rootLogger.child(bindings);
|
||||
};
|
||||
|
||||
@@ -84,7 +84,7 @@ const postRelease = async (release: Releases) => {
|
||||
loadTypeId: process.env.DEFAULT_LOAD_TYPE, // well get this and make it a default one
|
||||
dockId: process.env.DEFAULT_DOCK, // this the warehouse we want it in to start out
|
||||
refNumbers: [release.ReleaseNumber],
|
||||
refNumber: release.ReleaseNumber,
|
||||
//refNumber: release.ReleaseNumber,
|
||||
start: release.DeliveryDate,
|
||||
end: addHours(release.DeliveryDate, 1),
|
||||
notes: "",
|
||||
@@ -202,14 +202,17 @@ const postRelease = async (release: Releases) => {
|
||||
|
||||
log.info({}, `${release.ReleaseNumber} was updated`);
|
||||
} catch (e) {
|
||||
log.error({ error: e }, "Error updating the release");
|
||||
log.error(
|
||||
{ error: e },
|
||||
`Error updating the release: ${release.ReleaseNumber}`,
|
||||
);
|
||||
}
|
||||
// biome-ignore lint/suspicious/noExplicitAny: to many possibilities
|
||||
} catch (e: any) {
|
||||
//console.info(newDockApt);
|
||||
log.error(
|
||||
{ error: e.response.data },
|
||||
"An error has occurred during patching of the release",
|
||||
`An error has occurred during patching of the release: ${release.ReleaseNumber}`,
|
||||
);
|
||||
|
||||
return;
|
||||
|
||||
@@ -15,6 +15,12 @@ import { createCronJob } from "./utils/croner.utils.js";
|
||||
const port = Number(process.env.PORT) || 3000;
|
||||
export let systemSettings: Setting[] = [];
|
||||
const start = async () => {
|
||||
const { app, baseUrl } = await createApp();
|
||||
|
||||
const server = createServer(app);
|
||||
|
||||
setupSocketIORoutes(baseUrl, server);
|
||||
|
||||
const log = createLogger({ module: "system", subModule: "main start" });
|
||||
|
||||
// triggering long lived processes
|
||||
@@ -25,6 +31,7 @@ const start = async () => {
|
||||
systemSettings = await db.select().from(settings);
|
||||
|
||||
//when starting up long lived features the name must match the setting name.
|
||||
// also we always want to have long lived processes inside a setting check.
|
||||
setTimeout(() => {
|
||||
if (systemSettings.filter((n) => n.name === "opendock_sync")[0]?.active) {
|
||||
log.info({}, "Opendock is not active");
|
||||
@@ -35,19 +42,13 @@ const start = async () => {
|
||||
);
|
||||
}
|
||||
|
||||
// cleanup sql jobs
|
||||
// these jobs below are system jobs and should run no matter what.
|
||||
createCronJob("JobAuditLogCleanUp", "0 0 5 * * *", () =>
|
||||
dbCleanup("jobs", 30),
|
||||
);
|
||||
createCronJob("logsCleanup", "0 15 5 * * *", () => dbCleanup("logs", 120));
|
||||
}, 5 * 1000);
|
||||
|
||||
const { app, baseUrl } = await createApp();
|
||||
|
||||
const server = createServer(app);
|
||||
|
||||
setupSocketIORoutes(baseUrl, server);
|
||||
|
||||
server.listen(port, async () => {
|
||||
log.info(
|
||||
`Listening on http://${os.hostname()}:${port}${baseUrl}, logging in ${process.env.LOG_LEVEL}, current ENV ${process.env.NODE_ENV ? process.env.NODE_ENV : "development"}`,
|
||||
|
||||
8
backend/socket.io/roomCache.socket.ts
Normal file
8
backend/socket.io/roomCache.socket.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
import type { RoomId } from "./types.socket.js";
|
||||
|
||||
export const MAX_HISTORY = 20;
|
||||
export const FLUSH_INTERVAL = 100; // 50ms change higher if needed
|
||||
|
||||
export const roomHistory = new Map<RoomId, unknown[]>();
|
||||
export const roomBuffers = new Map<RoomId, any[]>();
|
||||
export const roomFlushTimers = new Map<RoomId, NodeJS.Timeout>();
|
||||
33
backend/socket.io/roomDefinitions.socket.ts
Normal file
33
backend/socket.io/roomDefinitions.socket.ts
Normal file
@@ -0,0 +1,33 @@
|
||||
import { logs } from "backend/db/schema/logs.schema.js";
|
||||
import { desc } from "drizzle-orm";
|
||||
import { db } from "../db/db.controller.js";
|
||||
import type { RoomId } from "./types.socket.js";
|
||||
|
||||
type RoomDefinition<T = unknown> = {
|
||||
seed: (limit: number) => Promise<T[]>;
|
||||
};
|
||||
|
||||
export const roomDefinition: Record<RoomId, RoomDefinition> = {
|
||||
logs: {
|
||||
seed: async (limit) => {
|
||||
try {
|
||||
const rows = await db
|
||||
.select()
|
||||
.from(logs)
|
||||
.orderBy(desc(logs.createdAt))
|
||||
.limit(limit);
|
||||
|
||||
return rows.reverse();
|
||||
} catch (e) {
|
||||
console.error("Failed to seed logs:", e);
|
||||
return [];
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
labels: {
|
||||
seed: async (limit) => {
|
||||
return [];
|
||||
},
|
||||
},
|
||||
};
|
||||
27
backend/socket.io/roomEmitter.socket.ts
Normal file
27
backend/socket.io/roomEmitter.socket.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
// the emitter setup
|
||||
|
||||
import type { RoomId } from "./types.socket.js";
|
||||
|
||||
let addDataToRoom: ((roomId: RoomId, payload: unknown[]) => void) | null = null;
|
||||
|
||||
export const registerEmitter = (
|
||||
fn: (roomId: RoomId, payload: unknown[]) => void,
|
||||
) => {
|
||||
addDataToRoom = fn;
|
||||
};
|
||||
|
||||
export const emitToRoom = (roomId: RoomId, payload: unknown[]) => {
|
||||
if (!addDataToRoom) {
|
||||
console.error("Socket emitter not initialized");
|
||||
return;
|
||||
}
|
||||
|
||||
addDataToRoom(roomId, payload);
|
||||
};
|
||||
|
||||
/*
|
||||
import { emitToRoom } from "../socket/socketEmitter.js";
|
||||
// room name
|
||||
// its payload
|
||||
emitToRoom("logs", newLogRow);
|
||||
*/
|
||||
73
backend/socket.io/roomService.socket.ts
Normal file
73
backend/socket.io/roomService.socket.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import type { Server } from "socket.io";
|
||||
import { createLogger } from "../logger/logger.controller.js";
|
||||
import {
|
||||
FLUSH_INTERVAL,
|
||||
MAX_HISTORY,
|
||||
roomBuffers,
|
||||
roomFlushTimers,
|
||||
roomHistory,
|
||||
} from "./roomCache.socket.js";
|
||||
import { roomDefinition } from "./roomDefinitions.socket.js";
|
||||
import type { RoomId } from "./types.socket.js";
|
||||
|
||||
// get the db data if not exiting already
|
||||
const log = createLogger({ module: "socket.io", subModule: "roomService" });
|
||||
|
||||
export const preseedRoom = async (roomId: RoomId) => {
|
||||
if (roomHistory.has(roomId)) {
|
||||
return roomHistory.get(roomId);
|
||||
}
|
||||
|
||||
const roomDef = roomDefinition[roomId];
|
||||
|
||||
if (!roomDef) {
|
||||
log.error({}, `Room ${roomId} is not defined`);
|
||||
}
|
||||
|
||||
const latestData = await roomDef.seed(MAX_HISTORY);
|
||||
|
||||
roomHistory.set(roomId, latestData);
|
||||
|
||||
return latestData;
|
||||
};
|
||||
|
||||
export const createRoomEmitter = (io: Server) => {
|
||||
const addDataToRoom = <T>(roomId: RoomId, payload: T) => {
|
||||
if (!roomHistory.has(roomId)) {
|
||||
roomHistory.set(roomId, []);
|
||||
}
|
||||
|
||||
const history = roomHistory.get(roomId)!;
|
||||
history?.push(payload);
|
||||
|
||||
if (history?.length > MAX_HISTORY) {
|
||||
history?.shift();
|
||||
}
|
||||
|
||||
if (!roomBuffers.has(roomId)) {
|
||||
roomBuffers.set(roomId, []);
|
||||
}
|
||||
|
||||
roomBuffers.get(roomId)!.push(payload);
|
||||
|
||||
if (!roomFlushTimers.has(roomId)) {
|
||||
const timer = setTimeout(() => {
|
||||
const buffered = roomBuffers.get(roomId) || [];
|
||||
|
||||
if (buffered.length > 0) {
|
||||
io.to(roomId).emit("room-update", {
|
||||
roomId,
|
||||
payloads: buffered, // ✅ array now
|
||||
});
|
||||
}
|
||||
|
||||
roomBuffers.set(roomId, []);
|
||||
roomFlushTimers.delete(roomId);
|
||||
}, FLUSH_INTERVAL);
|
||||
|
||||
roomFlushTimers.set(roomId, timer);
|
||||
}
|
||||
};
|
||||
|
||||
return { addDataToRoom };
|
||||
};
|
||||
@@ -3,9 +3,13 @@ import type { Server as HttpServer } from "node:http";
|
||||
//import { fileURLToPath } from "node:url";
|
||||
import { instrument } from "@socket.io/admin-ui";
|
||||
import { Server } from "socket.io";
|
||||
import { createLogger } from "../logger/logger.controller.js";
|
||||
import { registerEmitter } from "./roomEmitter.socket.js";
|
||||
import { createRoomEmitter, preseedRoom } from "./roomService.socket.js";
|
||||
|
||||
//const __filename = fileURLToPath(import.meta.url);
|
||||
//const __dirname = dirname(__filename);
|
||||
const log = createLogger({ module: "socket.io", subModule: "setup" });
|
||||
|
||||
export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
|
||||
const io = new Server(server, {
|
||||
@@ -16,29 +20,49 @@ export const setupSocketIORoutes = (baseUrl: string, server: HttpServer) => {
|
||||
},
|
||||
});
|
||||
|
||||
// ✅ Create emitter instance
|
||||
const { addDataToRoom } = createRoomEmitter(io);
|
||||
registerEmitter(addDataToRoom);
|
||||
|
||||
io.on("connection", (s) => {
|
||||
console.info(s.id);
|
||||
log.info({}, `User connected: ${s.id}`);
|
||||
|
||||
s.emit("welcome", {
|
||||
serverTime: Date.now(),
|
||||
availableRooms: ["logs", "labels"],
|
||||
version: "1.0.0",
|
||||
});
|
||||
|
||||
s.on("join-room", async (rn) => {
|
||||
s.join(rn);
|
||||
|
||||
// get room seeded
|
||||
const history = await preseedRoom(rn);
|
||||
|
||||
// send the intial data
|
||||
s.emit("room-update", {
|
||||
roomId: rn,
|
||||
payloads: history,
|
||||
initial: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// admin stuff for socket io
|
||||
// app.use(
|
||||
// express.static(
|
||||
// join(__dirname, "../../../node_modules/@socket.io/admin-ui/dist"),
|
||||
// ),
|
||||
// );
|
||||
io.on("disconnect", (s) => {
|
||||
log.info({}, "User disconnected:", s.id);
|
||||
});
|
||||
|
||||
// admin stuff
|
||||
|
||||
// app.get(baseUrl + "/admindashboard", (_, res) => {
|
||||
// res.sendFile(
|
||||
// join(
|
||||
// __dirname,
|
||||
// "../../../node_modules/@socket.io/admin-ui/dist/index.js",
|
||||
// ),
|
||||
// );
|
||||
// });
|
||||
const admin = io.of("/admin");
|
||||
admin.on("connection", () => {
|
||||
console.info("Connected to admin userspace");
|
||||
admin.on("connection", (s) => {
|
||||
log.info({}, `User connected: ${s.id}`);
|
||||
});
|
||||
|
||||
admin.on("disconnect", (s) => {
|
||||
log.info({}, "User disconnected:", s.id);
|
||||
});
|
||||
|
||||
instrument(io, {
|
||||
auth: false,
|
||||
//namespaceName: "/admin",
|
||||
|
||||
1
backend/socket.io/types.socket.ts
Normal file
1
backend/socket.io/types.socket.ts
Normal file
@@ -0,0 +1 @@
|
||||
export type RoomId = "logs" | "labels"; //| "alerts" | "metrics";
|
||||
@@ -24,6 +24,7 @@ export const featureControl = async (data: Setting) => {
|
||||
stopCronJob(data.name);
|
||||
}
|
||||
|
||||
// specific setting stuff should have handled like below. what needs turned back on or off.
|
||||
if (data.name === "opendock_sync" && data.active) {
|
||||
opendockSocketMonitor();
|
||||
monitorReleaseChanges();
|
||||
|
||||
@@ -18,6 +18,7 @@ interface Data<T = unknown[]> {
|
||||
| "settings";
|
||||
level: "info" | "error" | "debug" | "fatal";
|
||||
message: string;
|
||||
room?: string;
|
||||
data?: T;
|
||||
notify?: boolean;
|
||||
}
|
||||
@@ -38,20 +39,21 @@ interface Data<T = unknown[]> {
|
||||
*/
|
||||
export const returnFunc = (data: Data) => {
|
||||
const notify = data.notify ? data.notify : false;
|
||||
const room = data.room ?? data.room;
|
||||
const log = createLogger({ module: data.module, subModule: data.subModule });
|
||||
// handle the logging part
|
||||
switch (data.level) {
|
||||
case "info":
|
||||
log.info({ notify: notify }, data.message);
|
||||
log.info({ notify: notify, room }, data.message);
|
||||
break;
|
||||
case "error":
|
||||
log.error({ notify: notify, error: data.data }, data.message);
|
||||
log.error({ notify: notify, error: data.data, room }, data.message);
|
||||
break;
|
||||
case "debug":
|
||||
log.debug({ notify: notify }, data.message);
|
||||
log.debug({ notify: notify, room }, data.message);
|
||||
break;
|
||||
case "fatal":
|
||||
log.fatal({ notify: notify }, data.message);
|
||||
log.fatal({ notify: notify, room }, data.message);
|
||||
}
|
||||
|
||||
// api section to return
|
||||
|
||||
Reference in New Issue
Block a user