feat: add ws server for agent

This commit is contained in:
Aarnav Tale 2024-12-30 13:48:10 +05:30
parent 1c2c374ada
commit 6156d78907
No known key found for this signature in database
5 changed files with 84 additions and 1 deletions

47
app/utils/ws.ts Normal file
View File

@ -0,0 +1,47 @@
// This is a "side-effect" but we want a lifecycle cache map of
// peer statuses to prevent unnecessary fetches to the agent.
import type { LoaderFunctionArgs } from 'remix'
type Context = LoaderFunctionArgs['context']
const cache: { [nodeID: string]: unknown } = {}
export async function queryWS(context: Context, nodeIDs: string[]) {
const ws = context.ws
const firstClient = ws.clients.values().next().value
if (!firstClient) {
return cache
}
const cached = nodeIDs.map((nodeID) => {
const cached = cache[nodeID]
if (cached) {
return cached
}
})
// We only need to query the nodes that are not cached
const uncached = nodeIDs.filter((nodeID) => !cached.includes(nodeID))
if (uncached.length === 0) {
return cache
}
firstClient.send(JSON.stringify({ NodeIDs: uncached }))
await new Promise((resolve) => {
const timeout = setTimeout(() => {
resolve()
}, 3000)
firstClient.on('message', (message) => {
const data = JSON.parse(message.toString())
if (Object.keys(data).length === 0) {
resolve()
}
for (const [nodeID, status] of Object.entries(data)) {
cache[nodeID] = status
}
})
})
return cache
}

View File

@ -40,6 +40,7 @@
"tailwindcss-react-aria-components": "^1.2.0", "tailwindcss-react-aria-components": "^1.2.0",
"undici": "^7.1.0", "undici": "^7.1.0",
"usehooks-ts": "^3.1.0", "usehooks-ts": "^3.1.0",
"ws": "^8.18.0",
"yaml": "^2.6.1", "yaml": "^2.6.1",
"zod": "^3.23.8" "zod": "^3.23.8"
}, },

View File

@ -103,6 +103,9 @@ importers:
usehooks-ts: usehooks-ts:
specifier: ^3.1.0 specifier: ^3.1.0
version: 3.1.0(react@19.0.0) version: 3.1.0(react@19.0.0)
ws:
specifier: ^8.18.0
version: 8.18.0
yaml: yaml:
specifier: ^2.6.1 specifier: ^2.6.1
version: 2.6.1 version: 2.6.1

View File

@ -10,6 +10,7 @@ import { createServer } from 'node:http'
import { join, resolve } from 'node:path' import { join, resolve } from 'node:path'
import { env } from 'node:process' import { env } from 'node:process'
import { log } from './utils.mjs' import { log } from './utils.mjs'
import { getWss, registerWss } from './ws.mjs'
log('SRVX', 'INFO', `Running with Node.js ${process.versions.node}`) log('SRVX', 'INFO', `Running with Node.js ${process.versions.node}`)
@ -142,7 +143,9 @@ const http = createServer(async (req, res) => {
}) })
// Pass our request to the Remix handler and get a response // Pass our request to the Remix handler and get a response
const response = await handler(remixReq, {}) // No context const response = await handler(remixReq, {
ws: getWss()
})
// Handle our response and reply // Handle our response and reply
res.statusCode = response.status res.statusCode = response.status
@ -160,6 +163,7 @@ const http = createServer(async (req, res) => {
res.end() res.end()
}) })
registerWss(http)
http.listen(port, host, () => { http.listen(port, host, () => {
log('SRVX', 'INFO', `Running on ${host}:${port}`) log('SRVX', 'INFO', `Running on ${host}:${port}`)
}) })

28
server/ws.mjs Normal file
View File

@ -0,0 +1,28 @@
// The Websocket server is wholly responsible for ingesting messages from
// Headplane agent instances (hopefully not more than 1 is running lol)
import { WebSocketServer } from 'ws'
import { log } from './utils.mjs'
const wss = new WebSocketServer({ noServer: true })
wss.on('connection', (ws, req) => {
// On connection the agent will send its NodeID via Headers
// We store this for later use to validate and show on the UI
const nodeID = req.headers['x-headplane-ts-node-id']
if (!nodeID) {
ws.close(1008, 'ERR_NO_HP_TS_NODE_ID')
return
}
})
export async function registerWss(server) {
log('SRVX', 'INFO', 'Registering Websocket Server')
server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket, head, ws => {
wss.emit('connection', ws, request)
})
})
}
export function getWss() {
return wss
}