refactor(datamart): more work on getting this to be a more dynamic/sync system

This commit is contained in:
2026-01-29 15:09:30 -06:00
parent 81bd4d6dcb
commit 31f8c368d9
11 changed files with 245 additions and 31 deletions

View File

@@ -1,6 +1,7 @@
import { createServer } from "node:http";
import os from "node:os";
import createApp from "./app.js";
import { startDatamartSync } from "./src/datamart/datamartSync.controller.js";
import { createLogger } from "./src/logger/logger.controller.js";
import { connectProdSql } from "./src/prodSql/prodSqlConnection.controller.js";
import { setupSocketIORoutes } from "./src/socket.io/serverSetup.js";
@@ -10,7 +11,9 @@ const port = Number(process.env.PORT) || 3000;
const start = async () => {
const log = createLogger({ module: "system", subModule: "main start" });
// triggering long lived processes
connectProdSql();
startDatamartSync();
const { app, baseUrl } = await createApp();
@@ -20,7 +23,7 @@ const start = async () => {
server.listen(port, async () => {
log.info(
`Listening on http://${os.hostname()}:${port}${baseUrl}, logging in ${process.env.LOG_LEVEL}`,
`Listening on http://${os.hostname()}:${port}${baseUrl}, logging in ${process.env.LOG_LEVEL}, current ENV ${process.env.NODE_ENV ? process.env.NODE_ENV : "development"}`,
);
});
};

View File

@@ -1,4 +1,4 @@
import { eq } from "drizzle-orm";
import { and, eq, gte, sql } from "drizzle-orm";
import type { Express } from "express";
import { db } from "../db/db.controller.js";
import { datamart } from "../db/schema/datamart.schema.js";
@@ -8,6 +8,30 @@ import updateQuery from "./datamartUpdate.route.js";
import runQuery from "./getDatamart.route.js";
export const setupDatamartRoutes = (baseUrl: string, app: Express) => {
// the sync callback.
// Sync callback: returns datamart rows for clients pulling updates.
// With ?time=<minutes> only rows updated inside that window are returned;
// without it, every row is returned.
app.get(`${baseUrl}/api/datamart/sync`, async (req, res) => {
  const { time } = req.query;
  const now = new Date();
  // Window size in minutes; falls back to 15 when ?time= is absent or not a number.
  const minutes = parseInt(time as string, 10) || 15;
  const cutoff = new Date(now.getTime() - minutes * 60 * 1000);
  // Only apply the cutoff filter when the caller actually asked for a window.
  const results = await db
    .select()
    .from(datamart)
    .where(time ? gte(datamart.upd_date, cutoff) : sql`true`);
  return apiReturn(res, {
    success: true,
    level: "info",
    module: "datamart",
    subModule: "query",
    // Describe what was actually queried: rows updated within the window,
    // or everything when no window was requested.
    message: time
      ? `All queries updated in the last ${minutes}min`
      : "All queries",
    data: results,
    status: 200,
  });
});
//setup all the routes
app.use(`${baseUrl}/api/datamart`, runQuery);
@@ -18,6 +42,7 @@ export const setupDatamartRoutes = (baseUrl: string, app: Express) => {
app.get(`${baseUrl}/api/datamart`, async (_, res) => {
const queries = await db
.select({
id: datamart.id,
name: datamart.name,
description: datamart.description,
options: datamart.options,
@@ -25,9 +50,11 @@ export const setupDatamartRoutes = (baseUrl: string, app: Express) => {
upd_date: datamart.upd_date,
})
.from(datamart)
.where(eq(datamart.active, true));
.where(and(eq(datamart.active, true), eq(datamart.public, true)));
return apiReturn(res, {
return apiReturn(
res,
{
success: true,
level: "info",
module: "datamart",
@@ -35,6 +62,8 @@ export const setupDatamartRoutes = (baseUrl: string, app: Express) => {
message: "All active queries we can run",
data: queries,
status: 200,
});
},
{ sheetName: 3 },
);
});
};

View File

@@ -1,6 +1,6 @@
/**
* If we are running in client mode we want to periodically check the SERVER_NAME for new/updated queries
* this will be on a cronner job, we will check 2 times a day for new data, we will also have a route we can trigger to check this manually incase we have
* this will be on a croner job, we will check 2 times a day for new data, we will also have a route we can trigger to check this manually in case we have
* queries we make for one plant but will eventually go to all plants.
* in client mode we will not be able to add, update, or delete, or push updates
*
@@ -27,6 +27,14 @@
* we will also be able to do all the same as the server mode but the push here will just go to the main server.
*/
import axios from "axios";
import { count, sql } from "drizzle-orm";
import { db } from "../db/db.controller.js";
import { datamart } from "../db/schema/datamart.schema.js";
import { createLogger } from "../logger/logger.controller.js";
import { createCronJob } from "../utils/croner.utils.js";
import { tryCatch } from "../utils/trycatch.utils.js";
// doing the client stuff first
// ┌──────────────── (optional) second (0 - 59)
@@ -38,23 +46,84 @@
// │ │ │ │ │ │ (0 to 6 are Sunday to Saturday; 7 is Sunday, the same as 0)
// │ │ │ │ │ │
// * * * * * *
if (process.env.NODE_ENV?.trim() === "production") {
/**
 * Long-lived datamart sync bootstrap.
 * Builds a cron pattern from QUERY_TIME_TYPE ("m" | "h" | "d") and QUERY_CHECK,
 * then — only when NODE_ENV is "production" AND APP_RUNNING_IN is "client" —
 * registers a croner job that pulls new/updated queries from SERVER_NAME and
 * upserts them into the local datamart table.
 *
 * NOTE(review): several adjacent duplicate statements below look like stale
 * before/after pairs left over from a merge/diff; only one of each pair
 * should survive. The "d" branch also appears to be missing its closing
 * brace — verify the braces balance in the real file.
 */
export const startDatamartSync = async () => {
// set up the croner pattern (default: every 5 minutes)
let cronTime = "* 5 * * * *";
// NOTE(review): duplicate declaration — the "*/5" form below is the intended one.
let cronTime = "*/5 * * * *";
if (process.env.QUERY_TIME_TYPE === "m") {
// run this cron every QUERY_CHECK minutes
cronTime = `* ${process.env.QUERY_CHECK} * * * *`;
// NOTE(review): duplicate assignment — the "*/x" form is the intended one.
cronTime = `*/${process.env.QUERY_CHECK} * * * *`;
}
if (process.env.QUERY_TIME_TYPE === "h") {
// run this cron every QUERY_CHECK hours
cronTime = `* * ${process.env.QUERY_CHECK} * * * `;
// NOTE(review): duplicate assignment — the "* */x" form is the intended one.
cronTime = `* */${process.env.QUERY_CHECK} * * *`;
}
if (process.env.QUERY_TIME_TYPE === "d") {
// run this cron every QUERY_CHECK days
// NOTE(review): this assignment puts QUERY_CHECK in the day-of-week slot of a
// 6-field pattern — confirm the intended "every x days" form (e.g. "0 0 */x * *").
cronTime = `* * * * * ${process.env.QUERY_CHECK}`;
// if we are in client mode and in production we run the test to see what's new in the last x
if (
process.env.NODE_ENV?.trim() === "production" &&
process.env.APP_RUNNING_IN?.trim() === "client"
) {
createCronJob("dataMartSync", cronTime, async () => {
const log = createLogger({ module: "system", subModule: "croner" });
// window (in minutes) the server route will filter on; defaults to 5
const syncTimeToCheck: number = parseInt(
process.env.QUERY_CHECK?.trim() || "5",
10,
);
let url = `http://${process.env.SERVER_NAME?.trim()}:3000/lst/api/datamart/sync?time=${syncTimeToCheck}`;
// check how many queries we already have locally
const qCount = await db.select({ count: count() }).from(datamart);
// if we don't have any queries yet, drop the time filter and fetch everything
// NOTE(review): leftover debug output — remove or route through the logger.
console.info(qCount[0]?.count);
// NOTE(review): a count can never be < 0, so this branch is dead; the comment
// above suggests it was meant to be `< 1` (or `=== 0`).
if ((qCount[0]?.count || 0) < 0) {
url = `http://${process.env.SERVER_NAME?.trim()}:3000/lst/api/datamart/sync`;
}
const { data, error } = await tryCatch(axios.get(url));
if (error !== null) {
log.error(
{ error: error.message },
`There was an error getting the new queries.`,
);
return;
}
// NOTE(review): leftover debug output — remove or route through the logger.
console.info(cronTime);
// what will we do with the new data passed over
// NOTE(review): log message reads oddly ("There are to be updated") — consider rewording.
log.info({ data: data.data }, `There are to be updated`);
const queries = data.data.data;
if (queries.length === 0) return;
// upsert the fetched queries; on id conflict, take the incoming row's values
const { data: updateQ, error: UpdateQError } = await tryCatch(
db
.insert(datamart)
.values(queries)
.onConflictDoUpdate({
target: datamart.id,
set: {
// NOTE(review): interpolating a Drizzle column object into a template string
// likely does NOT yield the bare column name (e.g. `excluded.name`) — verify;
// the usual pattern is sql.raw(`excluded."${datamart.name.name}"`).
name: sql.raw(`excluded.${datamart.name}`),
description: sql.raw(`excluded.${datamart.description}`),
query: sql.raw(`excluded.${datamart.query}`),
version: sql.raw(`excluded.${datamart.version}`),
active: sql.raw(`excluded.${datamart.active}`),
options: sql.raw(`excluded.${datamart.options}`),
public: sql.raw(`excluded.${datamart.public}`),
},
}),
);
if (UpdateQError !== null) {
log.error(
{ error: UpdateQError },
"There was an error add/updating the queries",
);
}
if (updateQ) {
log.info({}, "New and updated queries have been added");
}
});
}
};

View File

@@ -21,6 +21,8 @@ const newQuery = z.object({
.optional(),
setActive: z.string().optional(),
active: z.boolean().optional(),
setPublicActive: z.string().optional(),
public: z.boolean().optional(),
});
r.patch("/:id", upload.single("queryFile"), async (req, res) => {
@@ -58,6 +60,10 @@ r.patch("/:id", upload.single("queryFile"), async (req, res) => {
query.active = v.setActive === "true";
}
if (v.setPublicActive) {
query.public = v.setPublicActive === "true";
}
// if we forget the file crash out
// if (!query.query) {
// // no query text anywhere
@@ -96,7 +102,8 @@ r.patch("/:id", upload.single("queryFile"), async (req, res) => {
upd_date: sql`NOW()`,
upd_user: "lst_user",
})
.where(eq(datamart.id, id as string)),
.where(eq(datamart.id, id as string))
.returning({ name: datamart.name }),
);
if (error) {
@@ -117,7 +124,7 @@ r.patch("/:id", upload.single("queryFile"), async (req, res) => {
level: "info", //connect.success ? "info" : "error",
module: "routes",
subModule: "datamart",
message: `${query.name} was just updated`,
message: `${data[0]?.name} was just updated`,
data: [],
status: 200, //connect.success ? 200 : 400,
});

View File

@@ -5,6 +5,7 @@ import { setupApiDocsRoutes } from "./configs/scaler.config.js";
import { setupDatamartRoutes } from "./datamart/datamart.routes.js";
import { setupProdSqlRoutes } from "./prodSql/prodSql.routes.js";
import stats from "./system/stats.route.js";
import { setupUtilsRoutes } from "./utils/utils.routes.js";
export const setupRoutes = (baseUrl: string, app: Express) => {
app.use(`${baseUrl}/api/stats`, stats);
@@ -13,6 +14,7 @@ export const setupRoutes = (baseUrl: string, app: Express) => {
setupProdSqlRoutes(baseUrl, app);
setupDatamartRoutes(baseUrl, app);
setupAuthRoutes(baseUrl, app);
setupUtilsRoutes(baseUrl, app);
// routes that get activated if the module is set to activated.

View File

@@ -31,6 +31,12 @@ export const datamartAddSpec: OpenAPIV3_1.PathsObject = {
description:
"Optional comma separated options string passed to the query",
},
publicAccess: {
type: "boolean",
example: "true",
description:
"Will this query be accessible by the frontend's",
},
queryFile: {
type: "string",
format: "binary",

View File

@@ -0,0 +1,75 @@
import { Cron } from "croner";
import { createLogger } from "../logger/logger.controller.js";
// example createJob
// createCronJob("test Cron", "*/5 * * * * *", async () => {
// console.log("help");
// });
/** Snapshot of one registered cron job, as reported by getAllJobs(). */
export interface JobInfo {
  /** The name the job was registered under. */
  name: string;
  /** The cron pattern string the job runs on ("invalid" when unavailable). */
  schedule: string;
  /** Next scheduled run, or null when none is planned. */
  nextRun: Date | null;
  /** Previous run, or null when the job has not fired yet. */
  // Optional so existing object literals without it still type-check;
  // getAllJobs() populates it, and omitting it previously caused an
  // excess-property type error on that return value.
  lastRun?: Date | null;
  /** Whether the job is currently active (not paused/stopped). */
  isRunning: boolean;
}
// Store running cronjobs
export const runningCrons: Record<string, Cron> = {};
export const createCronJob = async (
name: string,
schedule: string, // cron string with 8 8 IE: */5 * * * * * every 5th second
task?: () => Promise<void>, // what function are we passing over
) => {
// get the timezone based on the os timezone set
const timeZone = Intl.DateTimeFormat().resolvedOptions().timeZone;
// Destroy existing job if it exists
if (runningCrons[name]) {
runningCrons[name].stop();
}
// Create new job with Croner
runningCrons[name] = new Cron(
schedule,
{
timezone: timeZone,
catch: true, // Prevents unhandled rejections
name: name,
},
task,
);
const log = createLogger({ module: "system", subModule: "croner" });
log.info({}, `A job for ${name} was just created.`);
};
/** Build a status snapshot for every job currently in the registry. */
export const getAllJobs = (): JobInfo[] => {
  const snapshots: JobInfo[] = [];
  for (const [jobName, cron] of Object.entries(runningCrons)) {
    snapshots.push({
      name: jobName,
      schedule: cron.getPattern() || "invalid",
      nextRun: cron.nextRun() || null,
      lastRun: cron.previousRun() || null,
      isRunning: cron.isRunning(),
    });
  }
  return snapshots;
};
/** Stop a job and drop it from the registry; a no-op for unknown names. */
export const removeCronJob = (name: string) => {
  const job = runningCrons[name];
  if (!job) return;
  job.stop();
  delete runningCrons[name];
};
/** Pause (not destroy) a job so it can later be resumed; no-op for unknown names. */
export const stopCronJob = (name: string) => {
  runningCrons[name]?.pause();
};
/** Resume a previously paused job; no-op for unknown names. */
export const resumeCronJob = (name: string) => {
  runningCrons[name]?.resume();
};

View File

@@ -12,7 +12,8 @@ interface Data {
| "query"
| "sendmail"
| "auth"
| "datamart";
| "datamart"
| "jobs";
level: "info" | "error" | "debug" | "fatal";
message: string;
data?: unknown[];
@@ -65,8 +66,14 @@ export const returnFunc = (data: Data) => {
/**
 * Send a standardized JSON API response.
 *
 * @param res - Express response to write to.
 * @param opts - Standard response payload; `status` overrides the derived HTTP code.
 * @param optional - Extra payload merged into the body under an `optional` key
 *                   (typed as unknown so callers can pass an object or an array).
 * @returns The Express response after `.json()` has been called.
 */
export function apiReturn(
  res: Response,
  opts: Data & { status?: number },
  optional?: unknown, // leave this as unknown so we can pass an object or an array over.
): Response {
  const result = returnFunc(opts);
  // Explicit status wins; otherwise derive from the success flag.
  // (No further fallback needed below: `code` is always a number here.)
  const code = opts.status ?? (opts.success ? 200 : 500);
  // Compare against undefined (not truthiness) so falsy extras
  // like 0, "" or false are still forwarded to the client.
  if (optional !== undefined) {
    return res.status(code).json({ ...result, optional });
  }
  return res.status(code).json(result);
}

View File

@@ -0,0 +1,16 @@
import type { Express } from "express";
import { getAllJobs } from "./croner.utils.js";
import { apiReturn } from "./returnHelper.utils.js";
/** Mount utility endpoints: GET {baseUrl}/api/utils reports every registered cron job. */
export const setupUtilsRoutes = (baseUrl: string, app: Express) => {
  app.get(`${baseUrl}/api/utils`, (_, res) =>
    apiReturn(res, {
      success: true,
      level: "info",
      module: "utils",
      subModule: "jobs",
      message: "All current Jobs",
      data: getAllJobs(),
      status: 200,
    }),
  );
};

8
package-lock.json generated
View File

@@ -12,7 +12,7 @@
"@dotenvx/dotenvx": "^1.51.2",
"@scalar/express-api-reference": "^0.8.28",
"@socket.io/admin-ui": "^0.5.1",
"axios": "^1.13.2",
"axios": "^1.13.3",
"better-auth": "^1.4.9",
"cors": "^2.8.5",
"croner": "^9.1.0",
@@ -6818,9 +6818,9 @@
}
},
"node_modules/axios": {
"version": "1.13.2",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.13.2.tgz",
"integrity": "sha512-VPk9ebNqPcy5lRGuSlKx752IlDatOjT9paPlm8A7yOuW2Fbvp4X3JznJtT4f0GzGLLiWE9W8onz51SqLYwzGaA==",
"version": "1.13.3",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.13.3.tgz",
"integrity": "sha512-ERT8kdX7DZjtUm7IitEyV7InTHAF42iJuMArIiDIV5YtPanJkgw4hw5Dyg9fh0mihdWNn1GKaeIWErfe56UQ1g==",
"license": "MIT",
"dependencies": {
"follow-redirects": "^1.15.6",

View File

@@ -65,7 +65,7 @@
"@dotenvx/dotenvx": "^1.51.2",
"@scalar/express-api-reference": "^0.8.28",
"@socket.io/admin-ui": "^0.5.1",
"axios": "^1.13.2",
"axios": "^1.13.3",
"better-auth": "^1.4.9",
"cors": "^2.8.5",
"croner": "^9.1.0",