import { eq } from "drizzle-orm"; import { db } from "../db/db.controller.js"; import { prodAuditLogState } from "../db/schema/prodAuditlog.lastProcessed.schema.js"; import { type NewProdAuditLog, prodAuditLog, } from "../db/schema/prodAuditlog.schema.js"; import { createLogger } from "../logger/logger.controller.js"; import { prodQuery } from "../prodSql/prodSqlQuery.controller.js"; import { type SqlQuery, sqlQuerySelector, } from "../prodSql/prodSqlQuerySelector.utils.js"; import { delay } from "../utils/delay.utils.js"; import { returnFunc } from "../utils/returnHelper.utils.js"; import { tryCatch } from "../utils/trycatch.utils.js"; const log = createLogger({ module: "system", subModule: "prodAuditLog" }); let bufferProcessInProgress = false; export const monitorProdAuditLog = async () => { const auditLogQuery = sqlQuerySelector(`prod.auditlog`) as SqlQuery; /* get the last processed audit log so we can only pull the newest ones. as the initial go will be zero we want to look at the top 1 so we only pull the most recent one. */ const latestAuditId = await db.select().from(prodAuditLogState).limit(1); let auditQuery = auditLogQuery.query; if (latestAuditId.length === 0) { auditQuery = auditQuery .replace( "DECLARE @lastId BIGINT = [lstId];", "--DECLARE @lastId BIGINT = [lstId];", ) .replace("TOP (500)", "TOP (1)") .replace("ASC", "DESC") .replace("AND Id > @lastId", "--AND Id > @lastId"); } else { auditQuery = auditQuery.replace( "[lstId]", `${latestAuditId[0]?.lastImportedAuditId}`, ); } const { data: queryRun, error } = await tryCatch( prodQuery(auditQuery, `Running auditLog query`), ); if (error) { return returnFunc({ success: false, level: "error", module: "system", subModule: "auditLog", message: `Data for: AuditLog Failed`, data: [error], notify: false, }); } if (!queryRun.success) { return returnFunc({ success: false, level: "error", module: "datamart", subModule: "query", message: queryRun.message, data: queryRun.data, notify: false, }); } const safeJsonParse = (value: unknown) => { if (typeof value !== "string") return value; try { return JSON.parse(value); } catch { return { raw: value }; } }; // remap everything lst logs to keep it easy to read if (queryRun.data.length > 0) { const auditRows = queryRun.data.map((r) => ({ auditId: r.Id, actorName: r.ActorName, auditCreatedDate: r.CreatedDateTime, message: r.Message, content: safeJsonParse(r.Content), status: "pending", processed: false, retryCount: 0, })) as NewProdAuditLog[]; await db.insert(prodAuditLog).values(auditRows).onConflictDoNothing(); const newestAuditId = queryRun.data.at(-1).Id; await db .insert(prodAuditLogState) .values({ id: 1, lastImportedAuditId: newestAuditId, updatedAt: new Date(), }) .onConflictDoUpdate({ target: prodAuditLogState.id, set: { lastImportedAuditId: newestAuditId, updatedAt: new Date(), }, }); } }; export const bufferProcess = async (_data: unknown) => { if (bufferProcessInProgress) { log.debug({}, "[bufferProcess] already running, skipping trigger"); return; } bufferProcessInProgress = true; try { log.debug({}, "[bufferProcess] started"); while (true) { const row = await db.query.prodAuditLog.findFirst({ where: (audit, { eq }) => eq(audit.processed, false), orderBy: (audit, { asc }) => [asc(audit.auditId)], }); if (!row) { log.debug({}, "[bufferProcess] no pending rows"); break; } log.debug({}, "[bufferProcess] processing audit row", row.auditId); // tiny delay so you can visually validate the flow await delay(250); // TODO add in case statement to do things, if thing returns good then say good if fails then we set to error and let it retry later. // for items that don't fall in the case will auto set status to success // console.log(row.message.split(".")); await db .update(prodAuditLog) .set({ processed: true, status: "success", processedAt: new Date(), updatedAt: new Date(), }) .where(eq(prodAuditLog.id, row.id)); } } catch (error) { log.error({ stack: error }, "[bufferProcess] failed"); } finally { bufferProcessInProgress = false; log.debug({}, "[bufferProcess] finished"); } };