Compare commits
10 Commits
6a631be909
...
v0.0.1-alp
| Author | SHA1 | Date | |
|---|---|---|---|
| b6968b7b67 | |||
| a0aa75c5a0 | |||
| 78be07c8bb | |||
| 0575a34422 | |||
| 3bc3801ffb | |||
| 4368111311 | |||
| daf9e8a966 | |||
| 8a08d3eac6 | |||
| a761a3634b | |||
| a1a30cffd1 |
44
CHANGELOG.md
44
CHANGELOG.md
@@ -3,6 +3,50 @@
|
|||||||
All notable changes to LST will be documented in this file.
|
All notable changes to LST will be documented in this file.
|
||||||
|
|
||||||
|
|
||||||
|
## [0.0.1-alpha.6](https://git.tuffraid.net/cowch/logistics_support_tool/compare/v0.0.1-alpha.5...v0.0.1-alpha.6) (2025-07-31)
|
||||||
|
|
||||||
|
### 🌟 Enhancements
|
||||||
|
|
||||||
|
* **logging:** added in db and logging with websocket ([52ef39f](https://git.tuffraid.net/cowch/logistics_support_tool/commit/52ef39fd5c129ed02ed9f38dbf7e49ae06807ad6))
|
||||||
|
* **settings:** migrated all settings endpoints confirmed as well for updates ([0575a34](https://git.tuffraid.net/cowch/logistics_support_tool/commit/0575a344229ba0ff5c0f47781c6d596e5c08e5eb))
|
||||||
|
* **ws server:** added in a websocket on port system to help with better logging ([5bcbdaf](https://git.tuffraid.net/cowch/logistics_support_tool/commit/5bcbdaf3d0e889729d4dce3df51f4330d7793868))
|
||||||
|
|
||||||
|
### 🐛 Bug fixes
|
||||||
|
|
||||||
|
* **update server:** fixed to make sure everything is stopped before doing the remaining update ([13e282e](https://git.tuffraid.net/cowch/logistics_support_tool/commit/13e282e815c1c95a0a5298ede2f6497cdf036440))
|
||||||
|
* **websocket:** errors in saving client info during ping ping ([4368111](https://git.tuffraid.net/cowch/logistics_support_tool/commit/4368111311c48e73a11a6b24febdcc3be31a2a59))
|
||||||
|
* **wrapper:** corrections to properly handle websockets :D ([a761a36](https://git.tuffraid.net/cowch/logistics_support_tool/commit/a761a3634b6cb0aeeb571dd634bd158cee530779))
|
||||||
|
|
||||||
|
### 📚 Documentation
|
||||||
|
|
||||||
|
* **.env example:** added postrgres example ([14dd87e](https://git.tuffraid.net/cowch/logistics_support_tool/commit/14dd87e335a63d76d64c07a15cf593cb286a9833))
|
||||||
|
* **dockerbuild:** comments as a reminder for my seld ([52956ec](https://git.tuffraid.net/cowch/logistics_support_tool/commit/52956ecaa45cd556ba7832d6cb9ec2cf883d983a))
|
||||||
|
* **docker:** docs about the custom network for the db is seperated ([6a631be](https://git.tuffraid.net/cowch/logistics_support_tool/commit/6a631be909b56a899af393510edffd70d7901a7a))
|
||||||
|
* **wss:** more ws stuff ([63c053b](https://git.tuffraid.net/cowch/logistics_support_tool/commit/63c053b38ce3ab3c3a94cda620da930f4e8615bd))
|
||||||
|
|
||||||
|
### 🛠️ Code Refactor
|
||||||
|
|
||||||
|
* **app port:** changed to have the port be dyncamic on the iis side ([074032f](https://git.tuffraid.net/cowch/logistics_support_tool/commit/074032f20dc90810416c5899e44fefe86b52f98a))
|
||||||
|
* **build:** added back in the build name stuff ([92ce51e](https://git.tuffraid.net/cowch/logistics_support_tool/commit/92ce51eb7cf14ebb599c29fea4721e21badafbf6))
|
||||||
|
* **config:** changed to settings to match the other lst in node. makes it more easy to manage ([3bc3801](https://git.tuffraid.net/cowch/logistics_support_tool/commit/3bc3801ffbb544a814d52c72e566e8d4866a7f38))
|
||||||
|
* **createzip:** added in env-example to the zip file ([6c8ac33](https://git.tuffraid.net/cowch/logistics_support_tool/commit/6c8ac33be73f203137b883e33feb625ccc0945e9))
|
||||||
|
* **docker compose example:** added in postgress stuff plus network ([623e19f](https://git.tuffraid.net/cowch/logistics_support_tool/commit/623e19f028d27fbfc46bee567ce78169cddba8fb))
|
||||||
|
* **settings:** changed config to settings and added in the update method for this as well ([a0aa75c](https://git.tuffraid.net/cowch/logistics_support_tool/commit/a0aa75c5a0b4a6e3a10b88bbcccf43d096e532b4))
|
||||||
|
* **wrapper:** removed the logger stuff so we dont fill up space ([8a08d3e](https://git.tuffraid.net/cowch/logistics_support_tool/commit/8a08d3eac6540b00ff23115936d56b4f22f16d53))
|
||||||
|
* **ws:** ws logging and channel manager added no auth currently ([a1a30cf](https://git.tuffraid.net/cowch/logistics_support_tool/commit/a1a30cffd18e02e1061959fa3164f8237522880c))
|
||||||
|
|
||||||
|
### 🚀 Performance
|
||||||
|
|
||||||
|
* **websocket:** added in base url to help with ssl stuff and iis ([daf9e8a](https://git.tuffraid.net/cowch/logistics_support_tool/commit/daf9e8a966fd440723b1aec932a02873a5e27eb7))
|
||||||
|
|
||||||
|
### 📝 Testing Code
|
||||||
|
|
||||||
|
* **iis:** wrapper test for ws ([75c17d2](https://git.tuffraid.net/cowch/logistics_support_tool/commit/75c17d20659dcc5a762e00928709c4d3dd277284))
|
||||||
|
|
||||||
|
### 📈 Project changes
|
||||||
|
|
||||||
|
* **hotreload:** added in air for hot reloading ([78be07c](https://git.tuffraid.net/cowch/logistics_support_tool/commit/78be07c8bbf5acbcdac65351f693941f47be4cb5))
|
||||||
|
|
||||||
## [0.0.1-alpha.5](https://git.tuffraid.net/cowch/logistics_support_tool/compare/v0.0.1-alpha.4...v0.0.1-alpha.5) (2025-07-21)
|
## [0.0.1-alpha.5](https://git.tuffraid.net/cowch/logistics_support_tool/compare/v0.0.1-alpha.4...v0.0.1-alpha.5) (2025-07-21)
|
||||||
|
|
||||||
### 🌟 Enhancements
|
### 🌟 Enhancements
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
|
using System;
|
||||||
|
using System.IO;
|
||||||
using System.Net;
|
using System.Net;
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
using System.Text;
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.AspNetCore.Builder;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
|
|
||||||
builder.Services.AddHttpClient("GoBackend", client =>
|
builder.Services.AddHttpClient("GoBackend", client =>
|
||||||
{
|
{
|
||||||
client.BaseAddress = new Uri("http://localhost:8080");
|
client.BaseAddress = new Uri("http://localhost:8080");
|
||||||
@@ -14,13 +18,30 @@ var app = builder.Build();
|
|||||||
// Enable WebSocket support
|
// Enable WebSocket support
|
||||||
app.UseWebSockets();
|
app.UseWebSockets();
|
||||||
|
|
||||||
|
// Logging method
|
||||||
|
void LogToFile(string message)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
string logDir = Path.Combine(AppContext.BaseDirectory, "logs");
|
||||||
|
Directory.CreateDirectory(logDir);
|
||||||
|
string logFilePath = Path.Combine(logDir, "proxy_log.txt");
|
||||||
|
File.AppendAllText(logFilePath, $"{DateTime.UtcNow}: {message}{Environment.NewLine}");
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// Handle potential errors writing to log file
|
||||||
|
Console.WriteLine($"Logging error: {ex.Message}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Middleware to handle WebSocket requests
|
||||||
app.Use(async (context, next) =>
|
app.Use(async (context, next) =>
|
||||||
{
|
{
|
||||||
// Proxy WebSocket requests for /lst/api/logger/logs (adjust path as needed)
|
if (context.WebSockets.IsWebSocketRequest && context.Request.Path.StartsWithSegments("/ws"))
|
||||||
if (context.WebSockets.IsWebSocketRequest &&
|
|
||||||
context.Request.Path.StartsWithSegments("/lst/api/logger/logs"))
|
|
||||||
{
|
{
|
||||||
Console.WriteLine("WebSocket request received!");
|
// LogToFile($"WebSocket request received for path: {context.Request.Path}");
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var backendUri = new UriBuilder("ws", "localhost", 8080)
|
var backendUri = new UriBuilder("ws", "localhost", 8080)
|
||||||
@@ -30,46 +51,32 @@ app.Use(async (context, next) =>
|
|||||||
}.Uri;
|
}.Uri;
|
||||||
|
|
||||||
using var backendSocket = new ClientWebSocket();
|
using var backendSocket = new ClientWebSocket();
|
||||||
|
|
||||||
// Forward most headers except those managed by WebSocket protocol
|
|
||||||
foreach (var header in context.Request.Headers)
|
|
||||||
{
|
|
||||||
if (!header.Key.Equals("Host", StringComparison.OrdinalIgnoreCase) &&
|
|
||||||
!header.Key.Equals("Upgrade", StringComparison.OrdinalIgnoreCase) &&
|
|
||||||
!header.Key.Equals("Connection", StringComparison.OrdinalIgnoreCase) &&
|
|
||||||
!header.Key.Equals("Sec-WebSocket-Key", StringComparison.OrdinalIgnoreCase) &&
|
|
||||||
!header.Key.Equals("Sec-WebSocket-Version", StringComparison.OrdinalIgnoreCase))
|
|
||||||
{
|
|
||||||
backendSocket.Options.SetRequestHeader(header.Key, header.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await backendSocket.ConnectAsync(backendUri, context.RequestAborted);
|
await backendSocket.ConnectAsync(backendUri, context.RequestAborted);
|
||||||
|
|
||||||
using var frontendSocket = await context.WebSockets.AcceptWebSocketAsync();
|
using var frontendSocket = await context.WebSockets.AcceptWebSocketAsync();
|
||||||
|
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
// Bidirectional forwarding tasks
|
// WebSocket forwarding tasks
|
||||||
var forwardToBackend = ForwardWebSocketAsync(frontendSocket, backendSocket, cts.Token);
|
var forwardToBackend = ForwardWebSocketAsync(frontendSocket, backendSocket, cts.Token);
|
||||||
var forwardToFrontend = ForwardWebSocketAsync(backendSocket, frontendSocket, cts.Token);
|
var forwardToFrontend = ForwardWebSocketAsync(backendSocket, frontendSocket, cts.Token);
|
||||||
|
|
||||||
await Task.WhenAny(forwardToBackend, forwardToFrontend);
|
await Task.WhenAny(forwardToBackend, forwardToFrontend);
|
||||||
cts.Cancel();
|
cts.Cancel();
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
//LogToFile($"WebSocket proxy error: {ex.Message}");
|
||||||
context.Response.StatusCode = (int)HttpStatusCode.BadGateway;
|
context.Response.StatusCode = (int)HttpStatusCode.BadGateway;
|
||||||
await context.Response.WriteAsync($"WebSocket proxy error: {ex.Message}");
|
await context.Response.WriteAsync($"WebSocket proxy error: {ex.Message}");
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
await next();
|
await next();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Proxy normal HTTP requests
|
// Middleware to handle HTTP requests
|
||||||
app.Use(async (context, next) =>
|
app.Use(async (context, next) =>
|
||||||
{
|
{
|
||||||
if (context.WebSockets.IsWebSocketRequest)
|
if (context.WebSockets.IsWebSocketRequest)
|
||||||
@@ -100,24 +107,24 @@ app.Use(async (context, next) =>
|
|||||||
}
|
}
|
||||||
|
|
||||||
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, context.RequestAborted);
|
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, context.RequestAborted);
|
||||||
|
|
||||||
context.Response.StatusCode = (int)response.StatusCode;
|
context.Response.StatusCode = (int)response.StatusCode;
|
||||||
|
|
||||||
foreach (var header in response.Headers)
|
foreach (var header in response.Headers)
|
||||||
{
|
{
|
||||||
context.Response.Headers[header.Key] = header.Value.ToArray();
|
context.Response.Headers[header.Key] = header.Value.ToArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach (var header in response.Content.Headers)
|
foreach (var header in response.Content.Headers)
|
||||||
{
|
{
|
||||||
context.Response.Headers[header.Key] = header.Value.ToArray();
|
context.Response.Headers[header.Key] = header.Value.ToArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
context.Response.Headers.Remove("transfer-encoding");
|
context.Response.Headers.Remove("transfer-encoding");
|
||||||
|
|
||||||
await response.Content.CopyToAsync(context.Response.Body);
|
await response.Content.CopyToAsync(context.Response.Body);
|
||||||
}
|
}
|
||||||
catch (HttpRequestException ex)
|
catch (HttpRequestException ex)
|
||||||
{
|
{
|
||||||
|
LogToFile($"HTTP proxy error: {ex.Message}");
|
||||||
context.Response.StatusCode = (int)HttpStatusCode.BadGateway;
|
context.Response.StatusCode = (int)HttpStatusCode.BadGateway;
|
||||||
await context.Response.WriteAsync($"Backend request failed: {ex.Message}");
|
await context.Response.WriteAsync($"Backend request failed: {ex.Message}");
|
||||||
}
|
}
|
||||||
@@ -141,9 +148,10 @@ async Task ForwardWebSocketAsync(WebSocket source, WebSocket destination, Cancel
|
|||||||
await destination.SendAsync(new ArraySegment<byte>(buffer, 0, result.Count), result.MessageType, result.EndOfMessage, cancellationToken);
|
await destination.SendAsync(new ArraySegment<byte>(buffer, 0, result.Count), result.MessageType, result.EndOfMessage, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (WebSocketException)
|
catch (WebSocketException ex)
|
||||||
{
|
{
|
||||||
// Normal close or network error
|
LogToFile($"WebSocket forwarding error: {ex.Message}");
|
||||||
|
await destination.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Error", cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,3 +10,5 @@ this will also include a primary server to house all the common configs across a
|
|||||||
|
|
||||||
The new lst will run in docker by building your own image and deploying or pulling the image down.
|
The new lst will run in docker by building your own image and deploying or pulling the image down.
|
||||||
you will also be able to run it in windows or linux.
|
you will also be able to run it in windows or linux.
|
||||||
|
|
||||||
|
when developing in lst and you want to run hotloads installed and configure https://github.com/air-verse/air
|
||||||
|
|||||||
0
backend/.air.toml
Normal file
0
backend/.air.toml
Normal file
@@ -1,53 +0,0 @@
|
|||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"lst.net/utils/db"
|
|
||||||
logging "lst.net/utils/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ConfigUpdateInput struct {
|
|
||||||
Description *string `json:"description"`
|
|
||||||
Value *string `json:"value"`
|
|
||||||
Enabled *bool `json:"enabled"`
|
|
||||||
AppService *string `json:"app_service"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func RegisterConfigRoutes(l *gin.Engine, baseUrl string) {
|
|
||||||
// seed the db on start up
|
|
||||||
db.SeedConfigs(db.DB)
|
|
||||||
|
|
||||||
configGroup := l.Group(baseUrl + "/api/config")
|
|
||||||
configGroup.GET("/configs", getconfigs)
|
|
||||||
configGroup.POST("/configs", newConfig)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getconfigs(c *gin.Context) {
|
|
||||||
log := logging.New()
|
|
||||||
configs, err := db.GetAllConfigs(db.DB)
|
|
||||||
log.Info("Current Configs", "system", map[string]interface{}{
|
|
||||||
"endpoint": "/api/config/configs",
|
|
||||||
"client_ip": c.ClientIP(),
|
|
||||||
"user_agent": c.Request.UserAgent(),
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(500, gin.H{"message": "There was an error getting the configs", "error": err})
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(200, gin.H{"message": "Current configs", "data": configs})
|
|
||||||
}
|
|
||||||
|
|
||||||
func newConfig(c *gin.Context) {
|
|
||||||
|
|
||||||
var config ConfigUpdateInput
|
|
||||||
|
|
||||||
err := c.ShouldBindBodyWithJSON(&config)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
c.JSON(500, gin.H{"message": "Internal Server Error"})
|
|
||||||
}
|
|
||||||
|
|
||||||
c.JSON(200, gin.H{"message": "New config was just added", "data": config})
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1 +1 @@
|
|||||||
package system
|
package servers
|
||||||
|
|||||||
88
backend/cmd/services/system/settings/settings.go
Normal file
88
backend/cmd/services/system/settings/settings.go
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
package settings
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"lst.net/utils/db"
|
||||||
|
"lst.net/utils/inputs"
|
||||||
|
logging "lst.net/utils/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RegisterSettingsRoutes(l *gin.Engine, baseUrl string) {
|
||||||
|
// seed the db on start up
|
||||||
|
db.SeedConfigs(db.DB)
|
||||||
|
|
||||||
|
s := l.Group(baseUrl + "/api/v1")
|
||||||
|
s.GET("/settings", getSettings)
|
||||||
|
s.PATCH("/settings/:id", updateSettingById)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSettings(c *gin.Context) {
|
||||||
|
logger := logging.New()
|
||||||
|
configs, err := db.GetAllConfigs(db.DB)
|
||||||
|
logger.Info("Current Settings", "system", map[string]interface{}{
|
||||||
|
"endpoint": "/api/v1/settings",
|
||||||
|
"client_ip": c.ClientIP(),
|
||||||
|
"user_agent": c.Request.UserAgent(),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Current Settings", "system", map[string]interface{}{
|
||||||
|
"endpoint": "/api/v1/settings",
|
||||||
|
"client_ip": c.ClientIP(),
|
||||||
|
"user_agent": c.Request.UserAgent(),
|
||||||
|
"error": err,
|
||||||
|
})
|
||||||
|
c.JSON(500, gin.H{"message": "There was an error getting the settings", "error": err})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(200, gin.H{"message": "Current settings", "data": configs})
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateSettingById(c *gin.Context) {
|
||||||
|
logger := logging.New()
|
||||||
|
settingID := c.Param("id")
|
||||||
|
|
||||||
|
if settingID == "" {
|
||||||
|
c.JSON(500, gin.H{"message": "Invalid data"})
|
||||||
|
logger.Error("Invalid data", "system", map[string]interface{}{
|
||||||
|
"endpoint": "/api/v1/settings",
|
||||||
|
"client_ip": c.ClientIP(),
|
||||||
|
"user_agent": c.Request.UserAgent(),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var setting inputs.SettingUpdateInput
|
||||||
|
|
||||||
|
//err := c.ShouldBindBodyWithJSON(&setting)
|
||||||
|
|
||||||
|
decoder := json.NewDecoder(c.Request.Body) // more strict and will force us to have correct data
|
||||||
|
decoder.DisallowUnknownFields()
|
||||||
|
|
||||||
|
if err := decoder.Decode(&setting); err != nil {
|
||||||
|
c.JSON(400, gin.H{"message": "Invalid request body", "error": err.Error()})
|
||||||
|
logger.Error("Invalid request body", "system", map[string]interface{}{
|
||||||
|
"endpoint": "/api/v1/settings",
|
||||||
|
"client_ip": c.ClientIP(),
|
||||||
|
"user_agent": c.Request.UserAgent(),
|
||||||
|
"error": err,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.UpdateConfig(db.DB, settingID, setting); err != nil {
|
||||||
|
c.JSON(500, gin.H{"message": "Failed to update setting", "error": err.Error()})
|
||||||
|
logger.Error("Failed to update setting", "system", map[string]interface{}{
|
||||||
|
"endpoint": "/api/v1/settings",
|
||||||
|
"client_ip": c.ClientIP(),
|
||||||
|
"user_agent": c.Request.UserAgent(),
|
||||||
|
"error": err,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(200, gin.H{"message": "Setting was just updated", "data": setting})
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1 +0,0 @@
|
|||||||
package system
|
|
||||||
179
backend/cmd/services/websocket/channel_manager.go
Normal file
179
backend/cmd/services/websocket/channel_manager.go
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
package websocket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
logging "lst.net/utils/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Channel struct {
|
||||||
|
Name string
|
||||||
|
Clients map[*Client]bool
|
||||||
|
Register chan *Client
|
||||||
|
Unregister chan *Client
|
||||||
|
Broadcast chan []byte
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
channels = make(map[string]*Channel)
|
||||||
|
channelsMu sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
// InitializeChannels creates and returns all channels
|
||||||
|
func InitializeChannels() {
|
||||||
|
channelsMu.Lock()
|
||||||
|
defer channelsMu.Unlock()
|
||||||
|
|
||||||
|
channels["logServices"] = NewChannel("logServices")
|
||||||
|
channels["labels"] = NewChannel("labels")
|
||||||
|
// Add more channels here as needed
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewChannel(name string) *Channel {
|
||||||
|
return &Channel{
|
||||||
|
Name: name,
|
||||||
|
Clients: make(map[*Client]bool),
|
||||||
|
Register: make(chan *Client),
|
||||||
|
Unregister: make(chan *Client),
|
||||||
|
Broadcast: make(chan []byte),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetChannel(name string) (*Channel, bool) {
|
||||||
|
channelsMu.RLock()
|
||||||
|
defer channelsMu.RUnlock()
|
||||||
|
ch, exists := channels[name]
|
||||||
|
return ch, exists
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetAllChannels() map[string]*Channel {
|
||||||
|
channelsMu.RLock()
|
||||||
|
defer channelsMu.RUnlock()
|
||||||
|
|
||||||
|
chs := make(map[string]*Channel)
|
||||||
|
for k, v := range channels {
|
||||||
|
chs[k] = v
|
||||||
|
}
|
||||||
|
return chs
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartAllChannels() {
|
||||||
|
|
||||||
|
channelsMu.RLock()
|
||||||
|
defer channelsMu.RUnlock()
|
||||||
|
|
||||||
|
for _, ch := range channels {
|
||||||
|
go ch.RunChannel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func CleanupChannels() {
|
||||||
|
channelsMu.Lock()
|
||||||
|
defer channelsMu.Unlock()
|
||||||
|
|
||||||
|
for _, ch := range channels {
|
||||||
|
close(ch.Broadcast)
|
||||||
|
// Add any other cleanup needed
|
||||||
|
}
|
||||||
|
channels = make(map[string]*Channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartBroadcasting(broadcaster chan logging.Message, channels map[string]*Channel) {
|
||||||
|
logger := logging.New()
|
||||||
|
go func() {
|
||||||
|
for msg := range broadcaster {
|
||||||
|
switch msg.Channel {
|
||||||
|
case "logServices":
|
||||||
|
// Just forward the message - filtering happens in RunChannel()
|
||||||
|
messageBytes, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Error marshaling message", "websocket", map[string]interface{}{
|
||||||
|
"errors": err,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
channels["logServices"].Broadcast <- messageBytes
|
||||||
|
|
||||||
|
case "labels":
|
||||||
|
// Future labels handling
|
||||||
|
messageBytes, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Error marshaling message", "websocket", map[string]interface{}{
|
||||||
|
"errors": err,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
channels["labels"].Broadcast <- messageBytes
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Printf("Received message for unknown channel: %s", msg.Channel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func contains(slice []string, item string) bool {
|
||||||
|
// Empty filter slice means "match all"
|
||||||
|
if len(slice) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case-insensitive comparison
|
||||||
|
item = strings.ToLower(item)
|
||||||
|
for _, s := range slice {
|
||||||
|
if strings.ToLower(s) == item {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Updated Channel.RunChannel() for logServices filtering
|
||||||
|
func (ch *Channel) RunChannel() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case client := <-ch.Register:
|
||||||
|
ch.lock.Lock()
|
||||||
|
ch.Clients[client] = true
|
||||||
|
ch.lock.Unlock()
|
||||||
|
|
||||||
|
case client := <-ch.Unregister:
|
||||||
|
ch.lock.Lock()
|
||||||
|
delete(ch.Clients, client)
|
||||||
|
ch.lock.Unlock()
|
||||||
|
|
||||||
|
case message := <-ch.Broadcast:
|
||||||
|
var msg logging.Message
|
||||||
|
if err := json.Unmarshal(message, &msg); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.lock.RLock()
|
||||||
|
for client := range ch.Clients {
|
||||||
|
// Special filtering for logServices
|
||||||
|
if ch.Name == "logServices" {
|
||||||
|
logLevel, _ := msg.Meta["level"].(string)
|
||||||
|
logService, _ := msg.Meta["service"].(string)
|
||||||
|
|
||||||
|
levelMatch := len(client.LogLevels) == 0 || contains(client.LogLevels, logLevel)
|
||||||
|
serviceMatch := len(client.Services) == 0 || contains(client.Services, logService)
|
||||||
|
|
||||||
|
if !levelMatch || !serviceMatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case client.Send <- message:
|
||||||
|
default:
|
||||||
|
ch.Unregister <- client
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ch.lock.RUnlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,93 +0,0 @@
|
|||||||
package socketio
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
logging "lst.net/utils/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
ClientID uuid.UUID
|
|
||||||
Conn *websocket.Conn
|
|
||||||
APIKey string
|
|
||||||
IPAddress string
|
|
||||||
UserAgent string
|
|
||||||
Send chan []byte
|
|
||||||
Channels map[string]bool // e.g., {"logs": true, "labels": true}
|
|
||||||
}
|
|
||||||
|
|
||||||
var clients = make(map[*Client]bool)
|
|
||||||
|
|
||||||
var clientsLock sync.Mutex
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
var broadcast = make(chan string)
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
msg := <-broadcast
|
|
||||||
|
|
||||||
clientsLock.Lock()
|
|
||||||
for client := range clients {
|
|
||||||
if client.Channels["logs"] {
|
|
||||||
err := client.Conn.WriteMessage(websocket.TextMessage, []byte(msg))
|
|
||||||
if err != nil {
|
|
||||||
log.Println("Write error:", err)
|
|
||||||
client.Conn.Close()
|
|
||||||
//client.MarkDisconnected()
|
|
||||||
delete(clients, client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
clientsLock.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func StartBroadcasting(broadcaster chan logging.Message) {
|
|
||||||
go func() {
|
|
||||||
log.Println("StartBroadcasting goroutine started")
|
|
||||||
for {
|
|
||||||
msg := <-broadcaster
|
|
||||||
//log.Printf("Received msg on broadcaster: %+v\n", msg)
|
|
||||||
clientsLock.Lock()
|
|
||||||
for client := range clients {
|
|
||||||
if client.Channels[msg.Channel] {
|
|
||||||
log.Println("Sending message to client")
|
|
||||||
err := client.Conn.WriteJSON(msg)
|
|
||||||
if err != nil {
|
|
||||||
log.Println("Write error:", err)
|
|
||||||
client.Conn.Close()
|
|
||||||
client.MarkDisconnected()
|
|
||||||
delete(clients, client)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Println("Skipping client, channel mismatch")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
clientsLock.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// func (c *Client) JoinChannel(name string) {
|
|
||||||
// ch := GetOrCreateChannel(name)
|
|
||||||
// c.Channels[name] = ch
|
|
||||||
// ch.Register <- c
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func (c *Client) LeaveChannel(name string) {
|
|
||||||
// if ch, ok := c.Channels[name]; ok {
|
|
||||||
// ch.Unregister <- c
|
|
||||||
// delete(c.Channels, name)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (c *Client) Disconnect() {
|
|
||||||
// for _, ch := range c.Channels {
|
|
||||||
// ch.Unregister <- c
|
|
||||||
// }
|
|
||||||
close(c.Send)
|
|
||||||
}
|
|
||||||
@@ -1,163 +0,0 @@
|
|||||||
package socketio
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"lst.net/utils/db"
|
|
||||||
)
|
|
||||||
|
|
||||||
type JoinPayload struct {
|
|
||||||
Channel string `json:"channel"`
|
|
||||||
Services []string `json:"services,omitempty"`
|
|
||||||
APIKey string `json:"apiKey"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// type Channel struct {
|
|
||||||
// Name string
|
|
||||||
// Clients map[*Client]bool
|
|
||||||
// Register chan *Client
|
|
||||||
// Unregister chan *Client
|
|
||||||
// Broadcast chan Message
|
|
||||||
// }
|
|
||||||
|
|
||||||
var upgrader = websocket.Upgrader{
|
|
||||||
CheckOrigin: func(r *http.Request) bool { return true }, // allow all origins; customize for prod
|
|
||||||
}
|
|
||||||
|
|
||||||
func SocketHandler(c *gin.Context) {
|
|
||||||
// Upgrade HTTP to websocket
|
|
||||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Println("Failed to upgrade:", err)
|
|
||||||
c.AbortWithStatus(http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Create client struct
|
|
||||||
client := &Client{
|
|
||||||
Conn: conn,
|
|
||||||
IPAddress: c.ClientIP(),
|
|
||||||
UserAgent: c.Request.UserAgent(),
|
|
||||||
Channels: make(map[string]bool),
|
|
||||||
}
|
|
||||||
|
|
||||||
clientsLock.Lock()
|
|
||||||
clients[client] = true
|
|
||||||
clientsLock.Unlock()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
clientsLock.Lock()
|
|
||||||
delete(clients, client)
|
|
||||||
clientsLock.Unlock()
|
|
||||||
client.MarkDisconnected()
|
|
||||||
client.Disconnect()
|
|
||||||
conn.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
// Read message from client
|
|
||||||
_, msg, err := conn.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
log.Println("Read error:", err)
|
|
||||||
clientsLock.Lock()
|
|
||||||
delete(clients, client)
|
|
||||||
clientsLock.Unlock()
|
|
||||||
client.MarkDisconnected()
|
|
||||||
client.Disconnect()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
var payload JoinPayload
|
|
||||||
err = json.Unmarshal(msg, &payload)
|
|
||||||
if err != nil {
|
|
||||||
log.Println("Invalid JSON payload:", err)
|
|
||||||
clientsLock.Lock()
|
|
||||||
delete(clients, client)
|
|
||||||
clientsLock.Unlock()
|
|
||||||
client.MarkDisconnected()
|
|
||||||
client.Disconnect()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simple API key check (replace with real auth)
|
|
||||||
if payload.APIKey == "" {
|
|
||||||
conn.WriteMessage(websocket.TextMessage, []byte("Missing API Key"))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
client.APIKey = payload.APIKey
|
|
||||||
|
|
||||||
// Handle channel subscription, add more here as we get more in.
|
|
||||||
switch payload.Channel {
|
|
||||||
case "logs":
|
|
||||||
client.Channels["logs"] = true
|
|
||||||
case "logServices":
|
|
||||||
for _, svc := range payload.Services {
|
|
||||||
client.Channels["logServices:"+svc] = true
|
|
||||||
}
|
|
||||||
case "labels":
|
|
||||||
client.Channels["labels"] = true
|
|
||||||
default:
|
|
||||||
conn.WriteMessage(websocket.TextMessage, []byte("Unknown channel"))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save client info in DB
|
|
||||||
client.SaveToDB()
|
|
||||||
|
|
||||||
// Confirm subscription
|
|
||||||
resp := map[string]string{
|
|
||||||
"status": "subscribed",
|
|
||||||
"channel": payload.Channel,
|
|
||||||
}
|
|
||||||
respJSON, _ := json.Marshal(resp)
|
|
||||||
conn.WriteMessage(websocket.TextMessage, respJSON)
|
|
||||||
|
|
||||||
// You could now start pushing messages to client or keep connection open
|
|
||||||
// For demo, just wait and keep connection alive
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) SaveToDB() {
|
|
||||||
// Convert c.Channels (map[string]bool) to map[string]interface{} for JSONB
|
|
||||||
channels := make(map[string]interface{})
|
|
||||||
for ch := range c.Channels {
|
|
||||||
channels[ch] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
clientRecord := &db.ClientRecord{
|
|
||||||
APIKey: c.APIKey,
|
|
||||||
IPAddress: c.IPAddress,
|
|
||||||
UserAgent: c.UserAgent,
|
|
||||||
Channels: db.JSONB(channels),
|
|
||||||
ConnectedAt: time.Now(),
|
|
||||||
LastHeartbeat: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := db.DB.Create(&clientRecord).Error; err != nil {
|
|
||||||
log.Println("❌ Error saving client:", err)
|
|
||||||
} else {
|
|
||||||
c.ClientID = clientRecord.ClientID // ✅ Assign the generated UUID back to the client
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) MarkDisconnected() {
|
|
||||||
now := time.Now()
|
|
||||||
res := db.DB.Model(&db.ClientRecord{}).
|
|
||||||
Where("client_id = ?", c.ClientID).
|
|
||||||
Updates(map[string]interface{}{
|
|
||||||
"disconnected_at": &now,
|
|
||||||
})
|
|
||||||
|
|
||||||
if res.RowsAffected == 0 {
|
|
||||||
log.Println("⚠️ No rows updated for client_id:", c.ClientID)
|
|
||||||
}
|
|
||||||
if res.Error != nil {
|
|
||||||
log.Println("❌ Error updating disconnected_at:", res.Error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
24
backend/cmd/services/websocket/label.go
Normal file
24
backend/cmd/services/websocket/label.go
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package websocket
|
||||||
|
|
||||||
|
import logging "lst.net/utils/logger"
|
||||||
|
|
||||||
|
func LabelProcessor(broadcaster chan logging.Message) {
|
||||||
|
// Initialize any label-specific listeners
|
||||||
|
// This could listen to a different PG channel or process differently
|
||||||
|
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// // Implementation depends on your label data source
|
||||||
|
// // Example:
|
||||||
|
// case labelEvent := <-someLabelChannel:
|
||||||
|
// broadcaster <- logging.Message{
|
||||||
|
// Channel: "labels",
|
||||||
|
// Data: labelEvent.Data,
|
||||||
|
// Meta: map[string]interface{}{
|
||||||
|
// "label": labelEvent.Label,
|
||||||
|
// "type": labelEvent.Type,
|
||||||
|
// },
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
@@ -1,15 +1,4 @@
|
|||||||
package channelmgt
|
package websocket
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/lib/pq"
|
|
||||||
logging "lst.net/utils/logger"
|
|
||||||
)
|
|
||||||
|
|
||||||
// setup the notifiyer
|
// setup the notifiyer
|
||||||
|
|
||||||
@@ -27,9 +16,21 @@ import (
|
|||||||
// AFTER INSERT ON logs
|
// AFTER INSERT ON logs
|
||||||
// FOR EACH ROW EXECUTE FUNCTION notify_new_log();
|
// FOR EACH ROW EXECUTE FUNCTION notify_new_log();
|
||||||
|
|
||||||
func AllLogs(db *sql.DB, broadcaster chan logging.Message) {
|
import (
|
||||||
fmt.Println("[AllLogs] started")
|
"encoding/json"
|
||||||
log := logging.New()
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
|
logging "lst.net/utils/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func LogServices(broadcaster chan logging.Message) {
|
||||||
|
logger := logging.New()
|
||||||
|
|
||||||
|
logger.Info("[LogServices] started - single channel for all logs", "websocket", map[string]interface{}{})
|
||||||
|
|
||||||
dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
|
dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
|
||||||
os.Getenv("DB_HOST"),
|
os.Getenv("DB_HOST"),
|
||||||
os.Getenv("DB_PORT"),
|
os.Getenv("DB_PORT"),
|
||||||
@@ -41,41 +42,37 @@ func AllLogs(db *sql.DB, broadcaster chan logging.Message) {
|
|||||||
listener := pq.NewListener(dsn, 10*time.Second, time.Minute, nil)
|
listener := pq.NewListener(dsn, 10*time.Second, time.Minute, nil)
|
||||||
err := listener.Listen("new_log")
|
err := listener.Listen("new_log")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic("Failed to LISTEN on new_log", "logger", map[string]interface{}{
|
logger.Panic("Failed to LISTEN on new_log", "logger", map[string]interface{}{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Listening for new logs...")
|
fmt.Println("Listening for all logs through single logServices channel...")
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case notify := <-listener.Notify:
|
case notify := <-listener.Notify:
|
||||||
if notify != nil {
|
if notify != nil {
|
||||||
fmt.Println("New log notification received")
|
|
||||||
|
|
||||||
// Unmarshal the JSON payload of the inserted row
|
|
||||||
var logData map[string]interface{}
|
var logData map[string]interface{}
|
||||||
if err := json.Unmarshal([]byte(notify.Extra), &logData); err != nil {
|
if err := json.Unmarshal([]byte(notify.Extra), &logData); err != nil {
|
||||||
log.Error("Failed to unmarshal notification payload", "logger", map[string]interface{}{
|
logger.Error("Failed to unmarshal notification payload", "logger", map[string]interface{}{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
})
|
})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build message to broadcast
|
// Always send to logServices channel
|
||||||
msg := logging.Message{
|
broadcaster <- logging.Message{
|
||||||
Channel: "logs", // This matches your logs channel name
|
Channel: "logServices",
|
||||||
Data: logData,
|
Data: logData,
|
||||||
|
Meta: map[string]interface{}{
|
||||||
|
"level": logData["level"],
|
||||||
|
"service": logData["service"],
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcaster <- msg
|
|
||||||
//fmt.Printf("[Broadcasting] sending: %+v\n", msg)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-time.After(90 * time.Second):
|
case <-time.After(90 * time.Second):
|
||||||
go func() {
|
go func() {
|
||||||
log.Debug("Re-pinging Postgres LISTEN", "logger", map[string]interface{}{})
|
|
||||||
listener.Ping()
|
listener.Ping()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -1,25 +1,55 @@
|
|||||||
package socketio
|
package websocket
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gin-gonic/gin"
|
"net/http"
|
||||||
|
|
||||||
channelmgt "lst.net/cmd/services/websocket/channelMGT"
|
"github.com/gin-gonic/gin"
|
||||||
"lst.net/utils/db"
|
|
||||||
logging "lst.net/utils/logger"
|
logging "lst.net/utils/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
var broadcaster = make(chan logging.Message) // define broadcaster here so it’s accessible
|
var (
|
||||||
|
broadcaster = make(chan logging.Message)
|
||||||
|
)
|
||||||
|
|
||||||
func RegisterSocketRoutes(r *gin.Engine) {
|
func RegisterSocketRoutes(r *gin.Engine, base_url string) {
|
||||||
sqlDB, err := db.DB.DB()
|
// Initialize all channels
|
||||||
if err != nil {
|
InitializeChannels()
|
||||||
panic(err)
|
|
||||||
|
// Start channel processors
|
||||||
|
StartAllChannels()
|
||||||
|
|
||||||
|
// Start background services
|
||||||
|
go LogServices(broadcaster)
|
||||||
|
go StartBroadcasting(broadcaster, channels)
|
||||||
|
|
||||||
|
// WebSocket route
|
||||||
|
r.GET(base_url+"/ws", func(c *gin.Context) {
|
||||||
|
SocketHandler(c, channels)
|
||||||
|
})
|
||||||
|
|
||||||
|
r.GET(base_url+"/ws/clients", AdminAuthMiddleware(), handleGetClients)
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleGetClients(c *gin.Context) {
|
||||||
|
channel := c.Query("channel")
|
||||||
|
|
||||||
|
var clientList []*Client
|
||||||
|
if channel != "" {
|
||||||
|
clientList = GetClientsByChannel(channel)
|
||||||
|
} else {
|
||||||
|
clientList = GetAllClients()
|
||||||
}
|
}
|
||||||
|
|
||||||
// channels
|
c.JSON(http.StatusOK, gin.H{
|
||||||
go channelmgt.AllLogs(sqlDB, broadcaster)
|
"count": len(clientList),
|
||||||
go StartBroadcasting(broadcaster)
|
"clients": clientList,
|
||||||
|
})
|
||||||
wsGroup := r.Group("/ws")
|
}
|
||||||
wsGroup.GET("/connect", SocketHandler)
|
|
||||||
|
func AdminAuthMiddleware() gin.HandlerFunc {
|
||||||
|
return func(c *gin.Context) {
|
||||||
|
// Implement your admin authentication logic
|
||||||
|
// Example: Check API key or JWT token
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
273
backend/cmd/services/websocket/ws_client.go
Normal file
273
backend/cmd/services/websocket/ws_client.go
Normal file
@@ -0,0 +1,273 @@
|
|||||||
|
package websocket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"lst.net/utils/db"
|
||||||
|
logging "lst.net/utils/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
clients = make(map[*Client]bool)
|
||||||
|
clientsMu sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
ClientID uuid.UUID `json:"client_id"`
|
||||||
|
Conn *websocket.Conn `json:"-"` // Excluded from JSON
|
||||||
|
APIKey string `json:"api_key"`
|
||||||
|
IPAddress string `json:"ip_address"`
|
||||||
|
UserAgent string `json:"user_agent"`
|
||||||
|
Send chan []byte `json:"-"` // Excluded from JSON
|
||||||
|
Channels map[string]bool `json:"channels"`
|
||||||
|
LogLevels []string `json:"levels,omitempty"`
|
||||||
|
Services []string `json:"services,omitempty"`
|
||||||
|
Labels []string `json:"labels,omitempty"`
|
||||||
|
ConnectedAt time.Time `json:"connected_at"`
|
||||||
|
done chan struct{} // For graceful shutdown
|
||||||
|
isAlive atomic.Bool
|
||||||
|
lastActive time.Time // Tracks last activity
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) SaveToDB() {
|
||||||
|
// Convert c.Channels (map[string]bool) to map[string]interface{} for JSONB
|
||||||
|
channels := make(map[string]interface{})
|
||||||
|
for ch := range c.Channels {
|
||||||
|
channels[ch] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
clientRecord := &db.ClientRecord{
|
||||||
|
APIKey: c.APIKey,
|
||||||
|
IPAddress: c.IPAddress,
|
||||||
|
UserAgent: c.UserAgent,
|
||||||
|
Channels: db.JSONB(channels),
|
||||||
|
ConnectedAt: time.Now(),
|
||||||
|
LastHeartbeat: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.DB.Create(&clientRecord).Error; err != nil {
|
||||||
|
log.Println("❌ Error saving client:", err)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
c.ClientID = clientRecord.ClientID
|
||||||
|
c.ConnectedAt = clientRecord.ConnectedAt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) MarkDisconnected() {
|
||||||
|
logger := logging.New()
|
||||||
|
clientData := fmt.Sprintf("Client %v just lefts us", c.ClientID)
|
||||||
|
logger.Info(clientData, "websocket", map[string]interface{}{})
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
res := db.DB.Model(&db.ClientRecord{}).
|
||||||
|
Where("client_id = ?", c.ClientID).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"disconnected_at": &now,
|
||||||
|
})
|
||||||
|
|
||||||
|
if res.RowsAffected == 0 {
|
||||||
|
|
||||||
|
logger.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
|
||||||
|
"clientID": c.ClientID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if res.Error != nil {
|
||||||
|
|
||||||
|
logger.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
|
||||||
|
"clientID": c.ClientID,
|
||||||
|
"error": res.Error,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) SafeClient() *Client {
|
||||||
|
return &Client{
|
||||||
|
ClientID: c.ClientID,
|
||||||
|
APIKey: c.APIKey,
|
||||||
|
IPAddress: c.IPAddress,
|
||||||
|
UserAgent: c.UserAgent,
|
||||||
|
Channels: c.Channels,
|
||||||
|
LogLevels: c.LogLevels,
|
||||||
|
Services: c.Services,
|
||||||
|
Labels: c.Labels,
|
||||||
|
ConnectedAt: c.ConnectedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllClients returns safe representations of all clients
|
||||||
|
func GetAllClients() []*Client {
|
||||||
|
clientsMu.RLock()
|
||||||
|
defer clientsMu.RUnlock()
|
||||||
|
|
||||||
|
var clientList []*Client
|
||||||
|
for client := range clients {
|
||||||
|
clientList = append(clientList, client.SafeClient())
|
||||||
|
}
|
||||||
|
return clientList
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetClientsByChannel returns clients in a specific channel
|
||||||
|
func GetClientsByChannel(channel string) []*Client {
|
||||||
|
clientsMu.RLock()
|
||||||
|
defer clientsMu.RUnlock()
|
||||||
|
|
||||||
|
var channelClients []*Client
|
||||||
|
for client := range clients {
|
||||||
|
if client.Channels[channel] {
|
||||||
|
channelClients = append(channelClients, client.SafeClient())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return channelClients
|
||||||
|
}
|
||||||
|
|
||||||
|
// heat beat stuff
|
||||||
|
const (
|
||||||
|
pingPeriod = 30 * time.Second
|
||||||
|
pongWait = 60 * time.Second
|
||||||
|
writeWait = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Client) StartHeartbeat() {
|
||||||
|
logger := logging.New()
|
||||||
|
log.Println("Started hearbeat")
|
||||||
|
ticker := time.NewTicker(pingPeriod)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
if !c.isAlive.Load() { // Correct way to read atomic.Bool
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
|
log.Printf("Heartbeat failed for %s: %v", c.ClientID, err)
|
||||||
|
c.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
res := db.DB.Model(&db.ClientRecord{}).
|
||||||
|
Where("client_id = ?", c.ClientID).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"last_heartbeat": &now,
|
||||||
|
})
|
||||||
|
|
||||||
|
if res.RowsAffected == 0 {
|
||||||
|
|
||||||
|
logger.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
|
||||||
|
"clientID": c.ClientID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if res.Error != nil {
|
||||||
|
|
||||||
|
logger.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
|
||||||
|
"clientID": c.ClientID,
|
||||||
|
"error": res.Error,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-c.done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Close() {
|
||||||
|
if c.isAlive.CompareAndSwap(true, false) { // Atomic swap
|
||||||
|
close(c.done)
|
||||||
|
c.Conn.Close()
|
||||||
|
// Add any other cleanup here
|
||||||
|
c.MarkDisconnected()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) startServerPings() {
|
||||||
|
ticker := time.NewTicker(60 * time.Second) // Ping every 30s
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||||
|
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
|
c.Close() // Disconnect if ping fails
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-c.done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) markActive() {
|
||||||
|
c.lastActive = time.Now() // No mutex needed if single-writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) IsActive() bool {
|
||||||
|
return time.Since(c.lastActive) < 45*time.Second // 1.5x ping interval
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) updateHeartbeat() {
|
||||||
|
//fmt.Println("Updating heatbeat")
|
||||||
|
now := time.Now()
|
||||||
|
logger := logging.New()
|
||||||
|
|
||||||
|
//fmt.Printf("Updating heartbeat for client: %s at %v\n", c.ClientID, now)
|
||||||
|
|
||||||
|
//db.DB = db.DB.Debug()
|
||||||
|
res := db.DB.Model(&db.ClientRecord{}).
|
||||||
|
Where("client_id = ?", c.ClientID).
|
||||||
|
Updates(map[string]interface{}{
|
||||||
|
"last_heartbeat": &now, // Explicit format
|
||||||
|
})
|
||||||
|
//fmt.Printf("Executed SQL: %v\n", db.DB.Statement.SQL.String())
|
||||||
|
if res.RowsAffected == 0 {
|
||||||
|
|
||||||
|
logger.Info("⚠️ No rows updated for client_id", "websocket", map[string]interface{}{
|
||||||
|
"clientID": c.ClientID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if res.Error != nil {
|
||||||
|
|
||||||
|
logger.Error("❌ Error updating disconnected_at", "websocket", map[string]interface{}{
|
||||||
|
"clientID": c.ClientID,
|
||||||
|
"error": res.Error,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
// 2. Verify DB connection
|
||||||
|
if db.DB == nil {
|
||||||
|
logger.Error("DB connection is nil", "websocket", map[string]interface{}{})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Test raw SQL execution first
|
||||||
|
testRes := db.DB.Exec("SELECT 1")
|
||||||
|
if testRes.Error != nil {
|
||||||
|
logger.Error("DB ping failed", "websocket", map[string]interface{}{
|
||||||
|
"error": testRes.Error,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// work on this stats later
|
||||||
|
// Add to your admin endpoint
|
||||||
|
// type ConnectionStats struct {
|
||||||
|
// TotalConnections int `json:"total_connections"`
|
||||||
|
// ActiveConnections int `json:"active_connections"`
|
||||||
|
// AvgDuration string `json:"avg_duration"`
|
||||||
|
// }
|
||||||
|
|
||||||
|
// func GetConnectionStats() ConnectionStats {
|
||||||
|
// // Implement your metrics tracking
|
||||||
|
// }
|
||||||
224
backend/cmd/services/websocket/ws_handler.go
Normal file
224
backend/cmd/services/websocket/ws_handler.go
Normal file
@@ -0,0 +1,224 @@
|
|||||||
|
package websocket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JoinPayload struct {
|
||||||
|
Channel string `json:"channel"`
|
||||||
|
APIKey string `json:"apiKey"`
|
||||||
|
Services []string `json:"services,omitempty"`
|
||||||
|
Levels []string `json:"levels,omitempty"`
|
||||||
|
Labels []string `json:"labels,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{
|
||||||
|
CheckOrigin: func(r *http.Request) bool { return true }, // allow all origins; customize for prod
|
||||||
|
HandshakeTimeout: 15 * time.Second,
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 1024,
|
||||||
|
EnableCompression: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
func SocketHandler(c *gin.Context, channels map[string]*Channel) {
|
||||||
|
// Upgrade HTTP to WebSocket
|
||||||
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("WebSocket upgrade failed:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//defer conn.Close()
|
||||||
|
|
||||||
|
// Create new client
|
||||||
|
client := &Client{
|
||||||
|
Conn: conn,
|
||||||
|
APIKey: "exampleAPIKey",
|
||||||
|
Send: make(chan []byte, 256), // Buffered channel
|
||||||
|
Channels: make(map[string]bool),
|
||||||
|
IPAddress: c.ClientIP(),
|
||||||
|
UserAgent: c.Request.UserAgent(),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
client.isAlive.Store(true)
|
||||||
|
// Add to global clients map
|
||||||
|
clientsMu.Lock()
|
||||||
|
clients[client] = true
|
||||||
|
clientsMu.Unlock()
|
||||||
|
|
||||||
|
// Save initial connection to DB
|
||||||
|
client.SaveToDB()
|
||||||
|
// Save initial connection to DB
|
||||||
|
// if err := client.SaveToDB(); err != nil {
|
||||||
|
// log.Println("Failed to save client to DB:", err)
|
||||||
|
// conn.Close()
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
|
// Set handlers
|
||||||
|
conn.SetPingHandler(func(string) error {
|
||||||
|
return nil // Auto-responds with pong
|
||||||
|
})
|
||||||
|
|
||||||
|
conn.SetPongHandler(func(string) error {
|
||||||
|
now := time.Now()
|
||||||
|
client.markActive() // Track last pong time
|
||||||
|
client.lastActive = now
|
||||||
|
client.updateHeartbeat()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Start server-side ping ticker
|
||||||
|
go client.startServerPings()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// Unregister from all channels
|
||||||
|
for channelName := range client.Channels {
|
||||||
|
if ch, exists := channels[channelName]; exists {
|
||||||
|
ch.Unregister <- client
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from global clients map
|
||||||
|
clientsMu.Lock()
|
||||||
|
delete(clients, client)
|
||||||
|
clientsMu.Unlock()
|
||||||
|
|
||||||
|
// Mark disconnected in DB
|
||||||
|
client.MarkDisconnected()
|
||||||
|
|
||||||
|
// Close connection
|
||||||
|
conn.Close()
|
||||||
|
log.Printf("Client disconnected: %s", client.ClientID)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send welcome message immediately
|
||||||
|
welcomeMsg := map[string]string{
|
||||||
|
"status": "connected",
|
||||||
|
"message": "Welcome to the WebSocket server. Send subscription request to begin.",
|
||||||
|
}
|
||||||
|
if err := conn.WriteJSON(welcomeMsg); err != nil {
|
||||||
|
log.Println("Failed to send welcome message:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message handling goroutine
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
// Cleanup on disconnect
|
||||||
|
for channelName := range client.Channels {
|
||||||
|
if ch, exists := channels[channelName]; exists {
|
||||||
|
ch.Unregister <- client
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(client.Send)
|
||||||
|
client.MarkDisconnected()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
_, msg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
|
||||||
|
log.Printf("Client disconnected unexpectedly: %v", err)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload struct {
|
||||||
|
Channel string `json:"channel"`
|
||||||
|
APIKey string `json:"apiKey"`
|
||||||
|
Services []string `json:"services,omitempty"`
|
||||||
|
Levels []string `json:"levels,omitempty"`
|
||||||
|
Labels []string `json:"labels,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(msg, &payload); err != nil {
|
||||||
|
conn.WriteJSON(map[string]string{"error": "invalid payload format"})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate API key (implement your own validateAPIKey function)
|
||||||
|
// if payload.APIKey == "" || !validateAPIKey(payload.APIKey) {
|
||||||
|
// conn.WriteJSON(map[string]string{"error": "invalid or missing API key"})
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
|
||||||
|
if payload.APIKey == "" {
|
||||||
|
conn.WriteMessage(websocket.TextMessage, []byte("Missing API Key"))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
client.APIKey = payload.APIKey
|
||||||
|
|
||||||
|
// Handle channel subscription
|
||||||
|
switch payload.Channel {
|
||||||
|
case "logServices":
|
||||||
|
// Unregister from other channels if needed
|
||||||
|
if client.Channels["labels"] {
|
||||||
|
channels["labels"].Unregister <- client
|
||||||
|
delete(client.Channels, "labels")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update client filters
|
||||||
|
client.Services = payload.Services
|
||||||
|
client.LogLevels = payload.Levels
|
||||||
|
|
||||||
|
// Register to channel
|
||||||
|
channels["logServices"].Register <- client
|
||||||
|
client.Channels["logServices"] = true
|
||||||
|
|
||||||
|
conn.WriteJSON(map[string]string{
|
||||||
|
"status": "subscribed",
|
||||||
|
"channel": "logServices",
|
||||||
|
})
|
||||||
|
|
||||||
|
case "labels":
|
||||||
|
// Unregister from other channels if needed
|
||||||
|
if client.Channels["logServices"] {
|
||||||
|
channels["logServices"].Unregister <- client
|
||||||
|
delete(client.Channels, "logServices")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set label filters if provided
|
||||||
|
if payload.Labels != nil {
|
||||||
|
client.Labels = payload.Labels
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register to channel
|
||||||
|
channels["labels"].Register <- client
|
||||||
|
client.Channels["labels"] = true
|
||||||
|
|
||||||
|
// Update DB record
|
||||||
|
client.SaveToDB()
|
||||||
|
// if err := client.SaveToDB(); err != nil {
|
||||||
|
// log.Println("Failed to update client labels:", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
conn.WriteJSON(map[string]interface{}{
|
||||||
|
"status": "subscribed",
|
||||||
|
"channel": "labels",
|
||||||
|
"filters": client.Labels,
|
||||||
|
})
|
||||||
|
|
||||||
|
default:
|
||||||
|
conn.WriteJSON(map[string]string{
|
||||||
|
"error": "invalid channel",
|
||||||
|
"available_channels": "logServices, labels",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send messages to client
|
||||||
|
for message := range client.Send {
|
||||||
|
if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
|
||||||
|
log.Println("Write error:", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,7 +8,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.3
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/joho/godotenv v1.5.1
|
github.com/joho/godotenv v1.5.1
|
||||||
github.com/rs/zerolog v1.34.0
|
github.com/rs/zerolog v1.34.0
|
||||||
github.com/swaggo/swag v1.16.5
|
github.com/swaggo/swag v1.16.6
|
||||||
gorm.io/driver/postgres v1.6.0
|
gorm.io/driver/postgres v1.6.0
|
||||||
gorm.io/gorm v1.30.0
|
gorm.io/gorm v1.30.0
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -24,9 +24,11 @@ import (
|
|||||||
"github.com/gin-contrib/cors"
|
"github.com/gin-contrib/cors"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
"lst.net/cmd/services/system/config"
|
"lst.net/cmd/services/system/settings"
|
||||||
socketio "lst.net/cmd/services/websocket"
|
"lst.net/cmd/services/websocket"
|
||||||
_ "lst.net/docs"
|
|
||||||
|
// _ "lst.net/docs"
|
||||||
|
|
||||||
"lst.net/utils/db"
|
"lst.net/utils/db"
|
||||||
logging "lst.net/utils/logger"
|
logging "lst.net/utils/logger"
|
||||||
)
|
)
|
||||||
@@ -42,7 +44,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize DB
|
// Initialize DB
|
||||||
if err := db.InitDB(); err != nil {
|
if _, err := db.InitDB(); err != nil {
|
||||||
log.Panic("Database intialize failed", "db", map[string]interface{}{
|
log.Panic("Database intialize failed", "db", map[string]interface{}{
|
||||||
"error": err.Error(),
|
"error": err.Error(),
|
||||||
"casue": errors.Unwrap(err),
|
"casue": errors.Unwrap(err),
|
||||||
@@ -112,8 +114,8 @@ func main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
//logging.RegisterLoggerRoutes(r, basePath)
|
//logging.RegisterLoggerRoutes(r, basePath)
|
||||||
socketio.RegisterSocketRoutes(r)
|
websocket.RegisterSocketRoutes(r, basePath)
|
||||||
config.RegisterConfigRoutes(r, basePath)
|
settings.RegisterSettingsRoutes(r, basePath)
|
||||||
|
|
||||||
r.Any(basePath+"/api", errorApiLoc)
|
r.Any(basePath+"/api", errorApiLoc)
|
||||||
|
|
||||||
@@ -136,7 +138,7 @@ func main() {
|
|||||||
// }
|
// }
|
||||||
func errorApiLoc(c *gin.Context) {
|
func errorApiLoc(c *gin.Context) {
|
||||||
log := logging.New()
|
log := logging.New()
|
||||||
log.Info("Api endpoint hit that dose not exist", "system", map[string]interface{}{
|
log.Error("Api endpoint hit that dose not exist", "system", map[string]interface{}{
|
||||||
"endpoint": "/api",
|
"endpoint": "/api",
|
||||||
"client_ip": c.ClientIP(),
|
"client_ip": c.ClientIP(),
|
||||||
"user_agent": c.Request.UserAgent(),
|
"user_agent": c.Request.UserAgent(),
|
||||||
|
|||||||
@@ -1,67 +0,0 @@
|
|||||||
package db
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
gorm.Model
|
|
||||||
ID uint `gorm:"primaryKey;autoIncrement"`
|
|
||||||
Name string `gorm:"uniqueIndex;not null"`
|
|
||||||
Description string `gorm:"type:text"`
|
|
||||||
Value string `gorm:"not null"`
|
|
||||||
Enabled bool `gorm:"default:true"`
|
|
||||||
AppService string `gorm:"default:system"`
|
|
||||||
CreatedAt time.Time `gorm:"index"`
|
|
||||||
UpdatedAt time.Time `gorm:"index"`
|
|
||||||
DeletedAt gorm.DeletedAt `gorm:"index"`
|
|
||||||
}
|
|
||||||
|
|
||||||
var seedConfigData = []Config{
|
|
||||||
{Name: "serverPort", Description: "The port the server will listen on if not running in docker", Value: "4000", Enabled: true},
|
|
||||||
{Name: "server", Description: "The server we will use when connecting to the alplaprod sql", Value: "usmcd1vms006", Enabled: true},
|
|
||||||
}
|
|
||||||
|
|
||||||
func SeedConfigs(db *gorm.DB) error {
|
|
||||||
|
|
||||||
for _, cfg := range seedConfigData {
|
|
||||||
var existing Config
|
|
||||||
// Try to find config by unique name
|
|
||||||
result := db.Where("name =?", cfg.Name).First(&existing)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
if result.Error == gorm.ErrRecordNotFound {
|
|
||||||
// not here lets add it
|
|
||||||
if err := db.Create(&cfg).Error; err != nil {
|
|
||||||
log.Printf("Failed to seed config %s: %v", cfg.Name, err)
|
|
||||||
}
|
|
||||||
log.Printf("Seeded new config: %s", cfg.Name)
|
|
||||||
} else {
|
|
||||||
// Some other error
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// only update the fields we want to update.
|
|
||||||
existing.Description = cfg.Description
|
|
||||||
if err := db.Save(&existing).Error; err != nil {
|
|
||||||
log.Printf("Failed to update config %s: %v", cfg.Name, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Printf("Updated existing config: %s", cfg.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetAllConfigs(db *gorm.DB) ([]Config, error) {
|
|
||||||
var configs []Config
|
|
||||||
|
|
||||||
result := db.Find(&configs)
|
|
||||||
|
|
||||||
return configs, result.Error
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -12,7 +12,12 @@ var DB *gorm.DB
|
|||||||
|
|
||||||
type JSONB map[string]interface{}
|
type JSONB map[string]interface{}
|
||||||
|
|
||||||
func InitDB() error {
|
type DBConfig struct {
|
||||||
|
DB *gorm.DB
|
||||||
|
DSN string
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitDB() (*DBConfig, error) {
|
||||||
dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s",
|
dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s",
|
||||||
os.Getenv("DB_HOST"),
|
os.Getenv("DB_HOST"),
|
||||||
os.Getenv("DB_PORT"),
|
os.Getenv("DB_PORT"),
|
||||||
@@ -24,7 +29,7 @@ func InitDB() error {
|
|||||||
|
|
||||||
DB, err = gorm.Open(postgres.Open(dsn), &gorm.Config{})
|
DB, err = gorm.Open(postgres.Open(dsn), &gorm.Config{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to connect to database: %v", err)
|
return nil, fmt.Errorf("failed to connect to database: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("✅ Connected to database")
|
fmt.Println("✅ Connected to database")
|
||||||
@@ -32,12 +37,15 @@ func InitDB() error {
|
|||||||
// ensures we have the uuid stuff setup properly
|
// ensures we have the uuid stuff setup properly
|
||||||
DB.Exec(`CREATE EXTENSION IF NOT EXISTS "uuid-ossp"`)
|
DB.Exec(`CREATE EXTENSION IF NOT EXISTS "uuid-ossp"`)
|
||||||
|
|
||||||
err = DB.AutoMigrate(&Log{}, &Config{}, &ClientRecord{})
|
err = DB.AutoMigrate(&Log{}, &Settings{}, &ClientRecord{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to auto-migrate models: %v", err)
|
return nil, fmt.Errorf("failed to auto-migrate models: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("✅ Database migration completed successfully")
|
fmt.Println("✅ Database migration completed successfully")
|
||||||
|
|
||||||
return nil
|
return &DBConfig{
|
||||||
|
DB: DB,
|
||||||
|
DSN: dsn,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
167
backend/utils/db/settings.go
Normal file
167
backend/utils/db/settings.go
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"reflect"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"lst.net/utils/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Settings struct {
|
||||||
|
SettingID uuid.UUID `gorm:"type:uuid;default:uuid_generate_v4();primaryKey" json:"id"`
|
||||||
|
Name string `gorm:"uniqueIndex;not null"`
|
||||||
|
Description string `gorm:"type:text"`
|
||||||
|
Value string `gorm:"not null"`
|
||||||
|
Enabled bool `gorm:"default:true"`
|
||||||
|
AppService string `gorm:"default:system"`
|
||||||
|
CreatedAt time.Time `gorm:"index"`
|
||||||
|
UpdatedAt time.Time `gorm:"index"`
|
||||||
|
DeletedAt gorm.DeletedAt `gorm:"index"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var seedConfigData = []Settings{
|
||||||
|
{Name: "serverPort", Description: "The port the server will listen on if not running in docker", Value: "4000", Enabled: true, AppService: "server"},
|
||||||
|
{Name: "server", Description: "The server we will use when connecting to the alplaprod sql", Value: "usmcd1vms006", Enabled: true, AppService: "server"},
|
||||||
|
{Name: "timezone", Value: "America/Chicago", Description: "What time zone is the server in this is used for cronjobs and some other time stuff", AppService: "server", Enabled: true},
|
||||||
|
{Name: "dbUser", Value: "alplaprod", Description: "What is the db userName", AppService: "server", Enabled: true},
|
||||||
|
{Name: "dbPass", Value: "b2JlbGl4", Description: "What is the db password", AppService: "server", Enabled: true},
|
||||||
|
{Name: "tcpPort", Value: "2222", Description: "TCP port for printers to connect send data and the zedra cameras", AppService: "server", Enabled: true},
|
||||||
|
{Name: "prolinkCheck", Value: "1", Description: "Will prolink be considered to check if matches, maninly used in plants that do not fully utilize prolink + ocp", AppService: "production", Enabled: true},
|
||||||
|
{Name: "bookin", Value: "1", Description: "do we want to book in after a label is printed", AppService: "ocp", Enabled: true},
|
||||||
|
{Name: "dbServer", Value: "usmcd1vms036", Description: "What server is the prod db on?", AppService: "server", Enabled: true},
|
||||||
|
{Name: "printDelay", Value: "90", Description: "How long in seconds between prints", AppService: "ocp", Enabled: true},
|
||||||
|
{Name: "plantToken", Value: "test3", Description: "What is the plant token", AppService: "server", Enabled: true},
|
||||||
|
{Name: "dualPrinting", Value: "0", Description: "Dose the plant have 2 machines that go to 1?", AppService: "ocp", Enabled: true},
|
||||||
|
{Name: "ocmeService", Value: "0", Description: "Is the ocme service enabled. this is gernerally only for Dayton.", AppService: "ocme", Enabled: true},
|
||||||
|
{Name: "fifoCheck", Value: "45", Description: "How far back do we want to check for fifo default 45, putting 0 will ignore.", AppService: "ocme", Enabled: true},
|
||||||
|
{Name: "dayCheck", Value: "3", Description: "how many days +/- to check for shipments in alplaprod", AppService: "ocme", Enabled: true},
|
||||||
|
{Name: "maxLotPerTruck", Value: "3", Description: "How mant lots can we have per truck?", AppService: "ocme", Enabled: true},
|
||||||
|
{Name: "monitorAddress", Value: "8", Description: "What address is monitored to be limited to the amount of lots that can be added to a truck.", AppService: "ocme", Enabled: true},
|
||||||
|
{Name: "ocmeCycleCount", Value: "1", Description: "Are we allowing ocme cycle counts?", AppService: "ocme", Enabled: true},
|
||||||
|
{Name: "devDir", Value: "", Description: "This is the dev dir and strictly only for updating the servers.", AppService: "server", Enabled: true},
|
||||||
|
{Name: "demandMGTActivated", Value: "0", Description: "Do we allow for new fake edi?", AppService: "logistics", Enabled: true},
|
||||||
|
{Name: "qualityRequest", Value: "0", Description: "quality request module?", AppService: "quality", Enabled: true},
|
||||||
|
{Name: "ocpLogsCheck", Value: "4", Description: "How long do we want to allow logs to show that have not been cleared?", AppService: "ocp", Enabled: true},
|
||||||
|
{Name: "inhouseDelivery", Value: "0", Description: "Are we doing auto inhouse delivery?", AppService: "ocp", Enabled: true},
|
||||||
|
// dyco settings
|
||||||
|
{Name: "dycoConnect", Value: "0", Description: "Are we running the dyco system?", AppService: "dycp", Enabled: true},
|
||||||
|
{Name: "dycoPrint", Value: "0", Description: "Are we using the dyco to get the labels or the rfid?", AppService: "dyco", Enabled: true},
|
||||||
|
{Name: "strapperCheck", Value: "1", Description: "Are we monitoring the strapper for faults?", AppService: "dyco", Enabled: true},
|
||||||
|
// ocp
|
||||||
|
{Name: "ocpActive", Value: `1`, Description: "Are we pritning on demand?", AppService: "ocp", Enabled: true},
|
||||||
|
{Name: "ocpCycleDelay", Value: `10`, Description: "How long between printer cycles do we want to monitor.", AppService: "ocp", Enabled: true},
|
||||||
|
{Name: "pNgAddress", Value: `139`, Description: "What is the address for p&g so we can make sure we have the correct fake edi forcast going in.", AppService: "logisitcs", Enabled: true},
|
||||||
|
{Name: "scannerID", Value: `500`, Description: "What scanner id will we be using for the app", AppService: "logistics", Enabled: true},
|
||||||
|
{Name: "scannerPort", Value: `50002`, Description: "What port instance will we be using?", AppService: "logistics", Enabled: true},
|
||||||
|
{Name: "stagingReturnLocations", Value: `30125,31523`, Description: "What are the staging location IDs we will use to select from. seperated by commas", AppService: "logistics", Enabled: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
func SeedConfigs(db *gorm.DB) error {
|
||||||
|
|
||||||
|
for _, cfg := range seedConfigData {
|
||||||
|
var existing Settings
|
||||||
|
// Try to find config by unique Name
|
||||||
|
result := db.Where("Name =?", cfg.Name).First(&existing)
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
if result.Error == gorm.ErrRecordNotFound {
|
||||||
|
// not here lets add it
|
||||||
|
if err := db.Create(&cfg).Error; err != nil {
|
||||||
|
log.Printf("Failed to seed config %s: %v", cfg.Name, err)
|
||||||
|
}
|
||||||
|
//log.Printf("Seeded new config: %s", cfg.Name)
|
||||||
|
} else {
|
||||||
|
// Some other error
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// only update the fields we want to update.
|
||||||
|
existing.Description = cfg.Description
|
||||||
|
if err := db.Save(&existing).Error; err != nil {
|
||||||
|
log.Printf("Failed to update config %s: %v", cfg.Name, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
//log.Printf("Updated existing config: %s", cfg.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetAllConfigs(db *gorm.DB) ([]map[string]interface{}, error) {
|
||||||
|
var settings []Settings
|
||||||
|
result := db.Find(&settings)
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return nil, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function to convert struct to map with lowercase keys
|
||||||
|
toLowercase := func(s Settings) map[string]interface{} {
|
||||||
|
t := reflect.TypeOf(s)
|
||||||
|
v := reflect.ValueOf(s)
|
||||||
|
|
||||||
|
data := make(map[string]interface{})
|
||||||
|
|
||||||
|
for i := 0; i < t.NumField(); i++ {
|
||||||
|
field := strings.ToLower(t.Field(i).Name)
|
||||||
|
data[field] = v.Field(i).Interface()
|
||||||
|
}
|
||||||
|
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert each struct in settings slice to a map with lowercase keys
|
||||||
|
var lowercaseSettings []map[string]interface{}
|
||||||
|
for _, setting := range settings {
|
||||||
|
lowercaseSettings = append(lowercaseSettings, toLowercase(setting))
|
||||||
|
}
|
||||||
|
|
||||||
|
return lowercaseSettings, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpdateConfig(db *gorm.DB, id string, input inputs.SettingUpdateInput) error {
|
||||||
|
var cfg Settings
|
||||||
|
if err := db.Where("setting_id =?", id).First(&cfg).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
updates := map[string]interface{}{}
|
||||||
|
|
||||||
|
if input.Description != nil {
|
||||||
|
updates["description"] = *input.Description
|
||||||
|
}
|
||||||
|
if input.Value != nil {
|
||||||
|
updates["value"] = *input.Value
|
||||||
|
}
|
||||||
|
if input.Enabled != nil {
|
||||||
|
updates["enabled"] = *input.Enabled
|
||||||
|
}
|
||||||
|
if input.AppService != nil {
|
||||||
|
updates["app_service"] = *input.AppService
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(updates) == 0 {
|
||||||
|
return nil // nothing to update
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.Model(&cfg).Updates(updates).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
func DeleteConfig(db *gorm.DB, id uint) error {
|
||||||
|
// Soft delete by ID
|
||||||
|
return db.Delete(&Settings{}, id).Error
|
||||||
|
}
|
||||||
|
|
||||||
|
func RestoreConfig(db *gorm.DB, id uint) error {
|
||||||
|
var cfg Settings
|
||||||
|
if err := db.Unscoped().First(&cfg, id).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cfg.DeletedAt = gorm.DeletedAt{}
|
||||||
|
return db.Unscoped().Save(&cfg).Error
|
||||||
|
}
|
||||||
@@ -12,7 +12,7 @@ type ClientRecord struct {
|
|||||||
IPAddress string `gorm:"not null"`
|
IPAddress string `gorm:"not null"`
|
||||||
UserAgent string `gorm:"size:255"`
|
UserAgent string `gorm:"size:255"`
|
||||||
ConnectedAt time.Time `gorm:"index"`
|
ConnectedAt time.Time `gorm:"index"`
|
||||||
LastHeartbeat time.Time `gorm:"index"`
|
LastHeartbeat time.Time `gorm:"column:last_heartbeat"`
|
||||||
Channels JSONB `gorm:"type:jsonb"`
|
Channels JSONB `gorm:"type:jsonb"`
|
||||||
CreatedAt time.Time
|
CreatedAt time.Time
|
||||||
UpdatedAt time.Time
|
UpdatedAt time.Time
|
||||||
|
|||||||
8
backend/utils/inputs/settingsInput.go
Normal file
8
backend/utils/inputs/settingsInput.go
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
package inputs
|
||||||
|
|
||||||
|
type SettingUpdateInput struct {
|
||||||
|
Description *string `json:"description"`
|
||||||
|
Value *string `json:"value"`
|
||||||
|
Enabled *bool `json:"enabled"`
|
||||||
|
AppService *string `json:"app_service"`
|
||||||
|
}
|
||||||
@@ -18,7 +18,8 @@ type CustomLogger struct {
|
|||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
Channel string `json:"channel"`
|
Channel string `json:"channel"`
|
||||||
Data interface{} `json:"data"`
|
Data map[string]interface{} `json:"data"`
|
||||||
|
Meta map[string]interface{} `json:"meta,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a configured CustomLogger.
|
// New creates a configured CustomLogger.
|
||||||
|
|||||||
4
package-lock.json
generated
4
package-lock.json
generated
@@ -1,12 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "logistics_support_tool",
|
"name": "logistics_support_tool",
|
||||||
"version": "0.0.1-alpha.5",
|
"version": "0.0.1-alpha.6",
|
||||||
"lockfileVersion": 3,
|
"lockfileVersion": 3,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "logistics_support_tool",
|
"name": "logistics_support_tool",
|
||||||
"version": "0.0.1-alpha.5",
|
"version": "0.0.1-alpha.6",
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"dotenv": "^17.2.0",
|
"dotenv": "^17.2.0",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "logistics_support_tool",
|
"name": "logistics_support_tool",
|
||||||
"version": "0.0.1-alpha.5",
|
"version": "0.0.1-alpha.6",
|
||||||
"description": "This is the new logisitcs support tool",
|
"description": "This is the new logisitcs support tool",
|
||||||
"private": true,
|
"private": true,
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
|
|||||||
Reference in New Issue
Block a user