Files
logistics_support_tool/backend/cmd/services/websocket/log_services.go

80 lines
1.8 KiB
Go

package websocket
// Set up the notifier.
// -- This SQL only needs to be run once in the DB.
// CREATE OR REPLACE FUNCTION notify_new_log() RETURNS trigger AS $$
// BEGIN
// PERFORM pg_notify('new_log', row_to_json(NEW)::text);
// RETURN NEW;
// END;
// $$ LANGUAGE plpgsql;
// DROP TRIGGER IF EXISTS new_log_trigger ON logs;
// CREATE TRIGGER new_log_trigger
// AFTER INSERT ON logs
// FOR EACH ROW EXECUTE FUNCTION notify_new_log();
import (
"encoding/json"
"fmt"
"os"
"time"
"github.com/lib/pq"
logging "lst.net/utils/logger"
)
// LogServices forwards PostgreSQL "new_log" notifications to broadcaster.
//
// It builds a connection string from the DB_HOST, DB_PORT, DB_USER,
// DB_PASSWORD and DB_NAME environment variables, issues LISTEN on the
// "new_log" channel, and then loops forever. Each notification payload is
// decoded as a JSON object and sent on broadcaster as a logging.Message on the
// "logServices" channel, with the payload's "level" and "service" values
// copied into Meta. Payloads that fail to decode are logged and dropped.
//
// The call never returns normally and blocks the calling goroutine; the send
// on broadcaster blocks until the message is accepted. A failure to LISTEN is
// reported through logger.Panic.
func LogServices(broadcaster chan logging.Message) {
	// idleTimeout is how long the loop waits without any activity before it
	// pings the server to check that the connection is still alive.
	const idleTimeout = 90 * time.Second

	fmt.Println("[LogServices] started - single channel for all logs")
	logger := logging.New()

	dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
		os.Getenv("DB_HOST"),
		os.Getenv("DB_PORT"),
		os.Getenv("DB_USER"),
		os.Getenv("DB_PASSWORD"),
		os.Getenv("DB_NAME"),
	)

	listener := pq.NewListener(dsn, 10*time.Second, time.Minute, nil)
	// The loop below never returns, so this only runs when the function
	// unwinds (e.g. a panic). The Close error is deliberately ignored: there is
	// nothing useful left to do with it at that point.
	defer listener.Close()

	if err := listener.Listen("new_log"); err != nil {
		logger.Panic("Failed to LISTEN on new_log", "logger", map[string]interface{}{
			"error": err.Error(),
		})
	}

	fmt.Println("Listening for all logs through single logServices channel...")

	// Ping runs in its own goroutine so a slow round trip cannot stall the
	// loop; its error is handed back here so that the logger is only ever used
	// from this goroutine. The buffer of one plus the non-blocking send keeps
	// the ping goroutine from ever leaking on a full channel.
	pingErrs := make(chan error, 1)

	for {
		select {
		case notify := <-listener.Notify:
			// lib/pq delivers a nil notification after it re-establishes a
			// lost connection; there is no payload to forward in that case.
			if notify == nil {
				continue
			}

			var logData map[string]interface{}
			if err := json.Unmarshal([]byte(notify.Extra), &logData); err != nil {
				logger.Error("Failed to unmarshal notification payload", "logger", map[string]interface{}{
					"error": err.Error(),
				})
				continue
			}

			// Always send to logServices channel
			broadcaster <- logging.Message{
				Channel: "logServices",
				Data:    logData,
				Meta: map[string]interface{}{
					"level":   logData["level"],
					"service": logData["service"],
				},
			}

		case err := <-pingErrs:
			// Previously the ping result was discarded, hiding dead connections.
			logger.Error("Failed to ping listener connection", "logger", map[string]interface{}{
				"error": err.Error(),
			})

		case <-time.After(idleTimeout):
			go func() {
				if err := listener.Ping(); err != nil {
					select {
					case pingErrs <- err:
					default:
						// An earlier ping failure is still waiting to be
						// logged; dropping this one loses no information.
					}
				}
			}()
		}
	}
}