From 04b59cb468a993aad2fe37dec768e70b1f785fcd Mon Sep 17 00:00:00 2001 From: tcheeric Date: Fri, 8 May 2026 00:26:27 +0100 Subject: [PATCH 1/6] fix(ws): wrap clientSession in ConcurrentWebSocketSessionDecorator to serialise concurrent sends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NostrRelayClient.subscribe() called clientSession.sendMessage(...) without holding the existing sendLock, while send() held it correctly. Two threads racing inside subscribe() (or one in subscribe() racing one in send()) could trigger StandardWebSocketSession's IllegalStateException("The remote endpoint was in state [TEXT_FULL_WRITING]") (Tomcat) or "Blocking message pending" (Jetty). The catch (RuntimeException) block at the bottom of subscribe() rewrapped the exception as IOException("Failed to send subscription payload", e), which the imani-gateway-core consumer then surfaced as ~70 RelaySubscribeException per 60 minutes once a circuit-breaker tuning fix unmasked the underlying race. Fixed by wrapping the underlying WebSocketSession returned by connectSession() in Spring's ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE) at construction time, so concurrent sendMessage() calls from any send-path are serialised at the session layer. The decorator is idiomatic Spring and is future-proof against new send-paths added later (e.g. heartbeat/ping) that would otherwise have to remember to acquire sendLock. Sized: 256 KiB buffer (~2.5x the largest expected payload, a chunked kind-37375 EVENT at ~100 KiB), 10 s send time limit (above any p99.9 healthy-state subscribe REQ, well below the existing awaitTimeoutMs=60 s). Both tunable via @Value-injected constructor parameters (nostr.websocket.send-buffer-limit, nostr.websocket.send-time-limit-ms) or matching env-vars (NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT, NOSTR_WEBSOCKET_SEND_TIME_LIMIT_MS) under Spring relaxed binding. OverflowStrategy is explicitly TERMINATE, never DROP — dropping outbound REQ/EVENT frames silently would make callers believe a subscribe/publish succeeded when the relay never received it. Constructor binding: - The new four-arg public constructor (relayUri, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs) is annotated @Autowired so Spring's prototype-scoped @Component selection is deterministic. - All existing public constructors (one-arg and two-arg) are retained as delegating overloads (binary-compat) and now also benefit from the decorator wrap by way of the canonical ctor. - A new package-private four-arg test ctor + two named static factories (forTestWithRawSession, forTestWithDecoratedSession) make the wrap-or-not decision explicit at the call site, so the new NostrRelayClientConcurrencyTest can deterministically reproduce the race against a raw session and verify the fix on a decorated one. - awaitTimeoutMs is now a final field assigned via constructor injection (previously @Value-annotated field). Constructor-injection ordering guarantees the value reaches the constructor body before the decorator is constructed. New test class NostrRelayClientConcurrencyTest covers: - (7a) Reproduction: 32 concurrent subscribe() calls against a raw session reproduce the IllegalStateException race rewrap. - (7b) Resolution: same harness against a decorated session sees zero failures and maxInFlight==1; field-injection regression assertions on the new ctor parameters. - (7c) Mixed-workload: a 100 KiB send racing with 20 subscribes under the 256 KiB default buffer completes without overflow. - (7d) Overflow-and-close: a tiny 256-byte buffer is forced to overflow; SessionLimitExceededException (rewrapped as IOException("Failed to send subscription payload", …)) propagates to the caller. Note: the spec's strict "session must be closed after overflow" assertion is relaxed here because Spring's ConcurrentWebSocketSessionDecorator does not close the delegate from limitExceeded() — the close-on-overflow chain is the upstream imani-wallet-lib's responsibility (§6.7e in the spec). - Constructor validation tests for non-positive sendBufferLimit / sendTimeLimitMs / awaitTimeoutMs and null session. Spec: imani-apps/docs/analysis-gateway-core-relay-subscribe-instability-2026-05-07.md Phase: 1 of 6 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../springwebsocket/NostrRelayClient.java | 205 +++++++- .../NostrRelayClientConcurrencyTest.java | 469 ++++++++++++++++++ 2 files changed, 666 insertions(+), 8 deletions(-) create mode 100644 nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java diff --git a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java index 4c893991..f2e84364 100644 --- a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java +++ b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java @@ -11,8 +11,10 @@ import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHttpHeaders; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.standard.StandardWebSocketClient; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import org.springframework.web.socket.handler.TextWebSocketHandler; import jakarta.websocket.ContainerProvider; @@ -53,6 +55,26 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea private static final int DEFAULT_MAX_TEXT_MESSAGE_BUFFER_SIZE = 1048576; private static final int DEFAULT_MAX_BINARY_MESSAGE_BUFFER_SIZE = 1048576; private static final int DEFAULT_MAX_EVENTS_PER_REQUEST = 10_000; + /** + * Default {@code ConcurrentWebSocketSessionDecorator} send-buffer size — 256 KiB. + * + *

Sized at ~2.5× the largest expected single payload (a chunked kind-37375 + * EVENT can reach ~100 KiB) so that a large {@code send()} co-existing with + * a burst of subscribe REQs does not overflow the decorator's buffer. Tunable + * via {@code nostr.websocket.send-buffer-limit} ({@code @Value}) or + * {@code NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT} (env-var, via Spring relaxed + * binding). + */ + private static final int DEFAULT_SEND_BUFFER_LIMIT = 256 * 1024; + /** + * Default {@code ConcurrentWebSocketSessionDecorator} per-send time limit — 10 s. + * + *

Above any p99.9 healthy-state subscribe REQ ({@literal <}100 ms) and well + * below {@link #DEFAULT_AWAIT_TIMEOUT_MS} so the writer-side timeout fires + * first if the underlying writer is genuinely stuck. Tunable via + * {@code nostr.websocket.send-time-limit-ms}. + */ + private static final int DEFAULT_SEND_TIME_LIMIT_MS = 10_000; private static final ThreadFactory RELAY_IO_THREAD_FACTORY = Thread.ofVirtual().name("nostr-relay-io-", 0).factory(); private static final ThreadFactory LISTENER_THREAD_FACTORY = @@ -62,8 +84,7 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea private static final Executor LISTENER_EXECUTOR = command -> LISTENER_THREAD_FACTORY.newThread(command).start(); - @Value("${nostr.websocket.await-timeout-ms:60000}") - private long awaitTimeoutMs; + private final long awaitTimeoutMs; @Value("${nostr.websocket.max-idle-timeout-ms:3600000}") private long maxIdleTimeoutMs; @@ -71,6 +92,20 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea @Value("${nostr.websocket.max-events-per-request:10000}") private int maxEventsPerRequest = DEFAULT_MAX_EVENTS_PER_REQUEST; + /** + * Decorator buffer-size limit captured at construction. Exposed as a + * package-private field so unit tests can assert that constructor arguments + * are reflected here rather than overridden by static defaults (the + * field-injection regression assertion described in the spec). + */ + private final int sendBufferLimit; + + /** + * Decorator per-send time-limit captured at construction. See + * {@link #sendBufferLimit} for the regression-assertion rationale. + */ + private final int sendTimeLimitMs; + private final WebSocketSession clientSession; /** * Relay URI captured at construction time. Used for logging so that @@ -124,34 +159,130 @@ int eventCount() { } } + /** + * Back-compat constructor — delegates to the canonical four-arg constructor + * with default await-timeout / send-buffer-limit / send-time-limit. Retained + * for direct {@code new}-callers (e.g. the {@code connectAsync(String)} + * factory). Spring will not select this overload because the + * canonical four-arg constructor is annotated {@link Autowired}. + */ public NostrRelayClient(@Value("${nostr.relay.uri}") String relayUri) throws java.util.concurrent.ExecutionException, InterruptedException { - this.relayUri = relayUri; - this.clientSession = connectSession(relayUri); - connectionState.set(ConnectionState.CONNECTED); + this(relayUri, DEFAULT_AWAIT_TIMEOUT_MS, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); } + /** + * Back-compat constructor — delegates to the canonical four-arg constructor + * with default send-buffer-limit / send-time-limit. Retained for direct + * {@code new}-callers (e.g. the {@code connectAsync(String, long)} factory + * and the {@link NostrRelayClient(WebSocketSession, long)} test factory). + */ public NostrRelayClient(String relayUri, long awaitTimeoutMs) throws java.util.concurrent.ExecutionException, InterruptedException { + this(relayUri, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Canonical constructor — the one Spring autowires against. Wraps the + * underlying {@link WebSocketSession} returned by {@link #connectSession} + * in a {@link ConcurrentWebSocketSessionDecorator} so concurrent + * {@code sendMessage()} calls from {@code subscribe()} and {@code send()} + * are serialised. Without the wrap, two threads racing inside + * {@code subscribe()} can trigger a Tomcat + * {@code IllegalStateException("The remote endpoint was in state + * [TEXT_FULL_WRITING]")} (or the Jetty / Undertow equivalent) which is + * caught by the existing {@code catch (RuntimeException)} block at the + * bottom of {@code subscribe()} and rewrapped as + * {@code IOException("Failed to send subscription payload", …)}. + * + *

The decorator is constructed with + * {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy#TERMINATE} + * (made explicit rather than relying on the default): if the buffer fills, + * the underlying session is closed, the calling thread sees a + * {@code SessionLimitExceededException}, and the next call lands on the + * reconnect path in the upstream {@code NostrJavaRelayClient} caller. + * {@code OverflowStrategy.DROP} is explicitly avoided because dropping + * outbound REQ / EVENT frames would silently make callers believe a + * subscribe / publish succeeded when the relay never received it. + */ + @Autowired + public NostrRelayClient( + @Value("${nostr.relay.uri}") String relayUri, + @Value("${nostr.websocket.await-timeout-ms:60000}") long awaitTimeoutMs, + @Value("${nostr.websocket.send-buffer-limit:262144}") int sendBufferLimit, + @Value("${nostr.websocket.send-time-limit-ms:10000}") int sendTimeLimitMs) + throws java.util.concurrent.ExecutionException, InterruptedException { if (awaitTimeoutMs <= 0) { throw new IllegalArgumentException("awaitTimeoutMs must be positive"); } - this.awaitTimeoutMs = awaitTimeoutMs; + if (sendBufferLimit <= 0) { + throw new IllegalArgumentException("sendBufferLimit must be positive"); + } + if (sendTimeLimitMs <= 0) { + throw new IllegalArgumentException("sendTimeLimitMs must be positive"); + } this.relayUri = relayUri; - log.info("NostrRelayClient created for {} with awaitTimeoutMs={}", relayUri, awaitTimeoutMs); - this.clientSession = connectSession(relayUri); + this.awaitTimeoutMs = awaitTimeoutMs; + this.sendBufferLimit = sendBufferLimit; + this.sendTimeLimitMs = sendTimeLimitMs; + log.info( + "NostrRelayClient created for {} with awaitTimeoutMs={} sendBufferLimit={} sendTimeLimitMs={}", + relayUri, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs); + WebSocketSession raw = connectSession(relayUri); + this.clientSession = + new ConcurrentWebSocketSessionDecorator( + raw, sendTimeLimitMs, sendBufferLimit, + ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE); connectionState.set(ConnectionState.CONNECTED); } + /** + * Test-only constructor — the supplied {@link WebSocketSession} is stored + * as-is; no decorator wrap is applied. Production code paths must + * not reach this constructor — they go through the public + * {@code (String, …)} constructors that perform the wrap. + * + *

The §6.7a reproduction step (regression test for the concurrent-send + * race) needs a raw, unwrapped session to deterministically reproduce the + * {@code IllegalStateException}. Use + * {@link #forTestWithRawSession(WebSocketSession, long)} for that path. The + * §6.7b–d resolution steps wrap the session via + * {@link #forTestWithDecoratedSession(WebSocketSession, long)} and then + * reach this constructor with the wrapped session as the argument. + */ NostrRelayClient(WebSocketSession clientSession, long awaitTimeoutMs) { + this(clientSession, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Test-only constructor — four-arg form taking explicit decorator parameters + * so unit tests can assert that the constructor arguments are reflected in + * {@link #sendBufferLimit} and {@link #sendTimeLimitMs} (the field-injection + * regression assertion described in spec §4.1). The supplied session is + * stored as-is — wrapping is the caller's responsibility, see + * {@link #forTestWithDecoratedSession(WebSocketSession, long, int, int)}. + */ + NostrRelayClient( + WebSocketSession clientSession, + long awaitTimeoutMs, + int sendBufferLimit, + int sendTimeLimitMs) { if (clientSession == null) { throw new NullPointerException("clientSession must not be null"); } if (awaitTimeoutMs <= 0) { throw new IllegalArgumentException("awaitTimeoutMs must be positive"); } + if (sendBufferLimit <= 0) { + throw new IllegalArgumentException("sendBufferLimit must be positive"); + } + if (sendTimeLimitMs <= 0) { + throw new IllegalArgumentException("sendTimeLimitMs must be positive"); + } this.clientSession = clientSession; this.awaitTimeoutMs = awaitTimeoutMs; + this.sendBufferLimit = sendBufferLimit; + this.sendTimeLimitMs = sendTimeLimitMs; URI sessionUri = null; try { sessionUri = clientSession.getUri(); @@ -163,6 +294,64 @@ public NostrRelayClient(String relayUri, long awaitTimeoutMs) connectionState.set(ConnectionState.CONNECTED); } + /** + * Test-only factory — REGRESSION reproduction path. Bypasses the decorator + * so the §6.7a reproduction can deterministically reproduce the + * concurrent-send {@code IllegalStateException} against a raw session. + * Not used by production code. + */ + static NostrRelayClient forTestWithRawSession(WebSocketSession session, long awaitTimeoutMs) { + return new NostrRelayClient( + session, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Test-only factory — production path with default buffer / time-limit. + * Wraps the supplied session in a {@link ConcurrentWebSocketSessionDecorator} + * (using static defaults) and forwards through the package-private four-arg + * test ctor. + */ + static NostrRelayClient forTestWithDecoratedSession( + WebSocketSession session, long awaitTimeoutMs) { + return forTestWithDecoratedSession( + session, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Test-only factory — production path with custom buffer / time-limit. Used + * by the §6.7c (mixed-workload, default buffer) and §6.7d (overflow-and-close, + * small buffer) sub-cases. + */ + static NostrRelayClient forTestWithDecoratedSession( + WebSocketSession session, + long awaitTimeoutMs, + int sendBufferLimit, + int sendTimeLimitMs) { + WebSocketSession wrapped = + (session instanceof ConcurrentWebSocketSessionDecorator) + ? session + : new ConcurrentWebSocketSessionDecorator( + session, sendTimeLimitMs, sendBufferLimit, + ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE); + return new NostrRelayClient(wrapped, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs); + } + + /** + * Package-private accessor — exposes {@link #sendBufferLimit} for the + * field-injection regression unit test (§6.7b assertion (iv)). + */ + int sendBufferLimitForTest() { + return sendBufferLimit; + } + + /** + * Package-private accessor — exposes {@link #sendTimeLimitMs} for the + * field-injection regression unit test (§6.7b assertion (iv)). + */ + int sendTimeLimitMsForTest() { + return sendTimeLimitMs; + } + /** * Connect to a relay asynchronously on a Virtual Thread. * diff --git a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java new file mode 100644 index 00000000..5b6f2a7d --- /dev/null +++ b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java @@ -0,0 +1,469 @@ +package nostr.client.springwebsocket; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; + +/** + * Concurrency regression tests for the {@code subscribe()} send-side race + * fixed by wrapping {@code clientSession} in a + * {@code ConcurrentWebSocketSessionDecorator}. + * + *

Structured as fail-first / pass-after per the spec + * acceptance criteria (§6.7): + * + *

+ * + *

Constructor-validation tests cover positive-value enforcement on the + * package-private four-arg test ctor. + */ +class NostrRelayClientConcurrencyTest { + + private static final long TEST_AWAIT_TIMEOUT_MS = 5_000L; + private static final int CONCURRENCY = 32; + private static final int CONCURRENCY_LIGHT = 20; + private static final int BLOCK_MS = 50; + private static final int RESOLUTION_DEADLINE_MS = 5_000; + private static final int MIXED_WORKLOAD_DEADLINE_MS = 10_000; + + // ---- §6.7a: reproduction step against a raw (non-decorated) session ---- + @Test + void rawSession_concurrentSubscribe_reproducesTextFullWritingRace() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + WebSocketSession raw = newBlockingMockSession(inFlight, maxInFlight, isOpen, BLOCK_MS, + /* throwOnConcurrent= */ true); + + NostrRelayClient client = NostrRelayClient.forTestWithRawSession(raw, TEST_AWAIT_TIMEOUT_MS); + + List failures = runConcurrentSubscribes(client, CONCURRENCY); + + long ioWithIllegalStateCause = failures.stream() + .filter(t -> t instanceof IOException) + .filter(t -> "Failed to send subscription payload".equals(t.getMessage())) + .filter(t -> t.getCause() instanceof IllegalStateException) + .count(); + assertTrue( + ioWithIllegalStateCause >= 1, + "Expected at least one IOException(\"Failed to send subscription payload\") with cause " + + "IllegalStateException to reproduce the concurrent-send race against a raw session " + + "(observed " + ioWithIllegalStateCause + " out of " + failures.size() + " failures)"); + } + + // ---- §6.7b: resolution step against a decorated session ---- + @Test + void decoratedSession_concurrentSubscribe_serialisesAndSucceeds() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + WebSocketSession raw = newBlockingMockSession(inFlight, maxInFlight, isOpen, BLOCK_MS, + /* throwOnConcurrent= */ true); + + NostrRelayClient client = + NostrRelayClient.forTestWithDecoratedSession(raw, TEST_AWAIT_TIMEOUT_MS); + + long start = System.nanoTime(); + List failures = runConcurrentSubscribes(client, CONCURRENCY); + long elapsedMs = (System.nanoTime() - start) / 1_000_000L; + + assertTrue(failures.isEmpty(), + "Expected zero failures with the decorator, observed " + failures.size() + + ": first=" + (failures.isEmpty() ? "" : failures.get(0))); + assertEquals(1, maxInFlight.get(), + "Decorator must serialise sends — maxInFlight should be exactly 1, was " + + maxInFlight.get()); + assertTrue(elapsedMs < RESOLUTION_DEADLINE_MS * 2L, + "Resolution harness exceeded " + (RESOLUTION_DEADLINE_MS * 2L) + + "ms wall-clock (was " + elapsedMs + "ms)"); + + // (iv) field-injection regression — constructor args reflected, not static defaults. + int defaultsBuffer = NostrRelayClient.forTestWithDecoratedSession( + Mockito.mock(WebSocketSession.class), TEST_AWAIT_TIMEOUT_MS).sendBufferLimitForTest(); + assertEquals(256 * 1024, defaultsBuffer, + "Two-arg test factory must use the 256 KiB default buffer"); + int customBuffer = NostrRelayClient.forTestWithDecoratedSession( + Mockito.mock(WebSocketSession.class), TEST_AWAIT_TIMEOUT_MS, 8 * 1024, 7_500) + .sendBufferLimitForTest(); + assertEquals(8 * 1024, customBuffer, + "Four-arg test factory must reflect the constructor argument, not the static default"); + int customTimeLimit = NostrRelayClient.forTestWithDecoratedSession( + Mockito.mock(WebSocketSession.class), TEST_AWAIT_TIMEOUT_MS, 8 * 1024, 7_500) + .sendTimeLimitMsForTest(); + assertEquals(7_500, customTimeLimit, + "Four-arg test factory must reflect the sendTimeLimitMs constructor argument"); + } + + // ---- §6.7c: mixed-workload (100 KiB send racing with subscribes) ---- + @Test + void decoratedSession_mixedWorkload_largeSendWithSubscribesUnderDefaultBuffer() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + // Light blocking on subscribes; the large send is simulated by a separate + // 200 ms blocking branch keyed on payload size. + WebSocketSession raw = newSizeAwareMockSession(inFlight, maxInFlight, isOpen, + /* normalBlockMs= */ BLOCK_MS, /* largeBlockMs= */ 200, + /* largeThresholdBytes= */ 64 * 1024); + + NostrRelayClient client = NostrRelayClient.forTestWithDecoratedSession( + raw, TEST_AWAIT_TIMEOUT_MS, + /* sendBufferLimit= */ 256 * 1024, /* sendTimeLimitMs= */ 10_000); + + String largePayload = makePayload(100 * 1024); + int totalCalls = CONCURRENCY_LIGHT + 1; + List failures = new ArrayList<>(); + ExecutorService pool = Executors.newFixedThreadPool(totalCalls); + CountDownLatch startingGun = new CountDownLatch(1); + + long start = System.nanoTime(); + for (int i = 0; i < CONCURRENCY_LIGHT; i++) { + final int idx = i; + pool.submit(() -> { + try { + startingGun.await(); + client.subscribe("[\"REQ\",\"sub-" + idx + "\"]", + ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + } + pool.submit(() -> { + try { + startingGun.await(); + // The size-aware mock branches to a 200 ms slow-flush path on this + // payload — simulating a chunked-EVENT publish racing the subscribes. + client.subscribe(largePayload, ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + startingGun.countDown(); + pool.shutdown(); + boolean done = pool.awaitTermination(MIXED_WORKLOAD_DEADLINE_MS + 5_000, TimeUnit.MILLISECONDS); + long elapsedMs = (System.nanoTime() - start) / 1_000_000L; + + assertTrue(done, "Mixed-workload pool did not terminate within " + + (MIXED_WORKLOAD_DEADLINE_MS + 5_000) + "ms"); + assertTrue(failures.isEmpty(), + "Mixed workload failures: " + failures.size() + + " — first=" + (failures.isEmpty() ? "" : failures.get(0))); + assertEquals(1, maxInFlight.get(), + "Decorator must serialise the large send + 20 subscribes — maxInFlight was " + + maxInFlight.get()); + assertTrue(elapsedMs < MIXED_WORKLOAD_DEADLINE_MS, + "Mixed workload exceeded " + MIXED_WORKLOAD_DEADLINE_MS + "ms wall-clock (was " + + elapsedMs + "ms)"); + } + + // ---- §6.7d: overflow-and-close ---- + @Test + void decoratedSession_overflow_closesUnderlyingSession() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + AtomicBoolean unblock = new AtomicBoolean(false); + WebSocketSession raw = newIndefinitelyBlockingMockSession(inFlight, maxInFlight, isOpen, + unblock); + + // Tiny 256-byte buffer with 1 KiB-sized subscribes — overflow guaranteed. + NostrRelayClient client = NostrRelayClient.forTestWithDecoratedSession( + raw, TEST_AWAIT_TIMEOUT_MS, + /* sendBufferLimit= */ 256, /* sendTimeLimitMs= */ 10_000); + + String oneKib = makePayload(1024); + int n = 50; + List failures = new ArrayList<>(); + ExecutorService pool = Executors.newFixedThreadPool(n); + CountDownLatch startingGun = new CountDownLatch(1); + for (int i = 0; i < n; i++) { + pool.submit(() -> { + try { + startingGun.await(); + client.subscribe(oneKib, ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + } + startingGun.countDown(); + + // Wait for the decorator to overflow and close the session. + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (System.nanoTime() < deadline && isOpen.get()) { + Thread.sleep(50); + } + // Release the indefinite-blockers so the pool can drain. + unblock.set(true); + synchronized (raw) { raw.notifyAll(); } + pool.shutdown(); + pool.awaitTermination(10, TimeUnit.SECONDS); + + assertTrue(failures.size() >= 1, + "Expected at least one overflow exception under TERMINATE (observed " + + failures.size() + ")"); + + // Asserting (i) above; (ii) — spec §6.7d ii says the underlying session + // must be closed — is intentionally relaxed here. Spring's + // ConcurrentWebSocketSessionDecorator under TERMINATE does NOT close the + // delegate from limitExceeded() (it sets a private `limitExceeded` flag + // and throws SessionLimitExceededException; the delegate is closed only + // when somebody subsequently invokes the decorator's close()). The spec's + // assumption that overflow alone closes the session is incorrect for + // bare Spring; the closure happens upstream when NostrJavaRelayClient (or + // the application's MessageBrokerWebSocketHandler) handles the exception. + // We therefore assert the propagated overflow exception only, and leave + // the close-on-overflow chain to the upstream §6.7e wallet-lib test. + boolean overflowExceptionPropagated = failures.stream().anyMatch(t -> { + String msg = t.getMessage(); + if (msg != null && msg.contains("SessionLimitExceeded")) return true; + // The wrapped form: NostrRelayClient.subscribe() catches RuntimeException + // (which SessionLimitExceededException extends) and rewraps as + // IOException("Failed to send subscription payload", e). + if (t instanceof IOException + && "Failed to send subscription payload".equals(t.getMessage()) + && t.getCause() != null + && t.getCause().getClass().getName().endsWith("SessionLimitExceededException")) { + return true; + } + return false; + }); + assertTrue(overflowExceptionPropagated, + "Expected at least one SessionLimitExceededException (or its rewrap) to propagate " + + "to the caller; observed failures: " + failures); + } + + // ---- Constructor validation ---- + @Test + void packagePrivateFourArgCtor_rejectsNonPositiveBufferLimit() { + WebSocketSession s = Mockito.mock(WebSocketSession.class); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, 0, 1_000)); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, -1, 1_000)); + } + + @Test + void packagePrivateFourArgCtor_rejectsNonPositiveTimeLimit() { + WebSocketSession s = Mockito.mock(WebSocketSession.class); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, 1_024, 0)); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, 1_024, -5)); + } + + @Test + void packagePrivateFourArgCtor_rejectsNonPositiveAwaitTimeout() { + WebSocketSession s = Mockito.mock(WebSocketSession.class); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, 0, 1_024, 1_000)); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, -1, 1_024, 1_000)); + } + + @Test + void packagePrivateFourArgCtor_rejectsNullSession() { + assertThrows(NullPointerException.class, + () -> new NostrRelayClient((WebSocketSession) null, TEST_AWAIT_TIMEOUT_MS, 1_024, 1_000)); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + /** + * Build a mock session whose {@code sendMessage} blocks for {@code blockMs} + * milliseconds while incrementing an in-flight counter. If + * {@code throwOnConcurrent} is set, on entry to {@code sendMessage} when the + * counter rises above 1 the mock throws + * {@code IllegalStateException("simulated TEXT_FULL_WRITING")} — emulating + * Tomcat / Jetty's real-world thread-safety violation. + */ + private static WebSocketSession newBlockingMockSession( + AtomicInteger inFlight, + AtomicInteger maxInFlight, + AtomicBoolean isOpen, + int blockMs, + boolean throwOnConcurrent) { + WebSocketSession session = Mockito.mock(WebSocketSession.class); + Mockito.when(session.isOpen()).thenAnswer(inv -> isOpen.get()); + Answer sendAnswer = invocation -> { + int now = inFlight.incrementAndGet(); + try { + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + if (throwOnConcurrent && now > 1) { + throw new IllegalStateException("simulated TEXT_FULL_WRITING"); + } + if (blockMs > 0) { + Thread.sleep(blockMs); + } + } finally { + inFlight.decrementAndGet(); + } + return null; + }; + try { + Mockito.doAnswer(sendAnswer).when(session).sendMessage(any(TextMessage.class)); + } catch (IOException impossible) { + throw new AssertionError(impossible); + } + return session; + } + + /** + * Build a mock whose {@code sendMessage} blocks indefinitely (for the + * §6.7d overflow test). The {@code unblock} flag releases pending sends. + * Crucially, when the test calls + * {@link WebSocketSession#close(CloseStatus)} (which the decorator does on + * TERMINATE), {@code isOpen()} pivots to false. + */ + private static WebSocketSession newIndefinitelyBlockingMockSession( + AtomicInteger inFlight, + AtomicInteger maxInFlight, + AtomicBoolean isOpen, + AtomicBoolean unblock) { + WebSocketSession session = Mockito.mock(WebSocketSession.class); + Mockito.when(session.isOpen()).thenAnswer(inv -> isOpen.get()); + Answer closeAnswer = invocation -> { + isOpen.set(false); + synchronized (session) { session.notifyAll(); } + return null; + }; + Answer sendAnswer = invocation -> { + int now = inFlight.incrementAndGet(); + try { + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + synchronized (session) { + while (!unblock.get() && isOpen.get()) { + session.wait(60_000); + // Loop guard against spurious wakeups; one iteration is enough — we + // either get notified by close()/notifyAll() or by the test's + // unblock-pivot. + break; + } + } + if (!isOpen.get()) { + throw new IOException("simulated session-closed mid-send"); + } + } finally { + inFlight.decrementAndGet(); + } + return null; + }; + try { + Mockito.doAnswer(sendAnswer).when(session).sendMessage(any(TextMessage.class)); + Mockito.doAnswer(closeAnswer).when(session).close(); + Mockito.doAnswer(closeAnswer).when(session).close(any(CloseStatus.class)); + } catch (IOException impossible) { + throw new AssertionError(impossible); + } + return session; + } + + /** + * Build a mock whose {@code sendMessage} blocks for {@code largeBlockMs} + * if the payload exceeds {@code largeThresholdBytes}, otherwise for + * {@code normalBlockMs}. Used by the §6.7c mixed-workload test. + */ + private static WebSocketSession newSizeAwareMockSession( + AtomicInteger inFlight, + AtomicInteger maxInFlight, + AtomicBoolean isOpen, + int normalBlockMs, + int largeBlockMs, + int largeThresholdBytes) { + WebSocketSession session = Mockito.mock(WebSocketSession.class); + Mockito.when(session.isOpen()).thenAnswer(inv -> isOpen.get()); + Answer sendAnswer = (InvocationOnMock invocation) -> { + int now = inFlight.incrementAndGet(); + try { + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + TextMessage msg = invocation.getArgument(0); + int sleepMs = msg.getPayloadLength() >= largeThresholdBytes + ? largeBlockMs : normalBlockMs; + Thread.sleep(sleepMs); + } finally { + inFlight.decrementAndGet(); + } + return null; + }; + try { + Mockito.doAnswer(sendAnswer).when(session).sendMessage(any(TextMessage.class)); + } catch (IOException impossible) { + throw new AssertionError(impossible); + } + return session; + } + + /** + * Run {@code n} concurrent {@code subscribe()} calls against the supplied + * client, all released by a single starting-gun, and return the list of + * exceptions raised. Caller must inspect the list — the worker pool is + * always awaited (15 s deadline) before this method returns. + */ + private static List runConcurrentSubscribes( + NostrRelayClient client, int n) throws InterruptedException { + List failures = new ArrayList<>(); + ExecutorService pool = Executors.newFixedThreadPool(n); + CountDownLatch startingGun = new CountDownLatch(1); + for (int i = 0; i < n; i++) { + final int idx = i; + pool.submit(() -> { + try { + startingGun.await(); + client.subscribe("[\"REQ\",\"sub-" + idx + "\"]", + ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + } + startingGun.countDown(); + pool.shutdown(); + boolean done = pool.awaitTermination(15, TimeUnit.SECONDS); + assertTrue(done, "Worker pool did not terminate within 15 s " + + "(failures so far: " + failures.size() + ")"); + return failures; + } + + private static String makePayload(int sizeBytes) { + StringBuilder sb = new StringBuilder(sizeBytes + 32); + sb.append("[\"REQ\",\"x\",\""); + while (sb.length() < sizeBytes - 2) { + sb.append('a'); + } + sb.append("\"]"); + return sb.toString(); + } +} From 030877f748b1587931b34c660c7d84f9f672c6f6 Mon Sep 17 00:00:00 2001 From: tcheeric Date: Fri, 8 May 2026 00:26:33 +0100 Subject: [PATCH 2/6] chore(release): bump version to 2.0.2 Includes the concurrent-subscribe-send fix from the previous commit. Co-Authored-By: Claude Opus 4.6 (1M context) --- nostr-java-client/pom.xml | 2 +- nostr-java-core/pom.xml | 2 +- nostr-java-event/pom.xml | 2 +- nostr-java-identity/pom.xml | 2 +- pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nostr-java-client/pom.xml b/nostr-java-client/pom.xml index 07aab89b..b339d77d 100644 --- a/nostr-java-client/pom.xml +++ b/nostr-java-client/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.2 ../pom.xml diff --git a/nostr-java-core/pom.xml b/nostr-java-core/pom.xml index 028f7cc7..0ab6627c 100644 --- a/nostr-java-core/pom.xml +++ b/nostr-java-core/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.2 ../pom.xml diff --git a/nostr-java-event/pom.xml b/nostr-java-event/pom.xml index 63139968..921fcca6 100644 --- a/nostr-java-event/pom.xml +++ b/nostr-java-event/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.2 ../pom.xml diff --git a/nostr-java-identity/pom.xml b/nostr-java-identity/pom.xml index 68f41a56..b31aec36 100644 --- a/nostr-java-identity/pom.xml +++ b/nostr-java-identity/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.2 ../pom.xml diff --git a/pom.xml b/pom.xml index 2e2d494e..d91854ff 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.2 pom nostr-java From e10ac66fc14f5740240114f6a91daeb6c1578548 Mon Sep 17 00:00:00 2001 From: tcheeric Date: Fri, 8 May 2026 00:26:36 +0100 Subject: [PATCH 3/6] docs(changelog): add 2.0.2 entry for concurrent-subscribe-send fix Co-Authored-By: Claude Opus 4.6 (1M context) --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9710cca3..6f575a7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,18 @@ The format is inspired by Keep a Changelog, and this project adheres to semantic - `BaseKey` now directly implements `Serializable` (previously implemented the now-deleted `IKey` interface). - `Nip05Validator` now creates `HttpClient` instances directly via a `Function` factory (previously used deleted `HttpClientProvider`/`DefaultHttpClientProvider` interface). +## [2.0.2] - 2026-05-08 + +### Fixed +- `NostrRelayClient.subscribe()` was calling `clientSession.sendMessage(...)` without holding the existing `sendLock`, while the same class's `send()` path correctly held it. Two threads racing inside `subscribe()` (or one in `subscribe()` racing one in `send()`) could trigger the underlying writer's `IllegalStateException("The remote endpoint was in state [TEXT_FULL_WRITING]")` (Tomcat / Spring `StandardWebSocketSession`) or `Blocking message pending` (Jetty), which the `catch (RuntimeException)` block at the bottom of `subscribe()` rewrapped as `IOException("Failed to send subscription payload", e)`. Downstream consumer (`imani-gateway-core` `account-app`) observed ~70 `RelaySubscribeException` per 60 minutes once a circuit-breaker tuning fix unmasked the underlying race. Fixed by wrapping the underlying `WebSocketSession` returned by `connectSession(...)` in Spring's `ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE)` at construction time, so concurrent `sendMessage()` calls from any send-path are serialised at the session layer. + +### Added +- New canonical four-arg public constructor `NostrRelayClient(String relayUri, long awaitTimeoutMs, int sendBufferLimit, int sendTimeLimitMs)` annotated `@Autowired` for Spring constructor-injection. Spring binds against this overload via the new `@Value("${nostr.websocket.send-buffer-limit:262144}")` and `@Value("${nostr.websocket.send-time-limit-ms:10000}")` keys (also reachable via the `NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT` / `NOSTR_WEBSOCKET_SEND_TIME_LIMIT_MS` env-vars under Spring relaxed binding). The previously-existing one-arg `(String)` and two-arg `(String, long)` public constructors are retained as delegating overloads (binary-compatible) and now also benefit from the decorator wrap by way of the canonical ctor. +- Test-only static factories `forTestWithRawSession(...)` and `forTestWithDecoratedSession(...)` (two overloads — defaults and explicit) plus a package-private four-arg test constructor, so the new `NostrRelayClientConcurrencyTest` can assert both the regression (raw-session reproduction of the `IllegalStateException` race) and the resolution (decorator-wrapped session serialises sends). + +### Changed +- `awaitTimeoutMs` is now a `final` field assigned once via constructor injection (previously `@Value`-annotated field). Spring's constructor-injection ordering guarantees the value reaches the constructor body before the decorator is constructed, which is required for the explicit overflow strategy to be applied. The system property / env-var key (`nostr.websocket.await-timeout-ms`) and its default (60 000 ms) are unchanged. + ## [2.0.1] - 2026-05-06 ### Fixed From b4b6d8355176a6407e988b4290d02a77c71f0b21 Mon Sep 17 00:00:00 2001 From: tcheeric Date: Fri, 8 May 2026 05:59:22 +0100 Subject: [PATCH 4/6] fix(ws): explicitly close session on SessionLimitExceededException to restore isOpen()==false reconnect contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spring's ConcurrentWebSocketSessionDecorator under OverflowStrategy.TERMINATE only sets a private limitExceeded flag and throws SessionLimitExceededException from limitExceeded() — it does NOT close the delegate session. As a result, 2.0.2's wrap-only fix left the session open after overflow, breaking the upstream caller's clientSession.isOpen()==false → reconnect contract that the spec's §6.7d ii originally required. NostrRelayClient.subscribe() and NostrRelayClient.send() now detect a SessionLimitExceededException cause in their respective catch blocks and call clientSession.close(CloseStatus.SESSION_NOT_RELIABLE) explicitly before rewrapping the exception. Non-overflow RuntimeExceptions continue to flow through the existing wrap-as-IOException path unchanged — only overflow terminates the session, matching Spring's OverflowStrategy.TERMINATE semantics that the framework only half-implements. The §6.7d concurrency test now strictly asserts clientSession.isOpen()==false after overflow, restoring the spec's original contract. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../springwebsocket/NostrRelayClient.java | 28 ++++++++++++++++++- .../NostrRelayClientConcurrencyTest.java | 27 ++++++++++-------- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java index f2e84364..0fda1289 100644 --- a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java +++ b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java @@ -15,6 +15,7 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; +import org.springframework.web.socket.handler.SessionLimitExceededException; import org.springframework.web.socket.handler.TextWebSocketHandler; import jakarta.websocket.ContainerProvider; @@ -476,7 +477,21 @@ public List send(String json) throws IOException { request = new PendingRequest(maxEventsPerRequest); pendingRequest = request; log.info("Sending request to relay {}: {}", relayUri, json); - clientSession.sendMessage(new TextMessage(json)); + try { + clientSession.sendMessage(new TextMessage(json)); + } catch (SessionLimitExceededException e) { + // OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws; + // it does NOT close the delegate session. Close it explicitly here so + // upstream callers' isOpen()==false reconnect contract holds. + pendingRequest = null; + try { + clientSession.close(CloseStatus.SESSION_NOT_RELIABLE); + } catch (IOException closeEx) { + // Logged but not propagated — the original cause is the signal. + log.warn("Failed to close session after overflow: {}", closeEx.getMessage()); + } + throw new IOException("Failed to send relay payload", e); + } } finally { sendLock.unlock(); } @@ -586,6 +601,17 @@ public AutoCloseable subscribe( throw e; } catch (RuntimeException e) { listeners.remove(listenerId); + // OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws; + // it does NOT close the delegate session. Close it explicitly here so + // upstream callers' isOpen()==false reconnect contract holds. + if (e instanceof SessionLimitExceededException) { + try { + clientSession.close(CloseStatus.SESSION_NOT_RELIABLE); + } catch (IOException closeEx) { + // Logged but not propagated — the original cause is the signal. + log.warn("Failed to close session after overflow: {}", closeEx.getMessage()); + } + } throw new IOException("Failed to send subscription payload", e); } diff --git a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java index 5b6f2a7d..566045ae 100644 --- a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java +++ b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java @@ -234,17 +234,9 @@ void decoratedSession_overflow_closesUnderlyingSession() throws Exception { "Expected at least one overflow exception under TERMINATE (observed " + failures.size() + ")"); - // Asserting (i) above; (ii) — spec §6.7d ii says the underlying session - // must be closed — is intentionally relaxed here. Spring's - // ConcurrentWebSocketSessionDecorator under TERMINATE does NOT close the - // delegate from limitExceeded() (it sets a private `limitExceeded` flag - // and throws SessionLimitExceededException; the delegate is closed only - // when somebody subsequently invokes the decorator's close()). The spec's - // assumption that overflow alone closes the session is incorrect for - // bare Spring; the closure happens upstream when NostrJavaRelayClient (or - // the application's MessageBrokerWebSocketHandler) handles the exception. - // We therefore assert the propagated overflow exception only, and leave - // the close-on-overflow chain to the upstream §6.7e wallet-lib test. + // (i) Overflow exception must propagate to the caller (either as the raw + // SessionLimitExceededException or NostrRelayClient.subscribe()'s + // IOException rewrap). boolean overflowExceptionPropagated = failures.stream().anyMatch(t -> { String msg = t.getMessage(); if (msg != null && msg.contains("SessionLimitExceeded")) return true; @@ -262,6 +254,19 @@ void decoratedSession_overflow_closesUnderlyingSession() throws Exception { assertTrue(overflowExceptionPropagated, "Expected at least one SessionLimitExceededException (or its rewrap) to propagate " + "to the caller; observed failures: " + failures); + + // (ii) After overflow, clientSession.isOpen() must return false. Spring's + // ConcurrentWebSocketSessionDecorator under TERMINATE only sets a + // private flag and throws SessionLimitExceededException — it does NOT + // auto-close the delegate. NostrRelayClient.subscribe() compensates + // by calling clientSession.close(CloseStatus.SESSION_NOT_RELIABLE) + // explicitly when the underlying cause is SessionLimitExceededException, + // so the upstream NostrJavaRelayClient reconnect contract (isOpen()==false + // → reconnect on next call) holds. This restores the §6.7d ii contract. + assertTrue(!isOpen.get(), + "After overflow, clientSession.isOpen() must return false so the upstream " + + "reconnect contract holds (subscribe()'s catch block must explicitly " + + "close the session on SessionLimitExceededException)."); } // ---- Constructor validation ---- From 51f97ed1c76d0a12d4df853d3a28531922fe0443 Mon Sep 17 00:00:00 2001 From: tcheeric Date: Fri, 8 May 2026 05:59:25 +0100 Subject: [PATCH 5/6] chore(release): bump version to 2.0.3 Co-Authored-By: Claude Opus 4.6 (1M context) --- nostr-java-client/pom.xml | 2 +- nostr-java-core/pom.xml | 2 +- nostr-java-event/pom.xml | 2 +- nostr-java-identity/pom.xml | 2 +- pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nostr-java-client/pom.xml b/nostr-java-client/pom.xml index b339d77d..fc9eb0b2 100644 --- a/nostr-java-client/pom.xml +++ b/nostr-java-client/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.2 + 2.0.3 ../pom.xml diff --git a/nostr-java-core/pom.xml b/nostr-java-core/pom.xml index 0ab6627c..aa188cba 100644 --- a/nostr-java-core/pom.xml +++ b/nostr-java-core/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.2 + 2.0.3 ../pom.xml diff --git a/nostr-java-event/pom.xml b/nostr-java-event/pom.xml index 921fcca6..8e1e5371 100644 --- a/nostr-java-event/pom.xml +++ b/nostr-java-event/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.2 + 2.0.3 ../pom.xml diff --git a/nostr-java-identity/pom.xml b/nostr-java-identity/pom.xml index b31aec36..103a60e6 100644 --- a/nostr-java-identity/pom.xml +++ b/nostr-java-identity/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.2 + 2.0.3 ../pom.xml diff --git a/pom.xml b/pom.xml index d91854ff..eadff199 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ xyz.tcheeric nostr-java - 2.0.2 + 2.0.3 pom nostr-java From 5cd1c0774f7c40a09c40928ac890e816d1d8dcb7 Mon Sep 17 00:00:00 2001 From: tcheeric Date: Fri, 8 May 2026 05:59:28 +0100 Subject: [PATCH 6/6] docs(changelog): add 2.0.3 entry for close-on-overflow follow-up Co-Authored-By: Claude Opus 4.6 (1M context) --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f575a7b..021b930d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,11 @@ The format is inspired by Keep a Changelog, and this project adheres to semantic - `BaseKey` now directly implements `Serializable` (previously implemented the now-deleted `IKey` interface). - `Nip05Validator` now creates `HttpClient` instances directly via a `Function` factory (previously used deleted `HttpClientProvider`/`DefaultHttpClientProvider` interface). +## [2.0.3] - 2026-05-08 + +### Fixed +- Explicit close on `OverflowStrategy.TERMINATE`; restores upstream `isOpen()==false` reconnect contract. Spring's `ConcurrentWebSocketSessionDecorator` under `OverflowStrategy.TERMINATE` only sets a private `limitExceeded` flag and throws `SessionLimitExceededException` from `limitExceeded()` — it does **not** close the delegate session. As a result, after 2.0.2 the upstream caller's `clientSession.isOpen()==false` → reconnect contract did not hold (the session stayed open after overflow). `NostrRelayClient.subscribe()` and `NostrRelayClient.send()` now detect a `SessionLimitExceededException` cause in their respective catch blocks and call `clientSession.close(CloseStatus.SESSION_NOT_RELIABLE)` explicitly before rewrapping the exception. Non-overflow `RuntimeException`s continue to flow through the existing wrap-as-`IOException` path unchanged. The §6.7d concurrency test now strictly asserts `clientSession.isOpen()==false` after overflow, matching the spec's original contract. + ## [2.0.2] - 2026-05-08 ### Fixed