// lstV2/server/services/notifications/utils/processNotifications.ts

import { db } from "../../../../database/dbclient.js";
import { notifications } from "../../../../database/schema/notifications.js";
import { tryCatch } from "../../../globalUtils/tryCatch.js";
import type { JobInfo } from "../../../types/JobInfo.js";
import { createLog } from "../../logger/logger.js";
import { Cron } from "croner";
// Active Croner jobs, keyed by notification name (the `id` passed to createJob).
// Declared with `let` because stopAllJobs replaces the whole object when it clears the jobs.
export let runningCrons: Record<string, Cron> = {};
/** Cron pattern used when a notification has no recognised `timeType` or no usable `checkInterval`. */
const DEFAULT_SCHEDULE = "*/30 * * * *";

/** How often, in milliseconds, the notifications table is re-read. */
const POLL_INTERVAL_MS = 5 * 1000;

/** Turns a caught value into a loggable string (an `Error` serialises to `{}` with JSON.stringify). */
const describeError = (err: unknown): string =>
    err instanceof Error ? err.message : JSON.stringify(err);

/**
 * Translates a notification's `timeType` / `checkInterval` pair into a
 * 5-field cron pattern (minute hour day-of-month month day-of-week).
 *
 * Cron steps cannot express every interval exactly: an interval that does not
 * divide evenly into the next larger unit (e.g. 90 minutes, 36 hours) is only
 * approximated by combining a step on each field.
 *
 * @param timeType      "min" or "hour"; anything else yields the default schedule.
 * @param checkInterval Interval in the unit named by `timeType`.
 * @returns The cron pattern, or the default 30-minute pattern when
 *          `checkInterval` is not a positive integer.
 */
const buildSchedule = (timeType: unknown, checkInterval: unknown): string => {
    const interval = Number(checkInterval);
    if (!Number.isInteger(interval) || interval <= 0) {
        return DEFAULT_SCHEDULE;
    }

    if (timeType === "min") {
        if (interval <= 60) {
            return `*/${interval} * * * *`;
        }
        const hours = Math.floor(interval / 60);
        const minutes = interval % 60;
        // A zero remainder would yield the invalid step `*/0`; fire on the hour instead.
        return minutes === 0
            ? `0 */${hours} * * *`
            : `*/${minutes} */${hours} * * *`;
    }

    if (timeType === "hour") {
        // The minute field is pinned to 0: with `*` there, the job would fire
        // every minute of each matching hour instead of once per matching hour.
        if (interval < 24) {
            return `0 */${interval} * * *`;
        }
        const days = Math.floor(interval / 24);
        const hours = interval % 24;
        // Whole days: run once at midnight rather than emitting the invalid step `*/0`.
        return hours === 0
            ? `0 0 */${days} * *`
            : `0 */${hours} */${days} * *`;
    }

    return DEFAULT_SCHEDULE;
};

/**
 * Starts the notification monitor.
 *
 * Every POLL_INTERVAL_MS the notifications table is read and compared with
 * `runningCrons`:
 *  - a notification that has a job but is no longer active has its job removed;
 *  - an active notification without a job gets one, scheduled from its
 *    `timeType` / `checkInterval`;
 *  - everything else is left untouched.
 *
 * Each job dynamically imports `../controller/notifications/<name>.js` and
 * calls its default export with the notification row.
 *
 * The interval handle is not retained, so this function offers no way to stop
 * the polling once started.
 */
export const startNotificationMonitor = async () => {
    createLog("info", "notify", "notify", `Notification system is now active.`);

    setInterval(async () => {
        const { data, error } = await tryCatch(db.select().from(notifications));
        if (error) {
            createLog(
                "error",
                "notify",
                "notify",
                `There was an error getting the notifications: ${describeError(
                    error
                )}`
            );
            // No rows to reconcile; returning here avoids iterating a null
            // result, which would reject this async callback unhandled.
            return;
        }

        const notes: any[] = Array.isArray(data) ? data : [];
        for (const note of notes) {
            const isScheduled = Boolean(runningCrons[note.name]);

            if (!note.active) {
                // Deactivated since the last poll: drop its job.
                if (isScheduled) {
                    createLog(
                        "info",
                        "notify",
                        "notify",
                        `${note.name} was just deactivated`
                    );
                    removeNotification(note.name);
                }
                continue;
            }

            // Active and already scheduled: nothing to do.
            if (isScheduled) {
                continue;
            }

            createLog(
                "info",
                "notify",
                "notify",
                `${note.name} Is active and not already running.`
            );

            const schedule = buildSchedule(note.timeType, note.checkInterval);
            try {
                createJob(note.name, schedule, async () => {
                    try {
                        const { default: runFun } = await import(
                            `../controller/notifications/${note.name}.js`
                        );
                        await runFun(note);
                    } catch (err: unknown) {
                        createLog(
                            "error",
                            "notify",
                            note.name,
                            `Error running notification: ${describeError(err)}`
                        );
                    }
                });
            } catch (err: unknown) {
                // A pattern the scheduler rejects must not abort the whole
                // poll; the notification stays unscheduled and is retried on
                // the next poll.
                createLog(
                    "error",
                    "notify",
                    note.name,
                    `Could not schedule notification with pattern "${schedule}": ${describeError(
                        err
                    )}`
                );
            }
        }
    }, POLL_INTERVAL_MS);
};
/**
 * Schedules `task` on the given cron pattern and stores the job in
 * `runningCrons` under `id`, replacing whatever job was stored there.
 *
 * @param id       Key for the job in `runningCrons`.
 * @param schedule Cron pattern handed to Croner.
 * @param task     Async function Croner runs on each trigger.
 */
const createJob = (id: string, schedule: string, task: () => Promise<void>) => {
    // Halt any job already registered under this id before replacing it.
    const previous = runningCrons[id];
    if (previous) {
        previous.stop();
    }

    const options = {
        timezone: "America/Chicago",
        catch: true, // Prevents unhandled rejections
    };
    runningCrons[id] = new Cron(schedule, options, task);
};
/**
 * Builds a snapshot of every job currently held in `runningCrons`.
 *
 * @returns One entry per job: its id, cron pattern ("invalid" when the job
 *          reports none), next and previous run (null when the job reports
 *          none), and whether the job has not been stopped.
 */
export const getAllJobs = (): JobInfo[] => {
    const jobs: JobInfo[] = [];
    for (const [id, job] of Object.entries(runningCrons)) {
        const schedule = job.getPattern() || "invalid";
        const nextRun = job.nextRun() || null;
        const lastRun = job.previousRun() || null;
        const isRunning = !job.isStopped();
        const info = { id, schedule, nextRun, lastRun, isRunning };
        jobs.push(info);
    }
    return jobs;
};
/**
 * Stops the job stored under `id` and removes it from `runningCrons`.
 * Does nothing when no job is stored under that id.
 */
const removeNotification = (id: any) => {
    const job = runningCrons[id];
    if (!job) {
        return;
    }
    job.stop();
    delete runningCrons[id];
};
/**
 * Stops every job in `runningCrons`, then replaces the map with an empty one.
 */
export const stopAllJobs = () => {
    for (const job of Object.values(runningCrons)) {
        job.stop();
    }
    // Start over with a fresh, empty map.
    runningCrons = {};
};
/*
Reference sketch of job-control routes (currently disabled).
// Pause a job
app.post("/api/jobs/:id/pause", (req, res) => {
runningCrons[req.params.id]?.pause();
res.json({ success: true });
});
// Resume a job
app.post("/api/jobs/:id/resume", (req, res) => {
runningCrons[req.params.id]?.resume();
res.json({ success: true });
});
// Delete a job
app.delete("/api/jobs/:id", (req, res) => {
runningCrons[req.params.id]?.stop();
delete runningCrons[req.params.id];
res.json({ success: true });
});
*/