diff --git a/.server-changes/fix-sse-memory-leak.md b/.server-changes/fix-sse-memory-leak.md new file mode 100644 index 0000000000..e2b9ddd181 --- /dev/null +++ b/.server-changes/fix-sse-memory-leak.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix memory leak where every aborted SSE connection pinned the full request/response graph on Node 20, caused by `AbortSignal.any()` in `sse.ts` retaining its source signals indefinitely (see nodejs/node#54614, nodejs/node#55351). Also clear the `setTimeout(abort)` timer in `entry.server.tsx` so successful HTML renders don't pin the React tree for 30s per request. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 4ee4f252a3..87171011e0 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -83,6 +83,10 @@ function handleBotRequest( ) { return new Promise((resolve, reject) => { let shellRendered = false; + // Timer handle is cleared in every terminal callback so the abort closure + // (which captures the full React render tree + remixContext) doesn't pin + // memory for 30s per successful request. See react-router PR #14200. + let abortTimer: NodeJS.Timeout | undefined; const { pipe, abort } = renderToPipeableStream( @@ -105,8 +109,10 @@ function handleBotRequest( ); pipe(body); + clearTimeout(abortTimer); }, onShellError(error: unknown) { + clearTimeout(abortTimer); reject(error); }, onError(error: unknown) { @@ -121,7 +127,7 @@ function handleBotRequest( } ); - setTimeout(abort, ABORT_DELAY); + abortTimer = setTimeout(abort, ABORT_DELAY); }); } @@ -135,6 +141,10 @@ function handleBrowserRequest( ) { return new Promise((resolve, reject) => { let shellRendered = false; + // Timer handle is cleared in every terminal callback so the abort closure + // (which captures the full React render tree + remixContext) doesn't pin + // memory for 30s per successful request. See react-router PR #14200. + let abortTimer: NodeJS.Timeout | undefined; const { pipe, abort } = renderToPipeableStream( @@ -157,8 +167,10 @@ function handleBrowserRequest( ); pipe(body); + clearTimeout(abortTimer); }, onShellError(error: unknown) { + clearTimeout(abortTimer); reject(error); }, onError(error: unknown) { @@ -173,7 +185,7 @@ function handleBrowserRequest( } ); - setTimeout(abort, ABORT_DELAY); + abortTimer = setTimeout(abort, ABORT_DELAY); }); } diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index ba40624058..c10446d08a 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -438,6 +438,7 @@ const EnvironmentSchema = z INTERNAL_OTEL_TRACE_SAMPLING_RATE: z.string().default("20"), INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED: z.string().default("0"), INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"), + DISABLE_HTTP_INSTRUMENTATION: BoolEnv.default(false), INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(), INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(), diff --git a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts index 1dd4edc623..69560c49e8 100644 --- a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts @@ -1,7 +1,7 @@ import { type PrismaClient, prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; -import { createSSELoader, SendFunction } from "~/utils/sse"; +import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse"; import { throttle } from "~/utils/throttle"; import { tracePubSub } from "~/v3/services/tracePubSub.server"; @@ -66,8 +66,10 @@ export class RunStreamPresenter { }); } } - // Abort the stream on send error - context.controller.abort("Send error"); + // Abort the stream on send error. Uses a stackless string sentinel + // from sse.ts — a no-arg abort() would create a DOMException with a + // stack trace, which is unnecessary retention on the signal.reason. + context.controller.abort(ABORT_REASON_SEND_ERROR); } }, 1000 diff --git a/apps/webapp/app/utils/sse.ts b/apps/webapp/app/utils/sse.ts index f48cc9e31f..53f9aa010c 100644 --- a/apps/webapp/app/utils/sse.ts +++ b/apps/webapp/app/utils/sse.ts @@ -38,6 +38,20 @@ type SSEOptions = { // This is used to track the open connections, for debugging const connections: Set = new Set(); +// Stackless sentinel reasons passed to AbortController#abort. Calling .abort() +// with no argument produces a DOMException that captures a ~500-byte stack +// trace; a string reason is stored verbatim with no stack. The choice of +// reason type does not cause the retention we saw in prod (that was the +// AbortSignal.any composite — see comment near the timeoutTimer below for the +// Node issue refs), but naming the sentinels keeps call sites readable and +// lets future signal.reason consumers branch on the cause. +export const ABORT_REASON_REQUEST = "request_aborted"; +export const ABORT_REASON_TIMEOUT = "timeout"; +export const ABORT_REASON_SEND_ERROR = "send_error"; +export const ABORT_REASON_INIT_STOP = "init_requested_stop"; +export const ABORT_REASON_ITERATOR_STOP = "iterator_requested_stop"; +export const ABORT_REASON_ITERATOR_ERROR = "iterator_error"; + export function createSSELoader(options: SSEOptions) { const { timeout, interval = 500, debug = false, handler } = options; @@ -45,7 +59,6 @@ export function createSSELoader(options: SSEOptions) { const id = request.headers.get("x-request-id") || Math.random().toString(36).slice(2, 8); const internalController = new AbortController(); - const timeoutSignal = AbortSignal.timeout(timeout); const log = (message: string) => { if (debug) @@ -60,16 +73,20 @@ export function createSSELoader(options: SSEOptions) { if (!internalController.signal.aborted) { originalSend(event); } - // If controller is aborted, silently ignore the send attempt } catch (error) { if (error instanceof Error) { if (error.message?.includes("Controller is already closed")) { - // Silently handle controller closed errors return; } log(`Error sending event: ${error.message}`); } - throw error; // Re-throw other errors + // Abort before rethrowing so timer + request-abort listener are cleaned + // up immediately. Otherwise a send-failure in initStream leaves them + // alive until `timeout` fires. + if (!internalController.signal.aborted) { + internalController.abort(ABORT_REASON_SEND_ERROR); + } + throw error; } }; }; @@ -92,51 +109,57 @@ export function createSSELoader(options: SSEOptions) { const requestAbortSignal = getRequestAbortSignal(); - const combinedSignal = AbortSignal.any([ - requestAbortSignal, - timeoutSignal, - internalController.signal, - ]); - log("Start"); - requestAbortSignal.addEventListener( - "abort", - () => { - log(`request signal aborted`); - internalController.abort("Request aborted"); - }, - { once: true, signal: internalController.signal } - ); + // Single-signal abort chain: everything rolls up into internalController. + // Timeout is a plain setTimeout cleared on abort rather than an + // AbortSignal.timeout() combined via AbortSignal.any() — AbortSignal.any + // keeps its source signals in an internal Set managed by a + // FinalizationRegistry, and under sustained request traffic those entries + // accumulate faster than they get cleaned up, pinning every source signal + // (and its listeners, and anything those listeners close over) until the + // parent signal is GC'd or aborts. Reproduced locally in isolation; shape + // matches the ChainSafe Lodestar production case described in + // nodejs/node#54614. See also nodejs/node#55351 (mechanism confirmed by + // @jasnell, narrow fix in 22.12.0 via #55354) and nodejs/node#57584 + // (circular-dep variant, still open). + const timeoutTimer = setTimeout(() => { + if (!internalController.signal.aborted) internalController.abort(ABORT_REASON_TIMEOUT); + }, timeout); + + const onRequestAbort = () => { + log("request signal aborted"); + if (!internalController.signal.aborted) internalController.abort(ABORT_REASON_REQUEST); + }; - combinedSignal.addEventListener( + internalController.signal.addEventListener( "abort", () => { - log(`combinedSignal aborted: ${combinedSignal.reason}`); + clearTimeout(timeoutTimer); + requestAbortSignal.removeEventListener("abort", onRequestAbort); }, - { once: true, signal: internalController.signal } + { once: true } ); - timeoutSignal.addEventListener( - "abort", - () => { - if (internalController.signal.aborted) return; - log(`timeoutSignal aborted: ${timeoutSignal.reason}`); - internalController.abort("Timeout"); - }, - { once: true, signal: internalController.signal } - ); + // The request could have been aborted during `await handler(context)` above. + // AbortSignal listeners added after the signal is already aborted never fire, + // so invoke cleanup synchronously in that case instead of waiting for `timeout`. + if (requestAbortSignal.aborted) { + onRequestAbort(); + } else { + requestAbortSignal.addEventListener("abort", onRequestAbort, { once: true }); + } if (handlers.beforeStream) { const shouldContinue = await handlers.beforeStream(); if (shouldContinue === false) { log("beforeStream returned false, so we'll exit before creating the stream"); - internalController.abort("Init requested stop"); + internalController.abort(ABORT_REASON_INIT_STOP); return; } } - return eventStream(combinedSignal, function setup(send) { + return eventStream(internalController.signal, function setup(send) { connections.add(id); const safeSend = createSafeSend(send); @@ -147,14 +170,14 @@ export function createSSELoader(options: SSEOptions) { const shouldContinue = await handlers.initStream({ send: safeSend }); if (shouldContinue === false) { log("initStream returned false, so we'll stop the stream"); - internalController.abort("Init requested stop"); + internalController.abort(ABORT_REASON_INIT_STOP); return; } } log("Starting interval"); for await (const _ of setInterval(interval, null, { - signal: combinedSignal, + signal: internalController.signal, })) { log("PING"); @@ -165,13 +188,16 @@ export function createSSELoader(options: SSEOptions) { const shouldContinue = await handlers.iterator({ date, send: safeSend }); if (shouldContinue === false) { log("iterator return false, so we'll stop the stream"); - internalController.abort("Iterator requested stop"); + internalController.abort(ABORT_REASON_ITERATOR_STOP); break; } } catch (error) { log("iterator threw an error, aborting stream"); // Immediately abort to trigger cleanup - internalController.abort(error instanceof Error ? error.message : "Iterator error"); + if (error instanceof Error && error.name !== "AbortError") { + log(`iterator error: ${error.message}`); + } + internalController.abort(ABORT_REASON_ITERATOR_ERROR); // No need to re-throw as we're handling it by aborting return; // Exit the run function immediately } diff --git a/apps/webapp/app/v3/tracer.server.ts b/apps/webapp/app/v3/tracer.server.ts index 2ce5aa275c..1115ab42de 100644 --- a/apps/webapp/app/v3/tracer.server.ts +++ b/apps/webapp/app/v3/tracer.server.ts @@ -302,13 +302,15 @@ function setupTelemetry() { provider.register(); let instrumentations: Instrumentation[] = [ - new HttpInstrumentation(), - new ExpressInstrumentation(), new AwsSdkInstrumentation({ suppressInternalInstrumentation: true, }), ]; + if (!env.DISABLE_HTTP_INSTRUMENTATION) { + instrumentations.unshift(new HttpInstrumentation(), new ExpressInstrumentation()); + } + if (env.INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED === "1") { instrumentations.push(new PrismaInstrumentation()); }