diff --git a/.env.sample b/.env.sample index f5c19c7a..bcd8054f 100644 --- a/.env.sample +++ b/.env.sample @@ -25,11 +25,14 @@ EVENT_SECRET=hell # @codex_bot webhook for reports CODEX_BOT_WEBHOOK= -# address of prometheus pushgateway -PROMETHEUS_PUSHGATEWAY_URL= +# Port for VictoriaMetrics metrics endpoint. +PROMETHEUS_METRICS_PORT= -# pushgateway push interval in ms -PROMETHEUS_PUSHGATEWAY_INTERVAL=10000 +# Host for metrics endpoint binding. +PROMETHEUS_METRICS_HOST=0.0.0.0 + +# Path for metrics endpoint. +PROMETHEUS_METRICS_PATH=/metrics # Grouper memory log controls # Number of handled tasks between memory checkpoint logs diff --git a/.env.test b/.env.test index 237fe292..a60516a7 100644 --- a/.env.test +++ b/.env.test @@ -22,8 +22,8 @@ EVENT_SECRET=hell # @codex_bot webhook for reports CODEX_BOT_WEBHOOK= -# address of prometheus pushgateway -PROMETHEUS_PUSHGATEWAY= +# Port for VictoriaMetrics metrics endpoint. +PROMETHEUS_METRICS_PORT= # Feature flags diff --git a/lib/metrics.ts b/lib/metrics.ts index e26f208f..3cfb490f 100644 --- a/lib/metrics.ts +++ b/lib/metrics.ts @@ -1,16 +1,19 @@ import * as client from 'prom-client'; -import os from 'os'; -import { nanoid } from 'nanoid'; +import * as http from 'http'; import createLogger from './logger'; const register = new client.Registry(); const logger = createLogger(); -const DEFAULT_PUSH_INTERVAL_MS = 10_000; -const ID_SIZE = 5; -const METRICS_JOB_NAME = 'workers'; +const DEFAULT_METRICS_HOST = '0.0.0.0'; +const DEFAULT_METRICS_PATH = '/metrics'; +const MIN_PORT = 1; +const MAX_PORT = 65535; +const HTTP_OK = 200; +const HTTP_NOT_FOUND = 404; +const HTTP_INTERNAL_SERVER_ERROR = 500; -let pushInterval: NodeJS.Timeout | null = null; +let metricsServer: http.Server | null = null; let currentWorkerName = ''; client.collectDefaultMetrics({ register }); @@ -18,80 +21,160 @@ client.collectDefaultMetrics({ register }); export { register, client }; /** - * Parse push interval from environment. 
/**
 * Resolve the TCP port for the metrics endpoint from PROMETHEUS_METRICS_PORT.
 *
 * @returns the configured port, or `null` when the variable is unset/empty
 *          or does not hold an integer within [MIN_PORT, MAX_PORT]
 *          (a warning is logged only in the latter case).
 */
function getMetricsPort(): number | null {
  const configured = process.env.PROMETHEUS_METRICS_PORT;

  if (configured === undefined || configured === '') {
    return null;
  }

  const candidate = Number(configured);
  const isUsable = Number.isInteger(candidate) && candidate >= MIN_PORT && candidate <= MAX_PORT;

  if (isUsable) {
    return candidate;
  }

  logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PORT="${configured}"; expected an integer between ${MIN_PORT} and ${MAX_PORT}`);

  return null;
}

/**
 * Resolve the URL path of the metrics endpoint from PROMETHEUS_METRICS_PATH.
 *
 * Surrounding whitespace is trimmed and a missing leading slash is added
 * (with a warning); an unset, empty or whitespace-only value yields
 * DEFAULT_METRICS_PATH.
 *
 * @returns path that always starts with "/".
 */
function getMetricsPath(): string {
  const configured = process.env.PROMETHEUS_METRICS_PATH;

  if (configured === undefined || configured === '') {
    return DEFAULT_METRICS_PATH;
  }

  const trimmed = configured.trim();

  if (trimmed.length === 0) {
    logger.warn(`[metrics] invalid PROMETHEUS_METRICS_PATH="${configured}", fallback to ${DEFAULT_METRICS_PATH}`);

    return DEFAULT_METRICS_PATH;
  }

  if (trimmed.startsWith('/')) {
    return trimmed;
  }

  const withLeadingSlash = `/${trimmed}`;

  logger.warn(`[metrics] normalized PROMETHEUS_METRICS_PATH from "${configured}" to "${withLeadingSlash}"`);

  return withLeadingSlash;
}
/**
 * Stop HTTP metrics endpoint.
 *
 * Safe to call repeatedly and when no endpoint was started. Module state
 * (`metricsServer`, `currentWorkerName`) is cleared only if it still refers to
 * the server being stopped, so a server started in the meantime is not affected.
 */
export function stopMetricsServer(): void {
  if (!metricsServer) {
    return;
  }

  const serverToStop = metricsServer;
  const stoppedWorkerName = currentWorkerName;

  /**
   * Forget the server, unless another one has been registered since.
   */
  const releaseState = (): void => {
    if (metricsServer === serverToStop) {
      metricsServer = null;
      currentWorkerName = '';
    }
  };

  if (!serverToStop.listening) {
    logger.info(`[metrics] endpoint already stopped for worker=${stoppedWorkerName}`);

    /**
     * `listen()` binds asynchronously, so "not listening" may also mean
     * "not bound yet". Close the server as soon as it comes up; otherwise it
     * would keep the port while nothing references it anymore.
     * If the server is already closing, 'listening' never fires and this is a no-op.
     */
    serverToStop.once('listening', () => {
      serverToStop.close(() => {
        logger.info(`[metrics] stopped endpoint for worker=${stoppedWorkerName}`);
      });
    });

    releaseState();

    return;
  }

  serverToStop.close((error) => {
    /**
     * Release state on failure too: close() reports an error only when the
     * server is not running, and keeping the reference would make every later
     * start be skipped as a duplicate.
     */
    releaseState();

    if (error) {
      logger.error(`[metrics] failed to stop endpoint for worker=${stoppedWorkerName}: ${error.message}`);

      return;
    }

    logger.info(`[metrics] stopped endpoint for worker=${stoppedWorkerName}`);
  });
}
/**
 * Start HTTP metrics endpoint for scraper-based monitoring.
 *
 * Does nothing when PROMETHEUS_METRICS_PORT is unset or invalid, or when an
 * endpoint is already registered in this process. Otherwise the server answers:
 *  - `/-/healthy` (any method) with `200 ok`;
 *  - `GET <metrics path>` with the rendered registry;
 *  - everything else with `404 not found`.
 *
 * @param workerName - name of the worker for default metric labels.
 * @returns cleanup callback that stops the endpoint.
 */
export function startMetricsServer(workerName: string): () => void {
  const port = getMetricsPort();

  if (!port) {
    return stopMetricsServer;
  }

  if (metricsServer) {
    logger.warn(`[metrics] endpoint is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`);

    return stopMetricsServer;
  }

  /**
   * `||` on purpose: an empty PROMETHEUS_METRICS_HOST must fall back to the default too.
   */
  const host = process.env.PROMETHEUS_METRICS_HOST || DEFAULT_METRICS_HOST;
  const path = getMetricsPath();

  register.setDefaultLabels({ worker: workerName });

  const server = http.createServer(async (request, response) => {
    /**
     * Match on the path only, ignoring the query string.
     */
    const requestPath = request.url?.split('?')[0];

    if (requestPath === '/-/healthy') {
      response.writeHead(HTTP_OK, { 'Content-Type': 'text/plain' });
      response.end('ok');

      return;
    }

    if (request.method !== 'GET' || requestPath !== path) {
      response.writeHead(HTTP_NOT_FOUND, { 'Content-Type': 'text/plain' });
      response.end('not found');

      return;
    }

    try {
      /**
       * Render before writing any header. If the 200 header were written first
       * and rendering failed, the 500 fallback below would call writeHead() a
       * second time, which throws and surfaces as an unhandled rejection of
       * this async listener.
       */
      const body = await register.metrics();

      response.writeHead(HTTP_OK, { 'Content-Type': register.contentType });
      response.end(body);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);

      logger.error(`[metrics] failed to render metrics: ${message}`);

      if (response.headersSent) {
        /**
         * The status line is already fixed, so an error response is impossible;
         * drop the connection instead of leaving the scraper hanging.
         */
        response.destroy();

        return;
      }

      response.writeHead(HTTP_INTERNAL_SERVER_ERROR, { 'Content-Type': 'text/plain' });
      response.end('metrics error');
    }
  });

  server.on('error', (error) => {
    logger.error(`[metrics] endpoint error for worker=${workerName}: ${error.message}`);

    /**
     * Forget the failed server so that a later start is not skipped as a duplicate.
     */
    if (metricsServer === server) {
      metricsServer = null;
      currentWorkerName = '';
    }
  });

  metricsServer = server;
  currentWorkerName = workerName;

  server.listen(port, host, () => {
    logger.info(`[metrics] endpoint started for worker=${workerName} at http://${host}:${port}${path}`);
  });

  return stopMetricsServer;
}
process.env.HAWK_CATCHER_TOKEN, context: { - workerTypes: workerNames.join(","), - } + workerTypes: workerNames.join(','), + }, }); } @@ -46,12 +39,10 @@ class WorkerRunner { */ private workers: Worker[] = []; - // private gateway?: promClient.Pushgateway; - /** - * Metrics push cleanup callback. + * Metrics endpoint cleanup callback. */ - private stopMetricsPushing?: () => void; + private stopMetricsServer?: () => void; /** * Create runner instance @@ -90,7 +81,7 @@ class WorkerRunner { * Run metrics exporter */ private startMetrics(): void { - if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { + if (!process.env.PROMETHEUS_METRICS_PORT && !process.env.PROMETHEUS_PUSHGATEWAY_URL) { return; } @@ -105,10 +96,10 @@ class WorkerRunner { const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process'; if (workerTypes.length > 1) { - console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`); + console.warn(`[metrics] ${workerTypes.length} workers are running in one process; exposing metrics as "${workerTypeForMetrics}"`); } - this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics); + this.stopMetricsServer = startMetricsServer(workerTypeForMetrics); } /** @@ -243,9 +234,8 @@ class WorkerRunner { */ private async stopWorker(worker: Worker): Promise { try { - // stop pushing metrics - this.stopMetricsPushing?.(); - this.stopMetricsPushing = undefined; + this.stopMetricsServer?.(); + this.stopMetricsServer = undefined; await worker.finish(); console.log(