feat(notifications): migrated all from v1
This commit is contained in:
158
server/services/notifications/utils/processNotifications.ts
Normal file
158
server/services/notifications/utils/processNotifications.ts
Normal file
@@ -0,0 +1,158 @@
|
||||
import { db } from "../../../../database/dbclient.js";
|
||||
import { notifications } from "../../../../database/schema/notifications.js";
|
||||
import { tryCatch } from "../../../globalUtils/tryCatch.js";
|
||||
import { createLog } from "../../logger/logger.js";
|
||||
import { Cron } from "croner";
|
||||
|
||||
// Store active timeouts by notification ID
|
||||
export let runningNotifications: Record<string, Cron> = {};
|
||||
|
||||
export const startNotificationMonitor = async () => {
|
||||
// if restarted or crashed we need to make sure the running notifications is cleared
|
||||
createLog("info", "notify", "notify", `Notification system is now active.`);
|
||||
|
||||
setInterval(async () => {
|
||||
const { data, error } = await tryCatch(db.select().from(notifications));
|
||||
|
||||
if (error) {
|
||||
createLog(
|
||||
"error",
|
||||
"notify",
|
||||
"notify",
|
||||
`There was an error getting the notifications: ${JSON.stringify(
|
||||
error
|
||||
)}`
|
||||
);
|
||||
}
|
||||
|
||||
const notes: any = data;
|
||||
|
||||
for (const note of notes) {
|
||||
//if we get deactivated remove it.
|
||||
if (runningNotifications[note.name] && !note.active) {
|
||||
createLog(
|
||||
"info",
|
||||
"notify",
|
||||
"notify",
|
||||
`${note.name} was just deactivated`
|
||||
);
|
||||
removeNotification(note.name);
|
||||
}
|
||||
|
||||
// if we are not active, no emails, and already in place just stop.
|
||||
|
||||
if (
|
||||
!note.active ||
|
||||
note.emails === "" ||
|
||||
runningNotifications[note.name]
|
||||
) {
|
||||
//console.log(`Skipping ${note.name} hes already scheduled`);
|
||||
continue;
|
||||
}
|
||||
|
||||
let time = `*/30 * * * *`; // default to be every 30 min
|
||||
|
||||
if (note.timeType === "min") {
|
||||
console.log(`Creating the min mark here`);
|
||||
time = `*/${note.checkInterval} * * * *`;
|
||||
}
|
||||
|
||||
if (note.timeType === "hour") {
|
||||
console.log(`Creating the hour mark here`);
|
||||
time = `* */${note.checkInterval} * * *`;
|
||||
}
|
||||
|
||||
createJob(note.name, time, async () => {
|
||||
try {
|
||||
const { default: runFun } = await import(
|
||||
`../controller/notifications/${note.name}.js`
|
||||
);
|
||||
await runFun(note);
|
||||
} catch (error: any) {
|
||||
createLog(
|
||||
"error",
|
||||
"notify",
|
||||
note.name,
|
||||
`Error running notification: ${error.message}`
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
//testParse(runningNotifcations[note.name]);
|
||||
}
|
||||
}, 5 * 1000);
|
||||
};
|
||||
|
||||
const createJob = (id: string, schedule: string, task: () => Promise<void>) => {
|
||||
// Destroy existing job if it exists
|
||||
if (runningNotifications[id]) {
|
||||
runningNotifications[id].stop(); // Croner uses .stop() instead of .destroy()
|
||||
}
|
||||
|
||||
// Create new job with Croner
|
||||
runningNotifications[id] = new Cron(
|
||||
schedule,
|
||||
{
|
||||
timezone: "America/Chicago",
|
||||
catch: true, // Prevents unhandled rejections
|
||||
},
|
||||
task
|
||||
);
|
||||
|
||||
// Optional: Add error handling (Croner emits 'error' events)
|
||||
// runningNotifications[id].on("error", (err) => {
|
||||
// console.error(`Job ${id} failed:`, err);
|
||||
// });
|
||||
};
|
||||
|
||||
interface JobInfo {
|
||||
id: string;
|
||||
schedule: string;
|
||||
nextRun: Date | null;
|
||||
isRunning: boolean;
|
||||
}
|
||||
|
||||
export const getAllJobs = (): JobInfo[] => {
|
||||
return Object.entries(runningNotifications).map(([id, job]) => ({
|
||||
id,
|
||||
schedule: job.getPattern() || "invalid",
|
||||
nextRun: job.nextRun() || null,
|
||||
lastRun: job.previousRun() || null,
|
||||
isRunning: job ? !job.isStopped() : false,
|
||||
}));
|
||||
};
|
||||
|
||||
const removeNotification = (id: any) => {
|
||||
if (runningNotifications[id]) {
|
||||
runningNotifications[id].stop();
|
||||
delete runningNotifications[id];
|
||||
}
|
||||
};
|
||||
|
||||
export const stopAllJobs = () => {
|
||||
Object.values(runningNotifications).forEach((job: any) => job.stop());
|
||||
runningNotifications = {}; // Clear the object
|
||||
};
|
||||
|
||||
/*
|
||||
// Pause a job
|
||||
app.post("/api/jobs/:id/pause", (req, res) => {
|
||||
runningNotifications[req.params.id]?.pause();
|
||||
res.json({ success: true });
|
||||
});
|
||||
|
||||
// Resume a job
|
||||
app.post("/api/jobs/:id/resume", (req, res) => {
|
||||
runningNotifications[req.params.id]?.resume();
|
||||
res.json({ success: true });
|
||||
});
|
||||
|
||||
// Delete a job
|
||||
app.delete("/api/jobs/:id", (req, res) => {
|
||||
runningNotifications[req.params.id]?.stop();
|
||||
delete runningNotifications[req.params.id];
|
||||
res.json({ success: true });
|
||||
});
|
||||
|
||||
|
||||
*/
|
||||
Reference in New Issue
Block a user