feat(TALE-7): reimplement integration system

This commit is contained in:
Aarnav Tale 2024-07-09 22:53:00 -04:00
parent 3cc726320a
commit 0aa0406ea6
No known key found for this signature in database
6 changed files with 356 additions and 375 deletions

View File

@ -5,123 +5,143 @@ import { Client } from 'undici'
import { HeadscaleError, pull } from '~/utils/headscale'
import type { Integration } from '.'
import { createIntegration } from './integration'
// Integration name
const name = 'Docker'
interface Context {
client: Client | undefined
container: string | undefined
maxAttempts: number
}
let url: URL | undefined
let container: string | undefined
export default createIntegration<Context>({
name: 'Docker',
context: {
client: undefined,
container: undefined,
maxAttempts: 10,
},
isAvailable: async ({ client, container }) => {
// Check for the HEADSCALE_CONTAINER environment variable first
// to avoid unnecessary fetching of the Docker socket
container = process.env.HEADSCALE_CONTAINER
?.trim()
.toLowerCase()
async function preflight() {
const path = process.env.DOCKER_SOCK ?? 'unix:///var/run/docker.sock'
if (!container || container.length === 0) {
return false
}
try {
url = new URL(path)
} catch {
return false
}
const path = process.env.DOCKER_SOCK ?? 'unix:///var/run/docker.sock'
let url: URL | undefined
// The API is available as an HTTP endpoint
if (url.protocol === 'tcp:') {
url.protocol = 'http:'
}
// Check if the socket is accessible
if (url.protocol === 'unix:') {
try {
await access(path, constants.R_OK)
url = new URL(path)
} catch {
return false
}
}
if (url.protocol === 'http:') {
try {
await fetch(new URL('/v1.30/version', url).href)
} catch {
if (url.protocol !== 'tcp:' && url.protocol !== 'unix:') {
return false
}
}
if (url.protocol !== 'http:' && url.protocol !== 'unix:') {
return false
}
// The API is available as an HTTP endpoint and this
// will simplify the fetching logic in undici
if (url.protocol === 'tcp:') {
url.protocol = 'http:'
try {
await fetch(new URL('/v1.30/version', url).href)
} catch {
return false
}
container = process.env.HEADSCALE_CONTAINER
?.trim()
.toLowerCase()
client = new Client(url.href)
}
if (!container || container.length === 0) {
return false
}
// Check if the socket is accessible
if (url.protocol === 'unix:') {
try {
await access(path, constants.R_OK)
} catch {
return false
}
return true
}
client = new Client('http://localhost', {
socketPath: path,
})
}
async function sighup() {
if (!url || !container) {
return
}
return client === undefined
},
// Supports the DOCKER_SOCK environment variable
const client = url.protocol === 'unix:'
? new Client('http://localhost', {
socketPath: url.href,
})
: new Client(url.href)
const response = await client.request({
method: 'POST',
path: `/v1.30/containers/${container}/kill?signal=SIGHUP`,
})
if (!response.statusCode || response.statusCode !== 204) {
throw new Error('Failed to send SIGHUP to Headscale')
}
}
async function restart() {
if (!url || !container) {
return
}
// Supports the DOCKER_SOCK environment variable
const client = url.protocol === 'unix:'
? new Client('http://localhost', {
socketPath: url.href,
})
: new Client(url.href)
const response = await client.request({
method: 'POST',
path: `/v1.30/containers/${container}/restart`,
})
if (!response.statusCode || response.statusCode !== 204) {
throw new Error('Failed to restart Headscale')
}
// Wait for Headscale to restart before continuing
let attempts = 0
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition, no-constant-condition
while (true) {
try {
await pull('v1', '')
onAclChange: async ({ client, container, maxAttempts }) => {
if (!client || !container) {
return
} catch (error) {
if (error instanceof HeadscaleError && error.status === 401) {
break
}
if (attempts > 10) {
throw new Error('Headscale did not restart in time')
}
attempts++
await setTimeout(1000)
}
}
}
export default { name, preflight, sighup, restart } satisfies Integration
let attempts = 0
while (attempts <= maxAttempts) {
const response = await client.request({
method: 'POST',
path: `/v1.30/containers/${container}/kill?signal=SIGHUP`,
})
if (response.statusCode !== 204) {
if (attempts < maxAttempts) {
attempts++
await setTimeout(1000)
continue
}
const stringCode = response.statusCode.toString()
const body = await response.body.text()
throw new Error(`API request failed: ${stringCode} ${body}`)
}
}
},
onConfigChange: async ({ client, container, maxAttempts }) => {
if (!client || !container) {
return
}
let attempts = 0
while (attempts <= maxAttempts) {
const response = await client.request({
method: 'POST',
path: `/v1.30/containers/${container}/restart`,
})
if (response.statusCode !== 204) {
if (attempts < maxAttempts) {
attempts++
await setTimeout(1000)
continue
}
const stringCode = response.statusCode.toString()
const body = await response.body.text()
throw new Error(`API request failed: ${stringCode} ${body}`)
}
}
attempts = 0
while (attempts <= maxAttempts) {
try {
await pull('v1', '')
return
} catch (error) {
if (error instanceof HeadscaleError && error.status === 401) {
break
}
if (attempts < maxAttempts) {
attempts++
await setTimeout(1000)
continue
}
throw new Error(`Missed restart deadline for ${container}`)
}
}
},
})

View File

@ -1,28 +1,18 @@
import docker from './docker'
import kubernetes from './kubernetes'
import proc from './proc'
import dockerIntegration from './docker'
import kubernetesIntegration from './kubernetes'
import procIntegration from './proc'
export interface Integration {
name: string
preflight: () => Promise<boolean>
sighup?: () => Promise<void>
restart?: () => Promise<void>
}
export * from './integration'
// Because we previously supported the Docker integration by
// checking for the HEADSCALE_CONTAINER variable, we need to
// check for it here as well.
//
// This ensures that when people upgrade from older versions
// of Headplane, they don't explicitly need to define the new
// HEADSCALE_INTEGRATION variable that is needed to configure
// an integration.
export async function checkIntegration() {
export function loadIntegration() {
let integration = process.env.HEADSCALE_INTEGRATION
?.trim()
.toLowerCase()
// Old HEADSCALE_CONTAINER variable upgrade path
// This ensures that when people upgrade from older versions of Headplane
// they don't explicitly need to define the new HEADSCALE_INTEGRATION
// variable that is needed to configure docker
if (!integration && process.env.HEADSCALE_CONTAINER) {
integration = 'docker'
}
@ -32,32 +22,22 @@ export async function checkIntegration() {
return
}
let module: Integration | undefined
try {
module = getIntegration(integration)
await module.preflight()
} catch (error) {
console.error('Failed to load integration', error)
return
}
return module
}
function getIntegration(name: string) {
switch (name) {
switch (integration.toLowerCase().trim()) {
case 'docker': {
return docker
return dockerIntegration
}
case 'proc': {
return proc
case 'proc':
case 'native':
case 'linux': {
return procIntegration
}
case 'kubernetes':
case 'k8s': {
return kubernetes
}
default: {
throw new Error(`Unknown integration: ${name}`)
return kubernetesIntegration
}
}
console.error('Unknown integration:', integration)
}

View File

@ -0,0 +1,14 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export interface IntegrationFactory<T = any> {
name: string
context: T
isAvailable: (context: T) => Promise<boolean> | boolean
onAclChange?: (context: T) => Promise<void> | void
onConfigChange?: (context: T) => Promise<void> | void
}
export function createIntegration<T>(
options: IntegrationFactory<T>,
) {
return options
}

View File

@ -1,189 +1,162 @@
import { access, constants, readdir, readFile } from 'node:fs/promises'
import { readdir, readFile } from 'node:fs/promises'
import { platform } from 'node:os'
import { join, resolve } from 'node:path'
import { kill } from 'node:process'
import { Config, CoreV1Api, KubeConfig } from '@kubernetes/client-node'
import type { Integration } from '.'
import { createIntegration } from './integration'
// Integration name
const name = 'Kubernetes (k8s)'
// Check if we have a proper service account and /proc
// This is because the Kubernetes integration is basically
// the /proc integration plus some extra steps.
async function preflight() {
if (platform() !== 'linux') {
console.error('Not running on k8s Linux')
return false
}
const dir = resolve('/proc')
try {
await access(dir, constants.R_OK)
} catch (error) {
console.error('Failed to access /proc', error)
return false
}
const secretsDir = resolve(Config.SERVICEACCOUNT_ROOT)
try {
const files = await readdir(secretsDir)
if (files.length === 0) {
console.error('No Kubernetes service account found')
return false
}
const mappedFiles = new Set(files.map(file => join(secretsDir, file)))
const expectedFiles = [
Config.SERVICEACCOUNT_CA_PATH,
Config.SERVICEACCOUNT_TOKEN_PATH,
Config.SERVICEACCOUNT_NAMESPACE_PATH,
]
if (!expectedFiles.every(file => mappedFiles.has(file))) {
console.error('Kubernetes service account is incomplete')
return false
}
} catch (error) {
console.error('Failed to access Kubernetes service account', error)
return false
}
const namespace = await readFile(Config.SERVICEACCOUNT_NAMESPACE_PATH, 'utf8')
if (namespace.trim().length === 0) {
console.error('Kubernetes namespace is empty')
return false
}
const skip = process.env.HEADSCALE_INTEGRATION_UNSTRICT
if (skip === 'true' || skip === '1') {
console.warn('Skipping strict Kubernetes integration check')
return true
}
// Some very ugly nesting but it's necessary
const pod = process.env.POD_NAME
if (!pod) {
console.error('No pod name found (POD_NAME)')
return false
}
const result = await checkPod(pod, namespace)
if (!result) {
return false
}
return true
interface Context {
pid: number | undefined
}
async function checkPod(pod: string, namespace: string) {
if (pod.trim().length === 0) {
console.error('Pod name is empty')
return false
}
export default createIntegration<Context>({
name: 'Kubernetes (k8s)',
context: {
pid: undefined,
},
isAvailable: async ({ pid }) => {
if (platform() !== 'linux') {
return false
}
try {
const kc = new KubeConfig()
kc.loadFromCluster()
const svcRoot = Config.SERVICEACCOUNT_ROOT
try {
const files = await readdir(svcRoot)
if (files.length === 0) {
console.error('No Kubernetes service account found')
return false
}
const kCoreV1Api = kc.makeApiClient(CoreV1Api)
const { response, body } = await kCoreV1Api.readNamespacedPod(
pod,
namespace,
const mappedFiles = new Set(files.map(file => join(svcRoot, file)))
const expectedFiles = [
Config.SERVICEACCOUNT_CA_PATH,
Config.SERVICEACCOUNT_TOKEN_PATH,
Config.SERVICEACCOUNT_NAMESPACE_PATH,
]
if (!expectedFiles.every(file => mappedFiles.has(file))) {
console.error('Kubernetes service account is incomplete')
return false
}
} catch (error) {
console.error('Failed to access Kubernetes service account', error)
return false
}
const namespace = await readFile(
Config.SERVICEACCOUNT_NAMESPACE_PATH,
'utf8',
)
if (response.statusCode !== 200) {
console.error('Failed to read pod', response.statusCode)
return false
// Some very ugly nesting but it's necessary
if (process.env.HEADSCALE_INTEGRATION_UNSTRICT === 'true') {
console.warn('Skipping strict Kubernetes integration check')
} else {
const pod = process.env.POD_NAME
if (!pod) {
console.error('No pod name found (POD_NAME)')
return false
}
if (pod.trim().length === 0) {
console.error('Pod name is empty')
return false
}
try {
const kc = new KubeConfig()
kc.loadFromCluster()
const kCoreV1Api = kc.makeApiClient(CoreV1Api)
const { response, body } = await kCoreV1Api.readNamespacedPod(
pod,
namespace,
)
if (response.statusCode !== 200) {
console.error('Failed to read pod', response.statusCode)
return false
}
const shared = body.spec?.shareProcessNamespace
if (shared === undefined) {
console.error('Pod does not have shareProcessNamespace set')
return false
}
if (!shared) {
console.error('Pod has disabled shareProcessNamespace')
return false
}
} catch (error) {
console.error('Failed to check pod', error)
return false
}
}
const shared = body.spec?.shareProcessNamespace
if (shared === undefined) {
console.error('Pod does not have shareProcessNamespace set')
const dir = resolve('/proc')
try {
const subdirs = await readdir(dir)
const promises = subdirs.map(async (dir) => {
const pid = Number.parseInt(dir, 10)
if (Number.isNaN(pid)) {
return
}
const path = join('/proc', dir, 'cmdline')
try {
const data = await readFile(path, 'utf8')
if (data.includes('headscale')) {
return pid
}
} catch {}
})
const results = await Promise.allSettled(promises)
const pids = []
for (const result of results) {
if (result.status === 'fulfilled' && result.value) {
pids.push(result.value)
}
}
if (pids.length > 1) {
console.warn('Found multiple Headscale processes', pids)
console.log('Disabling the /proc integration')
return false
}
if (pids.length === 0) {
console.warn('Could not find Headscale process')
console.log('Disabling the /proc integration')
return false
}
pid = pids[0]
console.log('Found Headscale process', pid)
return true
} catch {
return false
}
},
if (!shared) {
console.error('Pod has disabled shareProcessNamespace')
return false
}
} catch (error) {
console.error('Failed to check pod', error)
return false
}
return true
}
async function findPid() {
const dirs = await readdir('/proc')
const promises = dirs.map(async (dir) => {
const pid = Number.parseInt(dir, 10)
if (Number.isNaN(pid)) {
onAclChange: ({ pid }) => {
if (!pid) {
return
}
const path = join('/proc', dir, 'cmdline')
try {
const data = await readFile(path, 'utf8')
if (data.includes('headscale')) {
return pid
}
} catch {}
})
const results = await Promise.allSettled(promises)
const pids = []
for (const result of results) {
if (result.status === 'fulfilled' && result.value) {
pids.push(result.value)
}
}
if (pids.length > 1) {
console.warn('Found multiple Headscale processes', pids)
console.log('Disabling the k8s integration')
return
}
if (pids.length === 0) {
console.warn('Could not find Headscale process')
console.log('Disabling the k8s integration')
return
}
return pids[0]
}
async function sighup() {
const pid = await findPid()
if (!pid) {
return
}
try {
kill(pid, 'SIGHUP')
} catch (error) {
console.error('Failed to send SIGHUP to Headscale', error)
}
}
},
async function restart() {
const pid = await findPid()
if (!pid) {
return
}
onConfigChange: ({ pid }) => {
if (!pid) {
return
}
try {
kill(pid, 'SIGTERM')
} catch (error) {
console.error('Failed to send SIGTERM to Headscale', error)
}
}
export default { name, preflight, sighup, restart } satisfies Integration
},
})

View File

@ -1,83 +1,77 @@
import { access, constants, readdir, readFile } from 'node:fs/promises'
import { readdir, readFile } from 'node:fs/promises'
import { platform } from 'node:os'
import { join, resolve } from 'node:path'
import { kill } from 'node:process'
import type { Integration } from '.'
import { createIntegration } from './integration'
// Integration name
const name = 'Native Linux (/proc)'
// Check if we have a /proc and if it's readable
async function preflight() {
if (platform() !== 'linux') {
return false
}
const dir = resolve('/proc')
try {
await access(dir, constants.R_OK)
return true
} catch (error) {
console.error('Failed to access /proc', error)
return false
}
interface Context {
pid: number | undefined
}
async function findPid() {
const dirs = await readdir('/proc')
export default createIntegration<Context>({
name: 'Native Linux (/proc)',
context: {
pid: undefined,
},
isAvailable: async ({ pid }) => {
if (platform() !== 'linux') {
return false
}
const promises = dirs.map(async (dir) => {
const pid = Number.parseInt(dir, 10)
const dir = resolve('/proc')
try {
const subdirs = await readdir(dir)
const promises = subdirs.map(async (dir) => {
const pid = Number.parseInt(dir, 10)
if (Number.isNaN(pid)) {
if (Number.isNaN(pid)) {
return
}
const path = join('/proc', dir, 'cmdline')
try {
const data = await readFile(path, 'utf8')
if (data.includes('headscale')) {
return pid
}
} catch {}
})
const results = await Promise.allSettled(promises)
const pids = []
for (const result of results) {
if (result.status === 'fulfilled' && result.value) {
pids.push(result.value)
}
}
if (pids.length > 1) {
console.warn('Found multiple Headscale processes', pids)
console.log('Disabling the /proc integration')
return false
}
if (pids.length === 0) {
console.warn('Could not find Headscale process')
console.log('Disabling the /proc integration')
return false
}
pid = pids[0]
console.log('Found Headscale process', pid)
return true
} catch {
return false
}
},
onAclChange: ({ pid }) => {
if (!pid) {
return
}
const path = join('/proc', dir, 'cmdline')
try {
const data = await readFile(path, 'utf8')
if (data.includes('headscale')) {
return pid
}
} catch {}
})
const results = await Promise.allSettled(promises)
const pids = []
for (const result of results) {
if (result.status === 'fulfilled' && result.value) {
pids.push(result.value)
}
}
if (pids.length > 1) {
console.warn('Found multiple Headscale processes', pids)
console.log('Disabling the /proc integration')
return
}
if (pids.length === 0) {
console.warn('Could not find Headscale process')
console.log('Disabling the /proc integration')
return
}
return pids[0]
}
async function sighup() {
const pid = await findPid()
if (!pid) {
return
}
try {
kill(pid, 'SIGHUP')
} catch (error) {
console.error('Failed to send SIGHUP to Headscale', error)
}
}
export default { name, preflight, sighup } satisfies Integration
},
})

View File

@ -8,14 +8,14 @@ import { resolve } from 'node:path'
import { parse } from 'yaml'
import { checkIntegration, Integration } from '~/integration'
import { IntegrationFactory, loadIntegration } from '~/integration'
import { HeadscaleConfig, loadConfig } from './headscale'
export interface HeadplaneContext {
headscaleUrl: string
cookieSecret: string
integration: Integration | undefined
integration: IntegrationFactory | undefined
config: {
read: boolean
@ -67,7 +67,7 @@ export async function loadContext(): Promise<HeadplaneContext> {
context = {
headscaleUrl,
cookieSecret,
integration: await checkIntegration(),
integration: loadIntegration(),
config: contextData,
acl: await checkAcl(config),
oidc: await checkOidc(config),