Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-sse-memory-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix memory leak where every aborted SSE connection pinned the full request/response graph on Node 20, caused by `AbortSignal.any()` in `sse.ts` retaining its source signals indefinitely (see nodejs/node#54614, nodejs/node#55351). Also clear the `setTimeout(abort)` timer in `entry.server.tsx` so successful HTML renders don't pin the React tree for 30s per request.
16 changes: 14 additions & 2 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ function handleBotRequest(
) {
return new Promise((resolve, reject) => {
let shellRendered = false;
// Timer handle is cleared in every terminal callback so the abort closure
// (which captures the full React render tree + remixContext) doesn't pin
// memory for 30s per successful request. See react-router PR #14200.
let abortTimer: NodeJS.Timeout | undefined;
const { pipe, abort } = renderToPipeableStream(
<OperatingSystemContextProvider platform={platform}>
<LocaleContextProvider locales={locales}>
Expand All @@ -105,8 +109,10 @@ function handleBotRequest(
);

pipe(body);
clearTimeout(abortTimer);
},
onShellError(error: unknown) {
clearTimeout(abortTimer);
reject(error);
},
onError(error: unknown) {
Expand All @@ -121,7 +127,7 @@ function handleBotRequest(
}
);

setTimeout(abort, ABORT_DELAY);
abortTimer = setTimeout(abort, ABORT_DELAY);
});
}

Expand All @@ -135,6 +141,10 @@ function handleBrowserRequest(
) {
return new Promise((resolve, reject) => {
let shellRendered = false;
// Timer handle is cleared in every terminal callback so the abort closure
// (which captures the full React render tree + remixContext) doesn't pin
// memory for 30s per successful request. See react-router PR #14200.
let abortTimer: NodeJS.Timeout | undefined;
const { pipe, abort } = renderToPipeableStream(
<OperatingSystemContextProvider platform={platform}>
<LocaleContextProvider locales={locales}>
Expand All @@ -157,8 +167,10 @@ function handleBrowserRequest(
);

pipe(body);
clearTimeout(abortTimer);
},
onShellError(error: unknown) {
clearTimeout(abortTimer);
reject(error);
},
onError(error: unknown) {
Expand All @@ -173,7 +185,7 @@ function handleBrowserRequest(
}
);

setTimeout(abort, ABORT_DELAY);
abortTimer = setTimeout(abort, ABORT_DELAY);
});
}

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ const EnvironmentSchema = z
INTERNAL_OTEL_TRACE_SAMPLING_RATE: z.string().default("20"),
INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED: z.string().default("0"),
INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"),
DISABLE_HTTP_INSTRUMENTATION: BoolEnv.default(false),

INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(),
Expand Down
8 changes: 5 additions & 3 deletions apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type PrismaClient, prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { createSSELoader, SendFunction } from "~/utils/sse";
import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
import { throttle } from "~/utils/throttle";
import { tracePubSub } from "~/v3/services/tracePubSub.server";

Expand Down Expand Up @@ -66,8 +66,10 @@ export class RunStreamPresenter {
});
}
}
// Abort the stream on send error
context.controller.abort("Send error");
// Abort the stream on send error. Uses a stackless string sentinel
// from sse.ts — a no-arg abort() would create a DOMException with a
// stack trace, which is unnecessary retention on the signal.reason.
context.controller.abort(ABORT_REASON_SEND_ERROR);
}
},
1000
Expand Down
98 changes: 62 additions & 36 deletions apps/webapp/app/utils/sse.ts
Comment thread
ericallam marked this conversation as resolved.
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,27 @@ type SSEOptions = {
// This is used to track the open connections, for debugging
const connections: Set<string> = new Set();

// Stackless sentinel reasons passed to AbortController#abort. Calling .abort()
// with no argument produces a DOMException that captures a ~500-byte stack
// trace; a string reason is stored verbatim with no stack. The choice of
// reason type does not cause the retention we saw in prod (that was the
// AbortSignal.any composite — see comment near the timeoutTimer below for the
// Node issue refs), but naming the sentinels keeps call sites readable and
// lets future signal.reason consumers branch on the cause.
export const ABORT_REASON_REQUEST = "request_aborted";
export const ABORT_REASON_TIMEOUT = "timeout";
export const ABORT_REASON_SEND_ERROR = "send_error";
export const ABORT_REASON_INIT_STOP = "init_requested_stop";
export const ABORT_REASON_ITERATOR_STOP = "iterator_requested_stop";
export const ABORT_REASON_ITERATOR_ERROR = "iterator_error";

export function createSSELoader(options: SSEOptions) {
const { timeout, interval = 500, debug = false, handler } = options;

return async function loader({ request, params }: LoaderFunctionArgs) {
const id = request.headers.get("x-request-id") || Math.random().toString(36).slice(2, 8);

const internalController = new AbortController();
const timeoutSignal = AbortSignal.timeout(timeout);

const log = (message: string) => {
if (debug)
Expand All @@ -60,16 +73,20 @@ export function createSSELoader(options: SSEOptions) {
if (!internalController.signal.aborted) {
originalSend(event);
}
// If controller is aborted, silently ignore the send attempt
} catch (error) {
if (error instanceof Error) {
if (error.message?.includes("Controller is already closed")) {
// Silently handle controller closed errors
return;
}
log(`Error sending event: ${error.message}`);
}
throw error; // Re-throw other errors
// Abort before rethrowing so timer + request-abort listener are cleaned
// up immediately. Otherwise a send-failure in initStream leaves them
// alive until `timeout` fires.
if (!internalController.signal.aborted) {
internalController.abort(ABORT_REASON_SEND_ERROR);
}
throw error;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
};
};
Expand All @@ -92,51 +109,57 @@ export function createSSELoader(options: SSEOptions) {

const requestAbortSignal = getRequestAbortSignal();

const combinedSignal = AbortSignal.any([
requestAbortSignal,
timeoutSignal,
internalController.signal,
]);

log("Start");

requestAbortSignal.addEventListener(
"abort",
() => {
log(`request signal aborted`);
internalController.abort("Request aborted");
},
{ once: true, signal: internalController.signal }
);
// Single-signal abort chain: everything rolls up into internalController.
// Timeout is a plain setTimeout cleared on abort rather than an
// AbortSignal.timeout() combined via AbortSignal.any() — AbortSignal.any
// keeps its source signals in an internal Set<WeakRef> managed by a
// FinalizationRegistry, and under sustained request traffic those entries
// accumulate faster than they get cleaned up, pinning every source signal
// (and its listeners, and anything those listeners close over) until the
// parent signal is GC'd or aborts. Reproduced locally in isolation; shape
// matches the ChainSafe Lodestar production case described in
// nodejs/node#54614. See also nodejs/node#55351 (mechanism confirmed by
// @jasnell, narrow fix in 22.12.0 via #55354) and nodejs/node#57584
// (circular-dep variant, still open).
const timeoutTimer = setTimeout(() => {
if (!internalController.signal.aborted) internalController.abort(ABORT_REASON_TIMEOUT);
}, timeout);

const onRequestAbort = () => {
log("request signal aborted");
if (!internalController.signal.aborted) internalController.abort(ABORT_REASON_REQUEST);
};

combinedSignal.addEventListener(
internalController.signal.addEventListener(
"abort",
() => {
log(`combinedSignal aborted: ${combinedSignal.reason}`);
clearTimeout(timeoutTimer);
requestAbortSignal.removeEventListener("abort", onRequestAbort);
},
{ once: true, signal: internalController.signal }
{ once: true }
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

timeoutSignal.addEventListener(
"abort",
() => {
if (internalController.signal.aborted) return;
log(`timeoutSignal aborted: ${timeoutSignal.reason}`);
internalController.abort("Timeout");
},
{ once: true, signal: internalController.signal }
);
// The request could have been aborted during `await handler(context)` above.
// AbortSignal listeners added after the signal is already aborted never fire,
// so invoke cleanup synchronously in that case instead of waiting for `timeout`.
if (requestAbortSignal.aborted) {
onRequestAbort();
} else {
requestAbortSignal.addEventListener("abort", onRequestAbort, { once: true });
}

if (handlers.beforeStream) {
const shouldContinue = await handlers.beforeStream();
if (shouldContinue === false) {
log("beforeStream returned false, so we'll exit before creating the stream");
internalController.abort("Init requested stop");
internalController.abort(ABORT_REASON_INIT_STOP);
return;
}
}

return eventStream(combinedSignal, function setup(send) {
return eventStream(internalController.signal, function setup(send) {
connections.add(id);
const safeSend = createSafeSend(send);

Expand All @@ -147,14 +170,14 @@ export function createSSELoader(options: SSEOptions) {
const shouldContinue = await handlers.initStream({ send: safeSend });
if (shouldContinue === false) {
log("initStream returned false, so we'll stop the stream");
internalController.abort("Init requested stop");
internalController.abort(ABORT_REASON_INIT_STOP);
return;
}
}

log("Starting interval");
for await (const _ of setInterval(interval, null, {
signal: combinedSignal,
signal: internalController.signal,
})) {
log("PING");

Expand All @@ -165,13 +188,16 @@ export function createSSELoader(options: SSEOptions) {
const shouldContinue = await handlers.iterator({ date, send: safeSend });
if (shouldContinue === false) {
log("iterator return false, so we'll stop the stream");
internalController.abort("Iterator requested stop");
internalController.abort(ABORT_REASON_ITERATOR_STOP);
break;
}
} catch (error) {
log("iterator threw an error, aborting stream");
// Immediately abort to trigger cleanup
internalController.abort(error instanceof Error ? error.message : "Iterator error");
if (error instanceof Error && error.name !== "AbortError") {
log(`iterator error: ${error.message}`);
}
internalController.abort(ABORT_REASON_ITERATOR_ERROR);
// No need to re-throw as we're handling it by aborting
return; // Exit the run function immediately
}
Expand Down
6 changes: 4 additions & 2 deletions apps/webapp/app/v3/tracer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,15 @@ function setupTelemetry() {
provider.register();

let instrumentations: Instrumentation[] = [
new HttpInstrumentation(),
new ExpressInstrumentation(),
new AwsSdkInstrumentation({
suppressInternalInstrumentation: true,
}),
];

if (!env.DISABLE_HTTP_INSTRUMENTATION) {
instrumentations.unshift(new HttpInstrumentation(), new ExpressInstrumentation());
}

if (env.INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED === "1") {
instrumentations.push(new PrismaInstrumentation());
}
Expand Down
Loading