refactor(app): moved all db and log to one intialize spot

This commit is contained in:
2025-08-04 06:54:21 -05:00
parent 486e4fb6b8
commit 0ecbe29ec1
13 changed files with 148 additions and 99 deletions

View File

@@ -2,14 +2,13 @@ package ws
import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"lst.net/internal/db"
"gorm.io/gorm"
"lst.net/internal/models"
"lst.net/pkg"
"lst.net/pkg/logger"
@@ -38,7 +37,8 @@ type Client struct {
}
func (c *Client) SaveToDB() {
func (c *Client) SaveToDB(log *logger.CustomLogger, db *gorm.DB) {
// Convert c.Channels (map[string]bool) to map[string]interface{} for JSONB
channels := make(map[string]interface{})
for ch := range c.Channels {
@@ -54,22 +54,27 @@ func (c *Client) SaveToDB() {
LastHeartbeat: time.Now(),
}
if err := db.DB.Create(&clientRecord).Error; err != nil {
log.Println("❌ Error saving client:", err)
if err := db.Create(&clientRecord).Error; err != nil {
log.Error("❌ Error saving client", "websocket", map[string]interface{}{
"error": err,
})
} else {
c.ClientID = clientRecord.ClientID
c.ConnectedAt = clientRecord.ConnectedAt
clientData := fmt.Sprintf("A new client %v, just connected", c.ClientID)
log.Info(clientData, "websocket", map[string]interface{}{})
}
}
func (c *Client) MarkDisconnected() {
logger := logger.New()
clientData := fmt.Sprintf("Client %v just lefts us", c.ClientID)
logger.Info(clientData, "websocket", map[string]interface{}{})
func (c *Client) MarkDisconnected(log *logger.CustomLogger, db *gorm.DB) {
clientData := fmt.Sprintf("Client %v Dicconected", c.ClientID)
log.Info(clientData, "websocket", map[string]interface{}{})
now := time.Now()
res := db.DB.Model(&models.ClientRecord{}).
res := db.Model(&models.ClientRecord{}).
Where("client_id = ?", c.ClientID).
Updates(map[string]interface{}{
"disconnected_at": &now,
@@ -77,13 +82,13 @@ func (c *Client) MarkDisconnected() {
if res.RowsAffected == 0 {
logger.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
log.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
"clientID": c.ClientID,
})
}
if res.Error != nil {
logger.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
log.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
"clientID": c.ClientID,
"error": res.Error,
})
@@ -137,28 +142,31 @@ const (
writeWait = 10 * time.Second
)
func (c *Client) StartHeartbeat() {
logger := logger.New()
log.Println("Started hearbeat")
func (c *Client) StartHeartbeat(log *logger.CustomLogger, db *gorm.DB) {
log.Debug("Started hearbeat", "websocket", map[string]interface{}{})
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !c.isAlive.Load() { // Correct way to read atomic.Bool
if !c.isAlive.Load() {
return
}
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Printf("Heartbeat failed for %s: %v", c.ClientID, err)
c.Close()
log.Error("Heartbeat failed", "websocket", map[string]interface{}{
"client_id": c.ClientID,
"error": err,
})
c.Close(log, db)
return
}
now := time.Now()
res := db.DB.Model(&models.ClientRecord{}).
res := db.Model(&models.ClientRecord{}).
Where("client_id = ?", c.ClientID).
Updates(map[string]interface{}{
"last_heartbeat": &now,
@@ -166,17 +174,21 @@ func (c *Client) StartHeartbeat() {
if res.RowsAffected == 0 {
logger.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
log.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
"clientID": c.ClientID,
})
}
if res.Error != nil {
logger.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
log.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
"clientID": c.ClientID,
"error": res.Error,
})
}
clientStuff := fmt.Sprintf("HeartBeat just done on: %v", c.ClientID)
log.Info(clientStuff, "websocket", map[string]interface{}{
"clientID": c.ClientID,
})
case <-c.done:
return
@@ -184,16 +196,16 @@ func (c *Client) StartHeartbeat() {
}
}
func (c *Client) Close() {
func (c *Client) Close(log *logger.CustomLogger, db *gorm.DB) {
if c.isAlive.CompareAndSwap(true, false) { // Atomic swap
close(c.done)
c.Conn.Close()
// Add any other cleanup here
c.MarkDisconnected()
c.MarkDisconnected(log, db)
}
}
func (c *Client) startServerPings() {
func (c *Client) startServerPings(log *logger.CustomLogger, db *gorm.DB) {
ticker := time.NewTicker(60 * time.Second) // Ping every 30s
defer ticker.Stop()
@@ -202,7 +214,13 @@ func (c *Client) startServerPings() {
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.Close() // Disconnect if ping fails
log.Error("Server Ping failed", "websocket", map[string]interface{}{
"clientID": c.ClientID,
"error": err,
})
c.Close(log, db)
return
}
case <-c.done:
@@ -219,15 +237,14 @@ func (c *Client) IsActive() bool {
return time.Since(c.lastActive) < 45*time.Second // 1.5x ping interval
}
func (c *Client) updateHeartbeat() {
func (c *Client) updateHeartbeat(log *logger.CustomLogger, db *gorm.DB) {
//fmt.Println("Updating heatbeat")
now := time.Now()
logger := logger.New()
//fmt.Printf("Updating heartbeat for client: %s at %v\n", c.ClientID, now)
//db.DB = db.DB.Debug()
res := db.DB.Model(&models.ClientRecord{}).
res := db.Model(&models.ClientRecord{}).
Where("client_id = ?", c.ClientID).
Updates(map[string]interface{}{
"last_heartbeat": &now, // Explicit format
@@ -235,27 +252,27 @@ func (c *Client) updateHeartbeat() {
//fmt.Printf("Executed SQL: %v\n", db.DB.Statement.SQL.String())
if res.RowsAffected == 0 {
logger.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
log.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
"clientID": c.ClientID,
})
}
if res.Error != nil {
logger.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
log.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
"clientID": c.ClientID,
"error": res.Error,
})
}
// 2. Verify DB connection
if db.DB == nil {
logger.Error("DB connection is nil", "websocket", map[string]interface{}{})
if db == nil {
log.Error("DB connection is nil", "websocket", map[string]interface{}{})
return
}
// 3. Test raw SQL execution first
testRes := db.DB.Exec("SELECT 1")
testRes := db.Exec("SELECT 1")
if testRes.Error != nil {
logger.Error("DB ping failed", "websocket", map[string]interface{}{
log.Error("DB ping failed", "websocket", map[string]interface{}{
"error": testRes.Error,
})
return