From 0aa0406ea6a4c091494774382cc0e0f29206cb53 Mon Sep 17 00:00:00 2001 From: Aarnav Tale Date: Tue, 9 Jul 2024 22:53:00 -0400 Subject: [PATCH] feat(TALE-7): reimplement integration system --- app/integration/docker.ts | 218 +++++++++++++----------- app/integration/index.ts | 58 +++---- app/integration/integration.ts | 14 ++ app/integration/kubernetes.ts | 301 +++++++++++++++------------------ app/integration/proc.ts | 134 +++++++-------- app/utils/config/headplane.ts | 6 +- 6 files changed, 356 insertions(+), 375 deletions(-) create mode 100644 app/integration/integration.ts diff --git a/app/integration/docker.ts b/app/integration/docker.ts index 8630d7d..d2e8d68 100644 --- a/app/integration/docker.ts +++ b/app/integration/docker.ts @@ -5,123 +5,143 @@ import { Client } from 'undici' import { HeadscaleError, pull } from '~/utils/headscale' -import type { Integration } from '.' +import { createIntegration } from './integration' -// Integration name -const name = 'Docker' +interface Context { + client: Client | undefined + container: string | undefined + maxAttempts: number +} -let url: URL | undefined -let container: string | undefined +export default createIntegration({ + name: 'Docker', + context: { + client: undefined, + container: undefined, + maxAttempts: 10, + }, + isAvailable: async ({ client, container }) => { + // Check for the HEADSCALE_CONTAINER environment variable first + // to avoid unnecessary fetching of the Docker socket + container = process.env.HEADSCALE_CONTAINER + ?.trim() + .toLowerCase() -async function preflight() { - const path = process.env.DOCKER_SOCK ?? 'unix:///var/run/docker.sock' + if (!container || container.length === 0) { + return false + } - try { - url = new URL(path) - } catch { - return false - } + const path = process.env.DOCKER_SOCK ?? 'unix:///var/run/docker.sock' + let url: URL | undefined - // The API is available as an HTTP endpoint - if (url.protocol === 'tcp:') { - url.protocol = 'http:' - } - - // Check if the socket is accessible - if (url.protocol === 'unix:') { try { - await access(path, constants.R_OK) + url = new URL(path) } catch { return false } - } - if (url.protocol === 'http:') { - try { - await fetch(new URL('/v1.30/version', url).href) - } catch { + if (url.protocol !== 'tcp:' && url.protocol !== 'unix:') { return false } - } - if (url.protocol !== 'http:' && url.protocol !== 'unix:') { - return false - } + // The API is available as an HTTP endpoint and this + // will simplify the fetching logic in undici + if (url.protocol === 'tcp:') { + url.protocol = 'http:' + try { + await fetch(new URL('/v1.30/version', url).href) + } catch { + return false + } - container = process.env.HEADSCALE_CONTAINER - ?.trim() - .toLowerCase() + client = new Client(url.href) + } - if (!container || container.length === 0) { - return false - } + // Check if the socket is accessible + if (url.protocol === 'unix:') { + try { + await access(path, constants.R_OK) + } catch { + return false + } - return true -} + client = new Client('http://localhost', { + socketPath: path, + }) + } -async function sighup() { - if (!url || !container) { - return - } + return client === undefined + }, - // Supports the DOCKER_SOCK environment variable - const client = url.protocol === 'unix:' - ? new Client('http://localhost', { - socketPath: url.href, - }) - : new Client(url.href) - - const response = await client.request({ - method: 'POST', - path: `/v1.30/containers/${container}/kill?signal=SIGHUP`, - }) - - if (!response.statusCode || response.statusCode !== 204) { - throw new Error('Failed to send SIGHUP to Headscale') - } -} - -async function restart() { - if (!url || !container) { - return - } - - // Supports the DOCKER_SOCK environment variable - const client = url.protocol === 'unix:' - ? new Client('http://localhost', { - socketPath: url.href, - }) - : new Client(url.href) - - const response = await client.request({ - method: 'POST', - path: `/v1.30/containers/${container}/restart`, - }) - - if (!response.statusCode || response.statusCode !== 204) { - throw new Error('Failed to restart Headscale') - } - - // Wait for Headscale to restart before continuing - let attempts = 0 - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition, no-constant-condition - while (true) { - try { - await pull('v1', '') + onAclChange: async ({ client, container, maxAttempts }) => { + if (!client || !container) { return - } catch (error) { - if (error instanceof HeadscaleError && error.status === 401) { - break - } - - if (attempts > 10) { - throw new Error('Headscale did not restart in time') - } - - attempts++ - await setTimeout(1000) } - } -} -export default { name, preflight, sighup, restart } satisfies Integration + let attempts = 0 + while (attempts <= maxAttempts) { + const response = await client.request({ + method: 'POST', + path: `/v1.30/containers/${container}/kill?signal=SIGHUP`, + }) + + if (response.statusCode !== 204) { + if (attempts < maxAttempts) { + attempts++ + await setTimeout(1000) + continue + } + + const stringCode = response.statusCode.toString() + const body = await response.body.text() + throw new Error(`API request failed: ${stringCode} ${body}`) + } + } + }, + + onConfigChange: async ({ client, container, maxAttempts }) => { + if (!client || !container) { + return + } + + let attempts = 0 + while (attempts <= maxAttempts) { + const response = await client.request({ + method: 'POST', + path: `/v1.30/containers/${container}/restart`, + }) + + if (response.statusCode !== 204) { + if (attempts < maxAttempts) { + attempts++ + await setTimeout(1000) + continue + } + + const stringCode = response.statusCode.toString() + const body = await response.body.text() + throw new Error(`API request failed: ${stringCode} ${body}`) + } + } + + attempts = 0 + while (attempts <= maxAttempts) { + try { + await pull('v1', '') + return + } catch (error) { + if (error instanceof HeadscaleError && error.status === 401) { + break + } + + if (attempts < maxAttempts) { + attempts++ + await setTimeout(1000) + continue + } + + throw new Error(`Missed restart deadline for ${container}`) + } + } + }, +}) diff --git a/app/integration/index.ts b/app/integration/index.ts index 42c395e..801eee0 100644 --- a/app/integration/index.ts +++ b/app/integration/index.ts @@ -1,28 +1,18 @@ -import docker from './docker' -import kubernetes from './kubernetes' -import proc from './proc' +import dockerIntegration from './docker' +import kubernetesIntegration from './kubernetes' +import procIntegration from './proc' -export interface Integration { - name: string - preflight: () => Promise - sighup?: () => Promise - restart?: () => Promise -} +export * from './integration' -// Because we previously supported the Docker integration by -// checking for the HEADSCALE_CONTAINER variable, we need to -// check for it here as well. -// -// This ensures that when people upgrade from older versions -// of Headplane, they don't explicitly need to define the new -// HEADSCALE_INTEGRATION variable that is needed to configure -// an integration. -export async function checkIntegration() { +export function loadIntegration() { let integration = process.env.HEADSCALE_INTEGRATION ?.trim() .toLowerCase() // Old HEADSCALE_CONTAINER variable upgrade path + // This ensures that when people upgrade from older versions of Headplane + // they don't explicitly need to define the new HEADSCALE_INTEGRATION + // variable that is needed to configure docker if (!integration && process.env.HEADSCALE_CONTAINER) { integration = 'docker' } @@ -32,32 +22,22 @@ export async function checkIntegration() { return } - let module: Integration | undefined - try { - module = getIntegration(integration) - await module.preflight() - } catch (error) { - console.error('Failed to load integration', error) - return - } - - return module -} - -function getIntegration(name: string) { - switch (name) { + switch (integration.toLowerCase().trim()) { case 'docker': { - return docker + return dockerIntegration } - case 'proc': { - return proc + + case 'proc': + case 'native': + case 'linux': { + return procIntegration } + case 'kubernetes': case 'k8s': { - return kubernetes - } - default: { - throw new Error(`Unknown integration: ${name}`) + return kubernetesIntegration } } + + console.error('Unknown integration:', integration) } diff --git a/app/integration/integration.ts b/app/integration/integration.ts new file mode 100644 index 0000000..c1e1056 --- /dev/null +++ b/app/integration/integration.ts @@ -0,0 +1,14 @@ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export interface IntegrationFactory { + name: string + context: T + isAvailable: (context: T) => Promise | boolean + onAclChange?: (context: T) => Promise | void + onConfigChange?: (context: T) => Promise | void +} + +export function createIntegration( + options: IntegrationFactory, +) { + return options +} diff --git a/app/integration/kubernetes.ts b/app/integration/kubernetes.ts index 84ddb00..904c4b3 100644 --- a/app/integration/kubernetes.ts +++ b/app/integration/kubernetes.ts @@ -1,189 +1,162 @@ -import { access, constants, readdir, readFile } from 'node:fs/promises' +import { readdir, readFile } from 'node:fs/promises' import { platform } from 'node:os' import { join, resolve } from 'node:path' import { kill } from 'node:process' import { Config, CoreV1Api, KubeConfig } from '@kubernetes/client-node' -import type { Integration } from '.' +import { createIntegration } from './integration' -// Integration name -const name = 'Kubernetes (k8s)' - -// Check if we have a proper service account and /proc -// This is because the Kubernetes integration is basically -// the /proc integration plus some extra steps. -async function preflight() { - if (platform() !== 'linux') { - console.error('Not running on k8s Linux') - return false - } - - const dir = resolve('/proc') - try { - await access(dir, constants.R_OK) - } catch (error) { - console.error('Failed to access /proc', error) - return false - } - - const secretsDir = resolve(Config.SERVICEACCOUNT_ROOT) - try { - const files = await readdir(secretsDir) - if (files.length === 0) { - console.error('No Kubernetes service account found') - return false - } - - const mappedFiles = new Set(files.map(file => join(secretsDir, file))) - const expectedFiles = [ - Config.SERVICEACCOUNT_CA_PATH, - Config.SERVICEACCOUNT_TOKEN_PATH, - Config.SERVICEACCOUNT_NAMESPACE_PATH, - ] - - if (!expectedFiles.every(file => mappedFiles.has(file))) { - console.error('Kubernetes service account is incomplete') - return false - } - } catch (error) { - console.error('Failed to access Kubernetes service account', error) - return false - } - - const namespace = await readFile(Config.SERVICEACCOUNT_NAMESPACE_PATH, 'utf8') - if (namespace.trim().length === 0) { - console.error('Kubernetes namespace is empty') - return false - } - - const skip = process.env.HEADSCALE_INTEGRATION_UNSTRICT - if (skip === 'true' || skip === '1') { - console.warn('Skipping strict Kubernetes integration check') - return true - } - - // Some very ugly nesting but it's necessary - const pod = process.env.POD_NAME - if (!pod) { - console.error('No pod name found (POD_NAME)') - return false - } - - const result = await checkPod(pod, namespace) - if (!result) { - return false - } - - return true +interface Context { + pid: number | undefined } -async function checkPod(pod: string, namespace: string) { - if (pod.trim().length === 0) { - console.error('Pod name is empty') - return false - } +export default createIntegration({ + name: 'Kubernetes (k8s)', + context: { + pid: undefined, + }, + isAvailable: async ({ pid }) => { + if (platform() !== 'linux') { + return false + } - try { - const kc = new KubeConfig() - kc.loadFromCluster() + const svcRoot = Config.SERVICEACCOUNT_ROOT + try { + const files = await readdir(svcRoot) + if (files.length === 0) { + console.error('No Kubernetes service account found') + return false + } - const kCoreV1Api = kc.makeApiClient(CoreV1Api) - const { response, body } = await kCoreV1Api.readNamespacedPod( - pod, - namespace, + const mappedFiles = new Set(files.map(file => join(svcRoot, file))) + const expectedFiles = [ + Config.SERVICEACCOUNT_CA_PATH, + Config.SERVICEACCOUNT_TOKEN_PATH, + Config.SERVICEACCOUNT_NAMESPACE_PATH, + ] + + if (!expectedFiles.every(file => mappedFiles.has(file))) { + console.error('Kubernetes service account is incomplete') + return false + } + } catch (error) { + console.error('Failed to access Kubernetes service account', error) + return false + } + + const namespace = await readFile( + Config.SERVICEACCOUNT_NAMESPACE_PATH, + 'utf8', ) - if (response.statusCode !== 200) { - console.error('Failed to read pod', response.statusCode) - return false + // Some very ugly nesting but it's necessary + if (process.env.HEADSCALE_INTEGRATION_UNSTRICT === 'true') { + console.warn('Skipping strict Kubernetes integration check') + } else { + const pod = process.env.POD_NAME + if (!pod) { + console.error('No pod name found (POD_NAME)') + return false + } + + if (pod.trim().length === 0) { + console.error('Pod name is empty') + return false + } + + try { + const kc = new KubeConfig() + kc.loadFromCluster() + + const kCoreV1Api = kc.makeApiClient(CoreV1Api) + const { response, body } = await kCoreV1Api.readNamespacedPod( + pod, + namespace, + ) + + if (response.statusCode !== 200) { + console.error('Failed to read pod', response.statusCode) + return false + } + + const shared = body.spec?.shareProcessNamespace + if (shared === undefined) { + console.error('Pod does not have shareProcessNamespace set') + return false + } + + if (!shared) { + console.error('Pod has disabled shareProcessNamespace') + return false + } + } catch (error) { + console.error('Failed to check pod', error) + return false + } } - const shared = body.spec?.shareProcessNamespace - if (shared === undefined) { - console.error('Pod does not have shareProcessNamespace set') + const dir = resolve('/proc') + try { + const subdirs = await readdir(dir) + const promises = subdirs.map(async (dir) => { + const pid = Number.parseInt(dir, 10) + + if (Number.isNaN(pid)) { + return + } + + const path = join('/proc', dir, 'cmdline') + try { + const data = await readFile(path, 'utf8') + if (data.includes('headscale')) { + return pid + } + } catch {} + }) + + const results = await Promise.allSettled(promises) + const pids = [] + + for (const result of results) { + if (result.status === 'fulfilled' && result.value) { + pids.push(result.value) + } + } + + if (pids.length > 1) { + console.warn('Found multiple Headscale processes', pids) + console.log('Disabling the /proc integration') + return false + } + + if (pids.length === 0) { + console.warn('Could not find Headscale process') + console.log('Disabling the /proc integration') + return false + } + + pid = pids[0] + console.log('Found Headscale process', pid) + return true + } catch { return false } + }, - if (!shared) { - console.error('Pod has disabled shareProcessNamespace') - return false - } - } catch (error) { - console.error('Failed to check pod', error) - return false - } - - return true -} - -async function findPid() { - const dirs = await readdir('/proc') - - const promises = dirs.map(async (dir) => { - const pid = Number.parseInt(dir, 10) - - if (Number.isNaN(pid)) { + onAclChange: ({ pid }) => { + if (!pid) { return } - const path = join('/proc', dir, 'cmdline') - try { - const data = await readFile(path, 'utf8') - if (data.includes('headscale')) { - return pid - } - } catch {} - }) - - const results = await Promise.allSettled(promises) - const pids = [] - - for (const result of results) { - if (result.status === 'fulfilled' && result.value) { - pids.push(result.value) - } - } - - if (pids.length > 1) { - console.warn('Found multiple Headscale processes', pids) - console.log('Disabling the k8s integration') - return - } - - if (pids.length === 0) { - console.warn('Could not find Headscale process') - console.log('Disabling the k8s integration') - return - } - - return pids[0] -} - -async function sighup() { - const pid = await findPid() - if (!pid) { - return - } - - try { kill(pid, 'SIGHUP') - } catch (error) { - console.error('Failed to send SIGHUP to Headscale', error) - } -} + }, -async function restart() { - const pid = await findPid() - if (!pid) { - return - } + onConfigChange: ({ pid }) => { + if (!pid) { + return + } - try { kill(pid, 'SIGTERM') - } catch (error) { - console.error('Failed to send SIGTERM to Headscale', error) - } -} - -export default { name, preflight, sighup, restart } satisfies Integration + }, +}) diff --git a/app/integration/proc.ts b/app/integration/proc.ts index 2821557..73250d2 100644 --- a/app/integration/proc.ts +++ b/app/integration/proc.ts @@ -1,83 +1,77 @@ -import { access, constants, readdir, readFile } from 'node:fs/promises' +import { readdir, readFile } from 'node:fs/promises' import { platform } from 'node:os' import { join, resolve } from 'node:path' import { kill } from 'node:process' -import type { Integration } from '.' +import { createIntegration } from './integration' -// Integration name -const name = 'Native Linux (/proc)' - -// Check if we have a /proc and if it's readable -async function preflight() { - if (platform() !== 'linux') { - return false - } - - const dir = resolve('/proc') - try { - await access(dir, constants.R_OK) - return true - } catch (error) { - console.error('Failed to access /proc', error) - return false - } +interface Context { + pid: number | undefined } -async function findPid() { - const dirs = await readdir('/proc') +export default createIntegration({ + name: 'Native Linux (/proc)', + context: { + pid: undefined, + }, + isAvailable: async ({ pid }) => { + if (platform() !== 'linux') { + return false + } - const promises = dirs.map(async (dir) => { - const pid = Number.parseInt(dir, 10) + const dir = resolve('/proc') + try { + const subdirs = await readdir(dir) + const promises = subdirs.map(async (dir) => { + const pid = Number.parseInt(dir, 10) - if (Number.isNaN(pid)) { + if (Number.isNaN(pid)) { + return + } + + const path = join('/proc', dir, 'cmdline') + try { + const data = await readFile(path, 'utf8') + if (data.includes('headscale')) { + return pid + } + } catch {} + }) + + const results = await Promise.allSettled(promises) + const pids = [] + + for (const result of results) { + if (result.status === 'fulfilled' && result.value) { + pids.push(result.value) + } + } + + if (pids.length > 1) { + console.warn('Found multiple Headscale processes', pids) + console.log('Disabling the /proc integration') + return false + } + + if (pids.length === 0) { + console.warn('Could not find Headscale process') + console.log('Disabling the /proc integration') + return false + } + + pid = pids[0] + console.log('Found Headscale process', pid) + return true + } catch { + return false + } + }, + + onAclChange: ({ pid }) => { + if (!pid) { return } - const path = join('/proc', dir, 'cmdline') - try { - const data = await readFile(path, 'utf8') - if (data.includes('headscale')) { - return pid - } - } catch {} - }) - - const results = await Promise.allSettled(promises) - const pids = [] - - for (const result of results) { - if (result.status === 'fulfilled' && result.value) { - pids.push(result.value) - } - } - - if (pids.length > 1) { - console.warn('Found multiple Headscale processes', pids) - console.log('Disabling the /proc integration') - return - } - - if (pids.length === 0) { - console.warn('Could not find Headscale process') - console.log('Disabling the /proc integration') - return - } - - return pids[0] -} - -async function sighup() { - const pid = await findPid() - if (!pid) { - return - } - - try { kill(pid, 'SIGHUP') - } catch (error) { - console.error('Failed to send SIGHUP to Headscale', error) - } -} - -export default { name, preflight, sighup } satisfies Integration + }, +}) diff --git a/app/utils/config/headplane.ts b/app/utils/config/headplane.ts index d7d47de..4e5ad41 100644 --- a/app/utils/config/headplane.ts +++ b/app/utils/config/headplane.ts @@ -8,14 +8,14 @@ import { resolve } from 'node:path' import { parse } from 'yaml' -import { checkIntegration, Integration } from '~/integration' +import { IntegrationFactory, loadIntegration } from '~/integration' import { HeadscaleConfig, loadConfig } from './headscale' export interface HeadplaneContext { headscaleUrl: string cookieSecret: string - integration: Integration | undefined + integration: IntegrationFactory | undefined config: { read: boolean @@ -67,7 +67,7 @@ export async function loadContext(): Promise { context = { headscaleUrl, cookieSecret, - integration: await checkIntegration(), + integration: loadIntegration(), config: contextData, acl: await checkAcl(config), oidc: await checkOidc(config),