From c10556d306fdc485a4eb96fd30ae10b33bc331c1 Mon Sep 17 00:00:00 2001 From: blake Date: Sat, 7 Feb 2026 21:44:19 -0600 Subject: [PATCH] ws commentary --- src/index.js | 4 +- src/routes/commentary.route.js | 7 +- src/ws/server.js | 128 +++++++++++++++++++++++++++------ testScripts/scripts.txt | 1 + 4 files changed, 115 insertions(+), 25 deletions(-) create mode 100644 testScripts/scripts.txt diff --git a/src/index.js b/src/index.js index 00ae009..7c5e921 100644 --- a/src/index.js +++ b/src/index.js @@ -26,8 +26,10 @@ app.use(securityMiddleware()); app.use("/matches", matchRouter); app.use("/matches/:id/commentary", comRouter); -const { broadcastMatchCreated } = attachWebsocketServer(server); +const { broadcastMatchCreated, broadcastCommentary } = + attachWebsocketServer(server); app.locals.broadcastMatchCreated = broadcastMatchCreated; +app.locals.broadcastCommentary = broadcastCommentary; server.listen(PORT, HOST, () => { const baseURL = diff --git a/src/routes/commentary.route.js b/src/routes/commentary.route.js index 7b1c645..c0fc17f 100644 --- a/src/routes/commentary.route.js +++ b/src/routes/commentary.route.js @@ -69,7 +69,7 @@ comRouter.post("/", async (req, res) => { try { const { minutes, ...rest } = bodyResult.data; - const [result] = await db + const [entry] = await db .insert(commentary) .values({ matchId: paramsResult.data.id, @@ -78,7 +78,10 @@ comRouter.post("/", async (req, res) => { }) .returning(); - res.status(201).json({ data: result }); + if (res.app.locals.broadcastCommentary) { + res.app.locals.broadcastCommentary(entry.matchId, entry); + } + res.status(201).json({ data: entry }); } catch (e) { console.error("Failed to create commentary:", e); diff --git a/src/ws/server.js b/src/ws/server.js index 4440f81..42a9207 100644 --- a/src/ws/server.js +++ b/src/ws/server.js @@ -1,5 +1,33 @@ import { WebSocket, WebSocketServer } from "ws"; -import { wsArkjet } from "../utils/arkjet.js"; +//import { wsArkjet } from "../utils/arkjet.js"; + +const matchSubscribers = new Map(); + +const subscribe = (matchId, socket) => { + if (!matchSubscribers.has(matchId)) { + matchSubscribers.set(matchId, new Set()); + } + + matchSubscribers.get(matchId).add(socket); +}; + +const unsubscribe = (matchId, socket) => { + const subscribers = matchSubscribers.get(matchId); + + if (!subscribers) return; + + subscribe.delete(matchId); + + if (subscribers.size === 0) { + matchSubscribers.delete(socket); + } +}; + +const cleanupSubscriptions = (socket) => { + for (const matchId of socket.subscriptions) { + unsubscribe(matchId, socket); + } +}; const sendJson = (socket, payload) => { if (socket.readyState !== WebSocket.OPEN) { @@ -9,7 +37,7 @@ const sendJson = (socket, payload) => { socket.send(JSON.stringify(payload)); }; -const broadcast = (wss, payload) => { +const broadcastToAll = (wss, payload) => { for (const client of wss.clients) { if (client.readyState !== WebSocket.OPEN) continue; @@ -17,6 +45,44 @@ const broadcast = (wss, payload) => { } }; +const broadcastToMatch = (matchId, payload) => { + const subscribers = matchSubscribers.get(matchId); + + if (!subscribers || subscribers.size === 0) return; + + const message = JSON.stringify(payload); + + for (const client of subscribers) { + if (client.readyState === WebSocket.OPEN) { + client.send(message); + } + } +}; + +const handleMessage = (socket, data) => { + let message; + + try { + message = JSON.parse(data.toString()); + } catch { + sendJson(socket, { type: "error", message: "Invalid JSON" }); + } + + if (message?.type === "subscribe" && Number.isInteger(message.matchId)) { + subscribe(message.matchId, socket); + + socket.subscriptions.add(message.matchId); + sendJson(socket, { type: "subscribed", matchId: message.matchId }); + } + + if (message?.type === "unsubscribe" && Number.isInteger(message.matchId)) { + unsubscribe(message.matchId, socket); + + socket.subscriptions.delete(message.matchId); + + sendJson(socket, { type: "unsubscribe", matchId: message.matchId }); + } +}; export const attachWebsocketServer = (server) => { const wss = new WebSocketServer({ server, @@ -24,33 +90,47 @@ export const attachWebsocketServer = (server) => { maxPayload: 1024 * 1024, // 1mb }); - wss.on("connection", async (socket, req) => { - if (wsArkjet) { - try { - const desision = await wsArkjet.protect(req); + wss.on("connection", async (socket, _) => { + // if (wsArkjet) { + // try { + // const desision = await wsArkjet.protect(req); - if (desision.isDenied) { - const code = desision.reason.isRateLimit() ? 1013 : 1008; - const reason = desision.reason.isRateLimit() - ? "Rate limit exceedeed" - : "Access denied"; + // if (desision.isDenied) { + // const code = desision.reason.isRateLimit() ? 1013 : 1008; + // const reason = desision.reason.isRateLimit() + // ? "Rate limit exceedeed" + // : "Access denied"; - socket.close(code, reason); - return; - } - } catch (e) { - console.error("WS connection error", e); - socket.close(1011, "Server security error"); - return; - } - } + // socket.close(code, reason); + // return; + // } + // } catch (e) { + // console.error("WS connection error", e); + // socket.close(1011, "Server security error"); + // return; + // } + // } socket.isAlive = true; socket.on("pong", () => { socket.isAlive = true; }); + + socket.subscriptions = new Set(); // allows to create a who is here sendJson(socket, { type: "welcome" }); + socket.on("message", (data) => { + handleMessage(socket, data); + }); + + socket.on("error", () => { + socket.terminate(); + }); + socket.on("error", console.error); + + socket.on("close", () => { + cleanupSubscriptions(socket); + }); }); const interval = setInterval(() => { @@ -67,8 +147,12 @@ export const attachWebsocketServer = (server) => { wss.on("close", () => clearInterval(interval)); function broadcastMatchCreated(match) { - broadcast(wss, { type: "match_created", data: match }); + broadcastToAll(wss, { type: "match_created", data: match }); } - return { broadcastMatchCreated }; + function broadcastCommentary(matchId, comment) { + broadcastToMatch(matchId, { type: "commentary", data: comment }); + } + + return { broadcastMatchCreated, broadcastCommentary }; }; diff --git a/testScripts/scripts.txt b/testScripts/scripts.txt new file mode 100644 index 0000000..abdd733 --- /dev/null +++ b/testScripts/scripts.txt @@ -0,0 +1 @@ +wscat -c ws://localhost:8082/ws \ No newline at end of file