diff --git a/app/utils/ws.ts b/app/utils/ws.ts new file mode 100644 index 0000000..cf81327 --- /dev/null +++ b/app/utils/ws.ts @@ -0,0 +1,47 @@ +// This is a "side-effect" but we want a lifecycle cache map of +// peer statuses to prevent unnecessary fetches to the agent. +import type { LoaderFunctionArgs } from 'remix' + +type Context = LoaderFunctionArgs['context'] +const cache: { [nodeID: string]: unknown } = {} + +export async function queryWS(context: Context, nodeIDs: string[]) { + const ws = context.ws + const firstClient = ws.clients.values().next().value + if (!firstClient) { + return cache + } + + const cached = nodeIDs.map((nodeID) => { + const cached = cache[nodeID] + if (cached) { + return cached + } + }) + + // We only need to query the nodes that are not cached + const uncached = nodeIDs.filter((nodeID) => !cached.includes(nodeID)) + if (uncached.length === 0) { + return cache + } + + firstClient.send(JSON.stringify({ NodeIDs: uncached })) + await new Promise((resolve) => { + const timeout = setTimeout(() => { + resolve() + }, 3000) + + firstClient.on('message', (message) => { + const data = JSON.parse(message.toString()) + if (Object.keys(data).length === 0) { + resolve() + } + + for (const [nodeID, status] of Object.entries(data)) { + cache[nodeID] = status + } + }) + }) + + return cache +} diff --git a/package.json b/package.json index 2a29c0f..dc2abb2 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "tailwindcss-react-aria-components": "^1.2.0", "undici": "^7.1.0", "usehooks-ts": "^3.1.0", + "ws": "^8.18.0", "yaml": "^2.6.1", "zod": "^3.23.8" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3bf25e1..db7b538 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -103,6 +103,9 @@ importers: usehooks-ts: specifier: ^3.1.0 version: 3.1.0(react@19.0.0) + ws: + specifier: ^8.18.0 + version: 8.18.0 yaml: specifier: ^2.6.1 version: 2.6.1 diff --git a/server/prod.mjs b/server/prod.mjs index f5fd9f6..55bdf8a 100644 --- a/server/prod.mjs +++ b/server/prod.mjs @@ -10,6 +10,7 @@ import { createServer } from 'node:http' import { join, resolve } from 'node:path' import { env } from 'node:process' import { log } from './utils.mjs' +import { getWss, registerWss } from './ws.mjs' log('SRVX', 'INFO', `Running with Node.js ${process.versions.node}`) @@ -142,7 +143,9 @@ const http = createServer(async (req, res) => { }) // Pass our request to the Remix handler and get a response - const response = await handler(remixReq, {}) // No context + const response = await handler(remixReq, { + ws: getWss() + }) // Handle our response and reply res.statusCode = response.status @@ -160,6 +163,7 @@ const http = createServer(async (req, res) => { res.end() }) +registerWss(http) http.listen(port, host, () => { log('SRVX', 'INFO', `Running on ${host}:${port}`) }) diff --git a/server/ws.mjs b/server/ws.mjs new file mode 100644 index 0000000..e15820f --- /dev/null +++ b/server/ws.mjs @@ -0,0 +1,28 @@ +// The Websocket server is wholly responsible for ingesting messages from +// Headplane agent instances (hopefully not more than 1 is running lol) +import { WebSocketServer } from 'ws' +import { log } from './utils.mjs' + +const wss = new WebSocketServer({ noServer: true }) +wss.on('connection', (ws, req) => { + // On connection the agent will send its NodeID via Headers + // We store this for later use to validate and show on the UI + const nodeID = req.headers['x-headplane-ts-node-id'] + if (!nodeID) { + ws.close(1008, 'ERR_NO_HP_TS_NODE_ID') + return + } +}) + +export async function registerWss(server) { + log('SRVX', 'INFO', 'Registering Websocket Server') + server.on('upgrade', (request, socket, head) => { + wss.handleUpgrade(request, socket, head, ws => { + wss.emit('connection', ws, request) + }) + }) +} + +export function getWss() { + return wss +}