ws commentary

This commit is contained in:
2026-02-07 21:44:19 -06:00
parent 13e917183f
commit c10556d306
4 changed files with 115 additions and 25 deletions

View File

@@ -26,8 +26,10 @@ app.use(securityMiddleware());
app.use("/matches", matchRouter);
app.use("/matches/:id/commentary", comRouter);
const { broadcastMatchCreated } = attachWebsocketServer(server);
const { broadcastMatchCreated, broadcastCommentary } =
attachWebsocketServer(server);
app.locals.broadcastMatchCreated = broadcastMatchCreated;
app.locals.broadcastCommentary = broadcastCommentary;
server.listen(PORT, HOST, () => {
const baseURL =

View File

@@ -69,7 +69,7 @@ comRouter.post("/", async (req, res) => {
try {
const { minutes, ...rest } = bodyResult.data;
const [result] = await db
const [entry] = await db
.insert(commentary)
.values({
matchId: paramsResult.data.id,
@@ -78,7 +78,10 @@ comRouter.post("/", async (req, res) => {
})
.returning();
res.status(201).json({ data: result });
if (res.app.locals.broadcastCommentary) {
res.app.locals.broadcastCommentary(entry.matchId, entry);
}
res.status(201).json({ data: entry });
} catch (e) {
console.error("Failed to create commentary:", e);

View File

@@ -1,5 +1,33 @@
import { WebSocket, WebSocketServer } from "ws";
import { wsArkjet } from "../utils/arkjet.js";
//import { wsArkjet } from "../utils/arkjet.js";
const matchSubscribers = new Map();
const subscribe = (matchId, socket) => {
if (!matchSubscribers.has(matchId)) {
matchSubscribers.set(matchId, new Set());
}
matchSubscribers.get(matchId).add(socket);
};
const unsubscribe = (matchId, socket) => {
const subscribers = matchSubscribers.get(matchId);
if (!subscribers) return;
subscribe.delete(matchId);
if (subscribers.size === 0) {
matchSubscribers.delete(socket);
}
};
const cleanupSubscriptions = (socket) => {
for (const matchId of socket.subscriptions) {
unsubscribe(matchId, socket);
}
};
const sendJson = (socket, payload) => {
if (socket.readyState !== WebSocket.OPEN) {
@@ -9,7 +37,7 @@ const sendJson = (socket, payload) => {
socket.send(JSON.stringify(payload));
};
const broadcast = (wss, payload) => {
const broadcastToAll = (wss, payload) => {
for (const client of wss.clients) {
if (client.readyState !== WebSocket.OPEN) continue;
@@ -17,6 +45,44 @@ const broadcast = (wss, payload) => {
}
};
const broadcastToMatch = (matchId, payload) => {
const subscribers = matchSubscribers.get(matchId);
if (!subscribers || subscribers.size === 0) return;
const message = JSON.stringify(payload);
for (const client of subscribers) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
}
};
const handleMessage = (socket, data) => {
let message;
try {
message = JSON.parse(data.toString());
} catch {
sendJson(socket, { type: "error", message: "Invalid JSON" });
}
if (message?.type === "subscribe" && Number.isInteger(message.matchId)) {
subscribe(message.matchId, socket);
socket.subscriptions.add(message.matchId);
sendJson(socket, { type: "subscribed", matchId: message.matchId });
}
if (message?.type === "unsubscribe" && Number.isInteger(message.matchId)) {
unsubscribe(message.matchId, socket);
socket.subscriptions.delete(message.matchId);
sendJson(socket, { type: "unsubscribe", matchId: message.matchId });
}
};
export const attachWebsocketServer = (server) => {
const wss = new WebSocketServer({
server,
@@ -24,33 +90,47 @@ export const attachWebsocketServer = (server) => {
maxPayload: 1024 * 1024, // 1mb
});
wss.on("connection", async (socket, req) => {
if (wsArkjet) {
try {
const desision = await wsArkjet.protect(req);
wss.on("connection", async (socket, _) => {
// if (wsArkjet) {
// try {
// const desision = await wsArkjet.protect(req);
if (desision.isDenied) {
const code = desision.reason.isRateLimit() ? 1013 : 1008;
const reason = desision.reason.isRateLimit()
? "Rate limit exceedeed"
: "Access denied";
// if (desision.isDenied) {
// const code = desision.reason.isRateLimit() ? 1013 : 1008;
// const reason = desision.reason.isRateLimit()
// ? "Rate limit exceedeed"
// : "Access denied";
socket.close(code, reason);
return;
}
} catch (e) {
console.error("WS connection error", e);
socket.close(1011, "Server security error");
return;
}
}
// socket.close(code, reason);
// return;
// }
// } catch (e) {
// console.error("WS connection error", e);
// socket.close(1011, "Server security error");
// return;
// }
// }
socket.isAlive = true;
socket.on("pong", () => {
socket.isAlive = true;
});
socket.subscriptions = new Set(); // allows to create a who is here
sendJson(socket, { type: "welcome" });
socket.on("message", (data) => {
handleMessage(socket, data);
});
socket.on("error", () => {
socket.terminate();
});
socket.on("error", console.error);
socket.on("close", () => {
cleanupSubscriptions(socket);
});
});
const interval = setInterval(() => {
@@ -67,8 +147,12 @@ export const attachWebsocketServer = (server) => {
wss.on("close", () => clearInterval(interval));
function broadcastMatchCreated(match) {
broadcast(wss, { type: "match_created", data: match });
broadcastToAll(wss, { type: "match_created", data: match });
}
return { broadcastMatchCreated };
function broadcastCommentary(matchId, comment) {
broadcastToMatch(matchId, { type: "commentary", data: comment });
}
return { broadcastMatchCreated, broadcastCommentary };
};

1
testScripts/scripts.txt Normal file
View File

@@ -0,0 +1 @@
wscat -c ws://localhost:8082/ws