feat(ws server): added in a websocket on port system to help with better logging

This commit is contained in:
2025-07-25 12:13:19 -05:00
parent 074032f20d
commit 5bcbdaf3d0
17 changed files with 679 additions and 19 deletions

View File

@@ -1,4 +1,4 @@
package logging
package loggingx
import (
"encoding/json"

View File

@@ -1,4 +1,4 @@
package logging
package loggingx
import (
"github.com/gin-gonic/gin"

View File

@@ -1,4 +1,4 @@
package logging
package loggingx
import (
"fmt"
@@ -20,7 +20,7 @@ var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
fmt.Println("Origin:", r.Header.Get("Origin"))
//fmt.Println("Origin:", r.Header.Get("Origin"))
return true
},
}
@@ -65,6 +65,17 @@ func handleSSE(c *gin.Context) {
"user_agent": c.Request.UserAgent(),
})
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Credentials", "true")
c.Header("Access-Control-Allow-Headers", "Content-Type")
c.Header("Access-Control-Allow-Methods", "GET, OPTIONS")
// Handle preflight requests
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)
return
}
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
@@ -92,6 +103,11 @@ func handleSSE(c *gin.Context) {
func handleWebSocket(c *gin.Context) {
log := New()
log.Info("WebSocket connection established", "logger", map[string]interface{}{
"endpoint": "/api/logger/logs",
"client_ip": c.ClientIP(),
"user_agent": c.Request.UserAgent(),
})
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Error("WebSocket upgrade failed", "logger", map[string]interface{}{

View File

@@ -0,0 +1,53 @@
package config
import (
"github.com/gin-gonic/gin"
"lst.net/utils/db"
logging "lst.net/utils/logger"
)
type ConfigUpdateInput struct {
Description *string `json:"description"`
Value *string `json:"value"`
Enabled *bool `json:"enabled"`
AppService *string `json:"app_service"`
}
func RegisterConfigRoutes(l *gin.Engine, baseUrl string) {
// seed the db on start up
db.SeedConfigs(db.DB)
configGroup := l.Group(baseUrl + "/api/config")
configGroup.GET("/configs", getconfigs)
configGroup.POST("/configs", newConfig)
}
func getconfigs(c *gin.Context) {
log := logging.New()
configs, err := db.GetAllConfigs(db.DB)
log.Info("Current Configs", "system", map[string]interface{}{
"endpoint": "/api/config/configs",
"client_ip": c.ClientIP(),
"user_agent": c.Request.UserAgent(),
})
if err != nil {
c.JSON(500, gin.H{"message": "There was an error getting the configs", "error": err})
}
c.JSON(200, gin.H{"message": "Current configs", "data": configs})
}
func newConfig(c *gin.Context) {
var config ConfigUpdateInput
err := c.ShouldBindBodyWithJSON(&config)
if err != nil {
c.JSON(500, gin.H{"message": "Internal Server Error"})
}
c.JSON(200, gin.H{"message": "New config was just added", "data": config})
}

View File

@@ -0,0 +1 @@
package system

View File

@@ -0,0 +1 @@
package system

View File

@@ -0,0 +1,83 @@
package channelmgt
import (
"database/sql"
"encoding/json"
"fmt"
"os"
"time"
"github.com/lib/pq"
logging "lst.net/utils/logger"
)
// setup the notifiyer
// -- Only needs to be run once in DB
// CREATE OR REPLACE FUNCTION notify_new_log() RETURNS trigger AS $$
// BEGIN
// PERFORM pg_notify('new_log', row_to_json(NEW)::text);
// RETURN NEW;
// END;
// $$ LANGUAGE plpgsql;
// DROP TRIGGER IF EXISTS new_log_trigger ON logs;
// CREATE TRIGGER new_log_trigger
// AFTER INSERT ON logs
// FOR EACH ROW EXECUTE FUNCTION notify_new_log();
func AllLogs(db *sql.DB, broadcaster chan logging.Message) {
fmt.Println("[AllLogs] started")
log := logging.New()
dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
os.Getenv("DB_HOST"),
os.Getenv("DB_PORT"),
os.Getenv("DB_USER"),
os.Getenv("DB_PASSWORD"),
os.Getenv("DB_NAME"),
)
listener := pq.NewListener(dsn, 10*time.Second, time.Minute, nil)
err := listener.Listen("new_log")
if err != nil {
log.Panic("Failed to LISTEN on new_log", "logger", map[string]interface{}{
"error": err.Error(),
})
}
fmt.Println("Listening for new logs...")
for {
select {
case notify := <-listener.Notify:
if notify != nil {
fmt.Println("New log notification received")
// Unmarshal the JSON payload of the inserted row
var logData map[string]interface{}
if err := json.Unmarshal([]byte(notify.Extra), &logData); err != nil {
log.Error("Failed to unmarshal notification payload", "logger", map[string]interface{}{
"error": err.Error(),
})
continue
}
// Build message to broadcast
msg := logging.Message{
Channel: "logs", // This matches your logs channel name
Data: logData,
}
broadcaster <- msg
//fmt.Printf("[Broadcasting] sending: %+v\n", msg)
}
case <-time.After(90 * time.Second):
go func() {
log.Debug("Re-pinging Postgres LISTEN", "logger", map[string]interface{}{})
listener.Ping()
}()
}
}
}

View File

@@ -0,0 +1,93 @@
package socketio
import (
"log"
"sync"
"github.com/google/uuid"
"github.com/gorilla/websocket"
logging "lst.net/utils/logger"
)
type Client struct {
ClientID uuid.UUID
Conn *websocket.Conn
APIKey string
IPAddress string
UserAgent string
Send chan []byte
Channels map[string]bool // e.g., {"logs": true, "labels": true}
}
var clients = make(map[*Client]bool)
var clientsLock sync.Mutex
func init() {
var broadcast = make(chan string)
go func() {
for {
msg := <-broadcast
clientsLock.Lock()
for client := range clients {
if client.Channels["logs"] {
err := client.Conn.WriteMessage(websocket.TextMessage, []byte(msg))
if err != nil {
log.Println("Write error:", err)
client.Conn.Close()
//client.MarkDisconnected()
delete(clients, client)
}
}
}
clientsLock.Unlock()
}
}()
}
func StartBroadcasting(broadcaster chan logging.Message) {
go func() {
log.Println("StartBroadcasting goroutine started")
for {
msg := <-broadcaster
//log.Printf("Received msg on broadcaster: %+v\n", msg)
clientsLock.Lock()
for client := range clients {
if client.Channels[msg.Channel] {
log.Println("Sending message to client")
err := client.Conn.WriteJSON(msg)
if err != nil {
log.Println("Write error:", err)
client.Conn.Close()
client.MarkDisconnected()
delete(clients, client)
}
} else {
log.Println("Skipping client, channel mismatch")
}
}
clientsLock.Unlock()
}
}()
}
// func (c *Client) JoinChannel(name string) {
// ch := GetOrCreateChannel(name)
// c.Channels[name] = ch
// ch.Register <- c
// }
// func (c *Client) LeaveChannel(name string) {
// if ch, ok := c.Channels[name]; ok {
// ch.Unregister <- c
// delete(c.Channels, name)
// }
// }
func (c *Client) Disconnect() {
// for _, ch := range c.Channels {
// ch.Unregister <- c
// }
close(c.Send)
}

View File

@@ -0,0 +1,163 @@
package socketio
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"lst.net/utils/db"
)
type JoinPayload struct {
Channel string `json:"channel"`
Services []string `json:"services,omitempty"`
APIKey string `json:"apiKey"`
}
// type Channel struct {
// Name string
// Clients map[*Client]bool
// Register chan *Client
// Unregister chan *Client
// Broadcast chan Message
// }
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true }, // allow all origins; customize for prod
}
func SocketHandler(c *gin.Context) {
// Upgrade HTTP to websocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println("Failed to upgrade:", err)
c.AbortWithStatus(http.StatusBadRequest)
return
}
defer conn.Close()
// Create client struct
client := &Client{
Conn: conn,
IPAddress: c.ClientIP(),
UserAgent: c.Request.UserAgent(),
Channels: make(map[string]bool),
}
clientsLock.Lock()
clients[client] = true
clientsLock.Unlock()
defer func() {
clientsLock.Lock()
delete(clients, client)
clientsLock.Unlock()
client.MarkDisconnected()
client.Disconnect()
conn.Close()
}()
for {
// Read message from client
_, msg, err := conn.ReadMessage()
if err != nil {
log.Println("Read error:", err)
clientsLock.Lock()
delete(clients, client)
clientsLock.Unlock()
client.MarkDisconnected()
client.Disconnect()
break
}
var payload JoinPayload
err = json.Unmarshal(msg, &payload)
if err != nil {
log.Println("Invalid JSON payload:", err)
clientsLock.Lock()
delete(clients, client)
clientsLock.Unlock()
client.MarkDisconnected()
client.Disconnect()
continue
}
// Simple API key check (replace with real auth)
if payload.APIKey == "" {
conn.WriteMessage(websocket.TextMessage, []byte("Missing API Key"))
continue
}
client.APIKey = payload.APIKey
// Handle channel subscription, add more here as we get more in.
switch payload.Channel {
case "logs":
client.Channels["logs"] = true
case "logServices":
for _, svc := range payload.Services {
client.Channels["logServices:"+svc] = true
}
case "labels":
client.Channels["labels"] = true
default:
conn.WriteMessage(websocket.TextMessage, []byte("Unknown channel"))
continue
}
// Save client info in DB
client.SaveToDB()
// Confirm subscription
resp := map[string]string{
"status": "subscribed",
"channel": payload.Channel,
}
respJSON, _ := json.Marshal(resp)
conn.WriteMessage(websocket.TextMessage, respJSON)
// You could now start pushing messages to client or keep connection open
// For demo, just wait and keep connection alive
}
}
func (c *Client) SaveToDB() {
// Convert c.Channels (map[string]bool) to map[string]interface{} for JSONB
channels := make(map[string]interface{})
for ch := range c.Channels {
channels[ch] = true
}
clientRecord := &db.ClientRecord{
APIKey: c.APIKey,
IPAddress: c.IPAddress,
UserAgent: c.UserAgent,
Channels: db.JSONB(channels),
ConnectedAt: time.Now(),
LastHeartbeat: time.Now(),
}
if err := db.DB.Create(&clientRecord).Error; err != nil {
log.Println("❌ Error saving client:", err)
} else {
c.ClientID = clientRecord.ClientID // ✅ Assign the generated UUID back to the client
}
}
func (c *Client) MarkDisconnected() {
now := time.Now()
res := db.DB.Model(&db.ClientRecord{}).
Where("client_id = ?", c.ClientID).
Updates(map[string]interface{}{
"disconnected_at": &now,
})
if res.RowsAffected == 0 {
log.Println("⚠️ No rows updated for client_id:", c.ClientID)
}
if res.Error != nil {
log.Println("❌ Error updating disconnected_at:", res.Error)
}
}

View File

@@ -0,0 +1,25 @@
package socketio
import (
"github.com/gin-gonic/gin"
channelmgt "lst.net/cmd/services/websocket/channelMGT"
"lst.net/utils/db"
logging "lst.net/utils/logger"
)
var broadcaster = make(chan logging.Message) // define broadcaster here so its accessible
func RegisterSocketRoutes(r *gin.Engine) {
sqlDB, err := db.DB.DB()
if err != nil {
panic(err)
}
// channels
go channelmgt.AllLogs(sqlDB, broadcaster)
go StartBroadcasting(broadcaster)
wsGroup := r.Group("/ws")
wsGroup.GET("/connect", SocketHandler)
}