diff --git a/CHANGELOG.md b/CHANGELOG.md index 9710cca3..021b930d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,23 @@ The format is inspired by Keep a Changelog, and this project adheres to semantic - `BaseKey` now directly implements `Serializable` (previously implemented the now-deleted `IKey` interface). - `Nip05Validator` now creates `HttpClient` instances directly via a `Function` factory (previously used deleted `HttpClientProvider`/`DefaultHttpClientProvider` interface). +## [2.0.3] - 2026-05-08 + +### Fixed +- Explicit close on `OverflowStrategy.TERMINATE`; restores upstream `isOpen()==false` reconnect contract. Spring's `ConcurrentWebSocketSessionDecorator` under `OverflowStrategy.TERMINATE` only sets a private `limitExceeded` flag and throws `SessionLimitExceededException` from `limitExceeded()` — it does **not** close the delegate session. As a result, after 2.0.2 the upstream caller's `clientSession.isOpen()==false` → reconnect contract did not hold (the session stayed open after overflow). `NostrRelayClient.subscribe()` and `NostrRelayClient.send()` now detect a `SessionLimitExceededException` cause in their respective catch blocks and call `clientSession.close(CloseStatus.SESSION_NOT_RELIABLE)` explicitly before rewrapping the exception. Non-overflow `RuntimeException`s continue to flow through the existing wrap-as-`IOException` path unchanged. The §6.7d concurrency test now strictly asserts `clientSession.isOpen()==false` after overflow, matching the spec's original contract. + +## [2.0.2] - 2026-05-08 + +### Fixed +- `NostrRelayClient.subscribe()` was calling `clientSession.sendMessage(...)` without holding the existing `sendLock`, while the same class's `send()` path correctly held it. Two threads racing inside `subscribe()` (or one in `subscribe()` racing one in `send()`) could trigger the underlying writer's `IllegalStateException("The remote endpoint was in state [TEXT_FULL_WRITING]")` (Tomcat / Spring `StandardWebSocketSession`) or `Blocking message pending` (Jetty), which the `catch (RuntimeException)` block at the bottom of `subscribe()` rewrapped as `IOException("Failed to send subscription payload", e)`. Downstream consumer (`imani-gateway-core` `account-app`) observed ~70 `RelaySubscribeException` per 60 minutes once a circuit-breaker tuning fix unmasked the underlying race. Fixed by wrapping the underlying `WebSocketSession` returned by `connectSession(...)` in Spring's `ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE)` at construction time, so concurrent `sendMessage()` calls from any send-path are serialised at the session layer. + +### Added +- New canonical four-arg public constructor `NostrRelayClient(String relayUri, long awaitTimeoutMs, int sendBufferLimit, int sendTimeLimitMs)` annotated `@Autowired` for Spring constructor-injection. Spring binds against this overload via the new `@Value("${nostr.websocket.send-buffer-limit:262144}")` and `@Value("${nostr.websocket.send-time-limit-ms:10000}")` keys (also reachable via the `NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT` / `NOSTR_WEBSOCKET_SEND_TIME_LIMIT_MS` env-vars under Spring relaxed binding). The previously-existing one-arg `(String)` and two-arg `(String, long)` public constructors are retained as delegating overloads (binary-compatible) and now also benefit from the decorator wrap by way of the canonical ctor. +- Test-only static factories `forTestWithRawSession(...)` and `forTestWithDecoratedSession(...)` (two overloads — defaults and explicit) plus a package-private four-arg test constructor, so the new `NostrRelayClientConcurrencyTest` can assert both the regression (raw-session reproduction of the `IllegalStateException` race) and the resolution (decorator-wrapped session serialises sends). + +### Changed +- `awaitTimeoutMs` is now a `final` field assigned once via constructor injection (previously `@Value`-annotated field). Spring's constructor-injection ordering guarantees the value reaches the constructor body before the decorator is constructed, which is required for the explicit overflow strategy to be applied. The system property / env-var key (`nostr.websocket.await-timeout-ms`) and its default (60 000 ms) are unchanged. + ## [2.0.1] - 2026-05-06 ### Fixed diff --git a/nostr-java-client/pom.xml b/nostr-java-client/pom.xml index 07aab89b..fc9eb0b2 100644 --- a/nostr-java-client/pom.xml +++ b/nostr-java-client/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.3 ../pom.xml diff --git a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java index 4c893991..0fda1289 100644 --- a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java +++ b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java @@ -11,8 +11,11 @@ import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHttpHeaders; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.standard.StandardWebSocketClient; +import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; +import org.springframework.web.socket.handler.SessionLimitExceededException; import org.springframework.web.socket.handler.TextWebSocketHandler; import jakarta.websocket.ContainerProvider; @@ -53,6 +56,26 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea private static final int DEFAULT_MAX_TEXT_MESSAGE_BUFFER_SIZE = 1048576; private static final int DEFAULT_MAX_BINARY_MESSAGE_BUFFER_SIZE = 1048576; private static final int DEFAULT_MAX_EVENTS_PER_REQUEST = 10_000; + /** + * Default {@code ConcurrentWebSocketSessionDecorator} send-buffer size — 256 KiB. + * + *

Sized at ~2.5× the largest expected single payload (a chunked kind-37375 + * EVENT can reach ~100 KiB) so that a large {@code send()} co-existing with + * a burst of subscribe REQs does not overflow the decorator's buffer. Tunable + * via {@code nostr.websocket.send-buffer-limit} ({@code @Value}) or + * {@code NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT} (env-var, via Spring relaxed + * binding). + */ + private static final int DEFAULT_SEND_BUFFER_LIMIT = 256 * 1024; + /** + * Default {@code ConcurrentWebSocketSessionDecorator} per-send time limit — 10 s. + * + *

Above any p99.9 healthy-state subscribe REQ ({@literal <}100 ms) and well + * below {@link #DEFAULT_AWAIT_TIMEOUT_MS} so the writer-side timeout fires + * first if the underlying writer is genuinely stuck. Tunable via + * {@code nostr.websocket.send-time-limit-ms}. + */ + private static final int DEFAULT_SEND_TIME_LIMIT_MS = 10_000; private static final ThreadFactory RELAY_IO_THREAD_FACTORY = Thread.ofVirtual().name("nostr-relay-io-", 0).factory(); private static final ThreadFactory LISTENER_THREAD_FACTORY = @@ -62,8 +85,7 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea private static final Executor LISTENER_EXECUTOR = command -> LISTENER_THREAD_FACTORY.newThread(command).start(); - @Value("${nostr.websocket.await-timeout-ms:60000}") - private long awaitTimeoutMs; + private final long awaitTimeoutMs; @Value("${nostr.websocket.max-idle-timeout-ms:3600000}") private long maxIdleTimeoutMs; @@ -71,6 +93,20 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea @Value("${nostr.websocket.max-events-per-request:10000}") private int maxEventsPerRequest = DEFAULT_MAX_EVENTS_PER_REQUEST; + /** + * Decorator buffer-size limit captured at construction. Exposed as a + * package-private field so unit tests can assert that constructor arguments + * are reflected here rather than overridden by static defaults (the + * field-injection regression assertion described in the spec). + */ + private final int sendBufferLimit; + + /** + * Decorator per-send time-limit captured at construction. See + * {@link #sendBufferLimit} for the regression-assertion rationale. + */ + private final int sendTimeLimitMs; + private final WebSocketSession clientSession; /** * Relay URI captured at construction time. Used for logging so that @@ -124,34 +160,130 @@ int eventCount() { } } + /** + * Back-compat constructor — delegates to the canonical four-arg constructor + * with default await-timeout / send-buffer-limit / send-time-limit. Retained + * for direct {@code new}-callers (e.g. the {@code connectAsync(String)} + * factory). Spring will not select this overload because the + * canonical four-arg constructor is annotated {@link Autowired}. + */ public NostrRelayClient(@Value("${nostr.relay.uri}") String relayUri) throws java.util.concurrent.ExecutionException, InterruptedException { - this.relayUri = relayUri; - this.clientSession = connectSession(relayUri); - connectionState.set(ConnectionState.CONNECTED); + this(relayUri, DEFAULT_AWAIT_TIMEOUT_MS, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); } + /** + * Back-compat constructor — delegates to the canonical four-arg constructor + * with default send-buffer-limit / send-time-limit. Retained for direct + * {@code new}-callers (e.g. the {@code connectAsync(String, long)} factory + * and the {@link NostrRelayClient(WebSocketSession, long)} test factory). + */ public NostrRelayClient(String relayUri, long awaitTimeoutMs) throws java.util.concurrent.ExecutionException, InterruptedException { + this(relayUri, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Canonical constructor — the one Spring autowires against. Wraps the + * underlying {@link WebSocketSession} returned by {@link #connectSession} + * in a {@link ConcurrentWebSocketSessionDecorator} so concurrent + * {@code sendMessage()} calls from {@code subscribe()} and {@code send()} + * are serialised. Without the wrap, two threads racing inside + * {@code subscribe()} can trigger a Tomcat + * {@code IllegalStateException("The remote endpoint was in state + * [TEXT_FULL_WRITING]")} (or the Jetty / Undertow equivalent) which is + * caught by the existing {@code catch (RuntimeException)} block at the + * bottom of {@code subscribe()} and rewrapped as + * {@code IOException("Failed to send subscription payload", …)}. + * + *

The decorator is constructed with + * {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy#TERMINATE} + * (made explicit rather than relying on the default): if the buffer fills, + * the underlying session is closed, the calling thread sees a + * {@code SessionLimitExceededException}, and the next call lands on the + * reconnect path in the upstream {@code NostrJavaRelayClient} caller. + * {@code OverflowStrategy.DROP} is explicitly avoided because dropping + * outbound REQ / EVENT frames would silently make callers believe a + * subscribe / publish succeeded when the relay never received it. + */ + @Autowired + public NostrRelayClient( + @Value("${nostr.relay.uri}") String relayUri, + @Value("${nostr.websocket.await-timeout-ms:60000}") long awaitTimeoutMs, + @Value("${nostr.websocket.send-buffer-limit:262144}") int sendBufferLimit, + @Value("${nostr.websocket.send-time-limit-ms:10000}") int sendTimeLimitMs) + throws java.util.concurrent.ExecutionException, InterruptedException { if (awaitTimeoutMs <= 0) { throw new IllegalArgumentException("awaitTimeoutMs must be positive"); } - this.awaitTimeoutMs = awaitTimeoutMs; + if (sendBufferLimit <= 0) { + throw new IllegalArgumentException("sendBufferLimit must be positive"); + } + if (sendTimeLimitMs <= 0) { + throw new IllegalArgumentException("sendTimeLimitMs must be positive"); + } this.relayUri = relayUri; - log.info("NostrRelayClient created for {} with awaitTimeoutMs={}", relayUri, awaitTimeoutMs); - this.clientSession = connectSession(relayUri); + this.awaitTimeoutMs = awaitTimeoutMs; + this.sendBufferLimit = sendBufferLimit; + this.sendTimeLimitMs = sendTimeLimitMs; + log.info( + "NostrRelayClient created for {} with awaitTimeoutMs={} sendBufferLimit={} sendTimeLimitMs={}", + relayUri, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs); + WebSocketSession raw = connectSession(relayUri); + this.clientSession = + new ConcurrentWebSocketSessionDecorator( + raw, sendTimeLimitMs, sendBufferLimit, + ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE); connectionState.set(ConnectionState.CONNECTED); } + /** + * Test-only constructor — the supplied {@link WebSocketSession} is stored + * as-is; no decorator wrap is applied. Production code paths must + * not reach this constructor — they go through the public + * {@code (String, …)} constructors that perform the wrap. + * + *

The §6.7a reproduction step (regression test for the concurrent-send + * race) needs a raw, unwrapped session to deterministically reproduce the + * {@code IllegalStateException}. Use + * {@link #forTestWithRawSession(WebSocketSession, long)} for that path. The + * §6.7b–d resolution steps wrap the session via + * {@link #forTestWithDecoratedSession(WebSocketSession, long)} and then + * reach this constructor with the wrapped session as the argument. + */ NostrRelayClient(WebSocketSession clientSession, long awaitTimeoutMs) { + this(clientSession, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Test-only constructor — four-arg form taking explicit decorator parameters + * so unit tests can assert that the constructor arguments are reflected in + * {@link #sendBufferLimit} and {@link #sendTimeLimitMs} (the field-injection + * regression assertion described in spec §4.1). The supplied session is + * stored as-is — wrapping is the caller's responsibility, see + * {@link #forTestWithDecoratedSession(WebSocketSession, long, int, int)}. + */ + NostrRelayClient( + WebSocketSession clientSession, + long awaitTimeoutMs, + int sendBufferLimit, + int sendTimeLimitMs) { if (clientSession == null) { throw new NullPointerException("clientSession must not be null"); } if (awaitTimeoutMs <= 0) { throw new IllegalArgumentException("awaitTimeoutMs must be positive"); } + if (sendBufferLimit <= 0) { + throw new IllegalArgumentException("sendBufferLimit must be positive"); + } + if (sendTimeLimitMs <= 0) { + throw new IllegalArgumentException("sendTimeLimitMs must be positive"); + } this.clientSession = clientSession; this.awaitTimeoutMs = awaitTimeoutMs; + this.sendBufferLimit = sendBufferLimit; + this.sendTimeLimitMs = sendTimeLimitMs; URI sessionUri = null; try { sessionUri = clientSession.getUri(); @@ -163,6 +295,64 @@ public NostrRelayClient(String relayUri, long awaitTimeoutMs) connectionState.set(ConnectionState.CONNECTED); } + /** + * Test-only factory — REGRESSION reproduction path. Bypasses the decorator + * so the §6.7a reproduction can deterministically reproduce the + * concurrent-send {@code IllegalStateException} against a raw session. + * Not used by production code. + */ + static NostrRelayClient forTestWithRawSession(WebSocketSession session, long awaitTimeoutMs) { + return new NostrRelayClient( + session, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Test-only factory — production path with default buffer / time-limit. + * Wraps the supplied session in a {@link ConcurrentWebSocketSessionDecorator} + * (using static defaults) and forwards through the package-private four-arg + * test ctor. + */ + static NostrRelayClient forTestWithDecoratedSession( + WebSocketSession session, long awaitTimeoutMs) { + return forTestWithDecoratedSession( + session, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS); + } + + /** + * Test-only factory — production path with custom buffer / time-limit. Used + * by the §6.7c (mixed-workload, default buffer) and §6.7d (overflow-and-close, + * small buffer) sub-cases. + */ + static NostrRelayClient forTestWithDecoratedSession( + WebSocketSession session, + long awaitTimeoutMs, + int sendBufferLimit, + int sendTimeLimitMs) { + WebSocketSession wrapped = + (session instanceof ConcurrentWebSocketSessionDecorator) + ? session + : new ConcurrentWebSocketSessionDecorator( + session, sendTimeLimitMs, sendBufferLimit, + ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE); + return new NostrRelayClient(wrapped, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs); + } + + /** + * Package-private accessor — exposes {@link #sendBufferLimit} for the + * field-injection regression unit test (§6.7b assertion (iv)). + */ + int sendBufferLimitForTest() { + return sendBufferLimit; + } + + /** + * Package-private accessor — exposes {@link #sendTimeLimitMs} for the + * field-injection regression unit test (§6.7b assertion (iv)). + */ + int sendTimeLimitMsForTest() { + return sendTimeLimitMs; + } + /** * Connect to a relay asynchronously on a Virtual Thread. * @@ -287,7 +477,21 @@ public List send(String json) throws IOException { request = new PendingRequest(maxEventsPerRequest); pendingRequest = request; log.info("Sending request to relay {}: {}", relayUri, json); - clientSession.sendMessage(new TextMessage(json)); + try { + clientSession.sendMessage(new TextMessage(json)); + } catch (SessionLimitExceededException e) { + // OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws; + // it does NOT close the delegate session. Close it explicitly here so + // upstream callers' isOpen()==false reconnect contract holds. + pendingRequest = null; + try { + clientSession.close(CloseStatus.SESSION_NOT_RELIABLE); + } catch (IOException closeEx) { + // Logged but not propagated — the original cause is the signal. + log.warn("Failed to close session after overflow: {}", closeEx.getMessage()); + } + throw new IOException("Failed to send relay payload", e); + } } finally { sendLock.unlock(); } @@ -397,6 +601,17 @@ public AutoCloseable subscribe( throw e; } catch (RuntimeException e) { listeners.remove(listenerId); + // OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws; + // it does NOT close the delegate session. Close it explicitly here so + // upstream callers' isOpen()==false reconnect contract holds. + if (e instanceof SessionLimitExceededException) { + try { + clientSession.close(CloseStatus.SESSION_NOT_RELIABLE); + } catch (IOException closeEx) { + // Logged but not propagated — the original cause is the signal. + log.warn("Failed to close session after overflow: {}", closeEx.getMessage()); + } + } throw new IOException("Failed to send subscription payload", e); } diff --git a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java new file mode 100644 index 00000000..566045ae --- /dev/null +++ b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientConcurrencyTest.java @@ -0,0 +1,474 @@ +package nostr.client.springwebsocket; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; + +/** + * Concurrency regression tests for the {@code subscribe()} send-side race + * fixed by wrapping {@code clientSession} in a + * {@code ConcurrentWebSocketSessionDecorator}. + * + *

Structured as fail-first / pass-after per the spec + * acceptance criteria (§6.7): + * + *

+ * + *

Constructor-validation tests cover positive-value enforcement on the + * package-private four-arg test ctor. + */ +class NostrRelayClientConcurrencyTest { + + private static final long TEST_AWAIT_TIMEOUT_MS = 5_000L; + private static final int CONCURRENCY = 32; + private static final int CONCURRENCY_LIGHT = 20; + private static final int BLOCK_MS = 50; + private static final int RESOLUTION_DEADLINE_MS = 5_000; + private static final int MIXED_WORKLOAD_DEADLINE_MS = 10_000; + + // ---- §6.7a: reproduction step against a raw (non-decorated) session ---- + @Test + void rawSession_concurrentSubscribe_reproducesTextFullWritingRace() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + WebSocketSession raw = newBlockingMockSession(inFlight, maxInFlight, isOpen, BLOCK_MS, + /* throwOnConcurrent= */ true); + + NostrRelayClient client = NostrRelayClient.forTestWithRawSession(raw, TEST_AWAIT_TIMEOUT_MS); + + List failures = runConcurrentSubscribes(client, CONCURRENCY); + + long ioWithIllegalStateCause = failures.stream() + .filter(t -> t instanceof IOException) + .filter(t -> "Failed to send subscription payload".equals(t.getMessage())) + .filter(t -> t.getCause() instanceof IllegalStateException) + .count(); + assertTrue( + ioWithIllegalStateCause >= 1, + "Expected at least one IOException(\"Failed to send subscription payload\") with cause " + + "IllegalStateException to reproduce the concurrent-send race against a raw session " + + "(observed " + ioWithIllegalStateCause + " out of " + failures.size() + " failures)"); + } + + // ---- §6.7b: resolution step against a decorated session ---- + @Test + void decoratedSession_concurrentSubscribe_serialisesAndSucceeds() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + WebSocketSession raw = newBlockingMockSession(inFlight, maxInFlight, isOpen, BLOCK_MS, + /* throwOnConcurrent= */ true); + + NostrRelayClient client = + NostrRelayClient.forTestWithDecoratedSession(raw, TEST_AWAIT_TIMEOUT_MS); + + long start = System.nanoTime(); + List failures = runConcurrentSubscribes(client, CONCURRENCY); + long elapsedMs = (System.nanoTime() - start) / 1_000_000L; + + assertTrue(failures.isEmpty(), + "Expected zero failures with the decorator, observed " + failures.size() + + ": first=" + (failures.isEmpty() ? "" : failures.get(0))); + assertEquals(1, maxInFlight.get(), + "Decorator must serialise sends — maxInFlight should be exactly 1, was " + + maxInFlight.get()); + assertTrue(elapsedMs < RESOLUTION_DEADLINE_MS * 2L, + "Resolution harness exceeded " + (RESOLUTION_DEADLINE_MS * 2L) + + "ms wall-clock (was " + elapsedMs + "ms)"); + + // (iv) field-injection regression — constructor args reflected, not static defaults. + int defaultsBuffer = NostrRelayClient.forTestWithDecoratedSession( + Mockito.mock(WebSocketSession.class), TEST_AWAIT_TIMEOUT_MS).sendBufferLimitForTest(); + assertEquals(256 * 1024, defaultsBuffer, + "Two-arg test factory must use the 256 KiB default buffer"); + int customBuffer = NostrRelayClient.forTestWithDecoratedSession( + Mockito.mock(WebSocketSession.class), TEST_AWAIT_TIMEOUT_MS, 8 * 1024, 7_500) + .sendBufferLimitForTest(); + assertEquals(8 * 1024, customBuffer, + "Four-arg test factory must reflect the constructor argument, not the static default"); + int customTimeLimit = NostrRelayClient.forTestWithDecoratedSession( + Mockito.mock(WebSocketSession.class), TEST_AWAIT_TIMEOUT_MS, 8 * 1024, 7_500) + .sendTimeLimitMsForTest(); + assertEquals(7_500, customTimeLimit, + "Four-arg test factory must reflect the sendTimeLimitMs constructor argument"); + } + + // ---- §6.7c: mixed-workload (100 KiB send racing with subscribes) ---- + @Test + void decoratedSession_mixedWorkload_largeSendWithSubscribesUnderDefaultBuffer() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + // Light blocking on subscribes; the large send is simulated by a separate + // 200 ms blocking branch keyed on payload size. + WebSocketSession raw = newSizeAwareMockSession(inFlight, maxInFlight, isOpen, + /* normalBlockMs= */ BLOCK_MS, /* largeBlockMs= */ 200, + /* largeThresholdBytes= */ 64 * 1024); + + NostrRelayClient client = NostrRelayClient.forTestWithDecoratedSession( + raw, TEST_AWAIT_TIMEOUT_MS, + /* sendBufferLimit= */ 256 * 1024, /* sendTimeLimitMs= */ 10_000); + + String largePayload = makePayload(100 * 1024); + int totalCalls = CONCURRENCY_LIGHT + 1; + List failures = new ArrayList<>(); + ExecutorService pool = Executors.newFixedThreadPool(totalCalls); + CountDownLatch startingGun = new CountDownLatch(1); + + long start = System.nanoTime(); + for (int i = 0; i < CONCURRENCY_LIGHT; i++) { + final int idx = i; + pool.submit(() -> { + try { + startingGun.await(); + client.subscribe("[\"REQ\",\"sub-" + idx + "\"]", + ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + } + pool.submit(() -> { + try { + startingGun.await(); + // The size-aware mock branches to a 200 ms slow-flush path on this + // payload — simulating a chunked-EVENT publish racing the subscribes. + client.subscribe(largePayload, ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + startingGun.countDown(); + pool.shutdown(); + boolean done = pool.awaitTermination(MIXED_WORKLOAD_DEADLINE_MS + 5_000, TimeUnit.MILLISECONDS); + long elapsedMs = (System.nanoTime() - start) / 1_000_000L; + + assertTrue(done, "Mixed-workload pool did not terminate within " + + (MIXED_WORKLOAD_DEADLINE_MS + 5_000) + "ms"); + assertTrue(failures.isEmpty(), + "Mixed workload failures: " + failures.size() + + " — first=" + (failures.isEmpty() ? "" : failures.get(0))); + assertEquals(1, maxInFlight.get(), + "Decorator must serialise the large send + 20 subscribes — maxInFlight was " + + maxInFlight.get()); + assertTrue(elapsedMs < MIXED_WORKLOAD_DEADLINE_MS, + "Mixed workload exceeded " + MIXED_WORKLOAD_DEADLINE_MS + "ms wall-clock (was " + + elapsedMs + "ms)"); + } + + // ---- §6.7d: overflow-and-close ---- + @Test + void decoratedSession_overflow_closesUnderlyingSession() throws Exception { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new AtomicInteger(); + AtomicBoolean isOpen = new AtomicBoolean(true); + AtomicBoolean unblock = new AtomicBoolean(false); + WebSocketSession raw = newIndefinitelyBlockingMockSession(inFlight, maxInFlight, isOpen, + unblock); + + // Tiny 256-byte buffer with 1 KiB-sized subscribes — overflow guaranteed. + NostrRelayClient client = NostrRelayClient.forTestWithDecoratedSession( + raw, TEST_AWAIT_TIMEOUT_MS, + /* sendBufferLimit= */ 256, /* sendTimeLimitMs= */ 10_000); + + String oneKib = makePayload(1024); + int n = 50; + List failures = new ArrayList<>(); + ExecutorService pool = Executors.newFixedThreadPool(n); + CountDownLatch startingGun = new CountDownLatch(1); + for (int i = 0; i < n; i++) { + pool.submit(() -> { + try { + startingGun.await(); + client.subscribe(oneKib, ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + } + startingGun.countDown(); + + // Wait for the decorator to overflow and close the session. + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (System.nanoTime() < deadline && isOpen.get()) { + Thread.sleep(50); + } + // Release the indefinite-blockers so the pool can drain. + unblock.set(true); + synchronized (raw) { raw.notifyAll(); } + pool.shutdown(); + pool.awaitTermination(10, TimeUnit.SECONDS); + + assertTrue(failures.size() >= 1, + "Expected at least one overflow exception under TERMINATE (observed " + + failures.size() + ")"); + + // (i) Overflow exception must propagate to the caller (either as the raw + // SessionLimitExceededException or NostrRelayClient.subscribe()'s + // IOException rewrap). + boolean overflowExceptionPropagated = failures.stream().anyMatch(t -> { + String msg = t.getMessage(); + if (msg != null && msg.contains("SessionLimitExceeded")) return true; + // The wrapped form: NostrRelayClient.subscribe() catches RuntimeException + // (which SessionLimitExceededException extends) and rewraps as + // IOException("Failed to send subscription payload", e). + if (t instanceof IOException + && "Failed to send subscription payload".equals(t.getMessage()) + && t.getCause() != null + && t.getCause().getClass().getName().endsWith("SessionLimitExceededException")) { + return true; + } + return false; + }); + assertTrue(overflowExceptionPropagated, + "Expected at least one SessionLimitExceededException (or its rewrap) to propagate " + + "to the caller; observed failures: " + failures); + + // (ii) After overflow, clientSession.isOpen() must return false. Spring's + // ConcurrentWebSocketSessionDecorator under TERMINATE only sets a + // private flag and throws SessionLimitExceededException — it does NOT + // auto-close the delegate. NostrRelayClient.subscribe() compensates + // by calling clientSession.close(CloseStatus.SESSION_NOT_RELIABLE) + // explicitly when the underlying cause is SessionLimitExceededException, + // so the upstream NostrJavaRelayClient reconnect contract (isOpen()==false + // → reconnect on next call) holds. This restores the §6.7d ii contract. + assertTrue(!isOpen.get(), + "After overflow, clientSession.isOpen() must return false so the upstream " + + "reconnect contract holds (subscribe()'s catch block must explicitly " + + "close the session on SessionLimitExceededException)."); + } + + // ---- Constructor validation ---- + @Test + void packagePrivateFourArgCtor_rejectsNonPositiveBufferLimit() { + WebSocketSession s = Mockito.mock(WebSocketSession.class); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, 0, 1_000)); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, -1, 1_000)); + } + + @Test + void packagePrivateFourArgCtor_rejectsNonPositiveTimeLimit() { + WebSocketSession s = Mockito.mock(WebSocketSession.class); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, 1_024, 0)); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, TEST_AWAIT_TIMEOUT_MS, 1_024, -5)); + } + + @Test + void packagePrivateFourArgCtor_rejectsNonPositiveAwaitTimeout() { + WebSocketSession s = Mockito.mock(WebSocketSession.class); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, 0, 1_024, 1_000)); + assertThrows(IllegalArgumentException.class, + () -> new NostrRelayClient(s, -1, 1_024, 1_000)); + } + + @Test + void packagePrivateFourArgCtor_rejectsNullSession() { + assertThrows(NullPointerException.class, + () -> new NostrRelayClient((WebSocketSession) null, TEST_AWAIT_TIMEOUT_MS, 1_024, 1_000)); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + /** + * Build a mock session whose {@code sendMessage} blocks for {@code blockMs} + * milliseconds while incrementing an in-flight counter. If + * {@code throwOnConcurrent} is set, on entry to {@code sendMessage} when the + * counter rises above 1 the mock throws + * {@code IllegalStateException("simulated TEXT_FULL_WRITING")} — emulating + * Tomcat / Jetty's real-world thread-safety violation. + */ + private static WebSocketSession newBlockingMockSession( + AtomicInteger inFlight, + AtomicInteger maxInFlight, + AtomicBoolean isOpen, + int blockMs, + boolean throwOnConcurrent) { + WebSocketSession session = Mockito.mock(WebSocketSession.class); + Mockito.when(session.isOpen()).thenAnswer(inv -> isOpen.get()); + Answer sendAnswer = invocation -> { + int now = inFlight.incrementAndGet(); + try { + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + if (throwOnConcurrent && now > 1) { + throw new IllegalStateException("simulated TEXT_FULL_WRITING"); + } + if (blockMs > 0) { + Thread.sleep(blockMs); + } + } finally { + inFlight.decrementAndGet(); + } + return null; + }; + try { + Mockito.doAnswer(sendAnswer).when(session).sendMessage(any(TextMessage.class)); + } catch (IOException impossible) { + throw new AssertionError(impossible); + } + return session; + } + + /** + * Build a mock whose {@code sendMessage} blocks indefinitely (for the + * §6.7d overflow test). The {@code unblock} flag releases pending sends. + * Crucially, when the test calls + * {@link WebSocketSession#close(CloseStatus)} (which the decorator does on + * TERMINATE), {@code isOpen()} pivots to false. + */ + private static WebSocketSession newIndefinitelyBlockingMockSession( + AtomicInteger inFlight, + AtomicInteger maxInFlight, + AtomicBoolean isOpen, + AtomicBoolean unblock) { + WebSocketSession session = Mockito.mock(WebSocketSession.class); + Mockito.when(session.isOpen()).thenAnswer(inv -> isOpen.get()); + Answer closeAnswer = invocation -> { + isOpen.set(false); + synchronized (session) { session.notifyAll(); } + return null; + }; + Answer sendAnswer = invocation -> { + int now = inFlight.incrementAndGet(); + try { + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + synchronized (session) { + while (!unblock.get() && isOpen.get()) { + session.wait(60_000); + // Loop guard against spurious wakeups; one iteration is enough — we + // either get notified by close()/notifyAll() or by the test's + // unblock-pivot. + break; + } + } + if (!isOpen.get()) { + throw new IOException("simulated session-closed mid-send"); + } + } finally { + inFlight.decrementAndGet(); + } + return null; + }; + try { + Mockito.doAnswer(sendAnswer).when(session).sendMessage(any(TextMessage.class)); + Mockito.doAnswer(closeAnswer).when(session).close(); + Mockito.doAnswer(closeAnswer).when(session).close(any(CloseStatus.class)); + } catch (IOException impossible) { + throw new AssertionError(impossible); + } + return session; + } + + /** + * Build a mock whose {@code sendMessage} blocks for {@code largeBlockMs} + * if the payload exceeds {@code largeThresholdBytes}, otherwise for + * {@code normalBlockMs}. Used by the §6.7c mixed-workload test. + */ + private static WebSocketSession newSizeAwareMockSession( + AtomicInteger inFlight, + AtomicInteger maxInFlight, + AtomicBoolean isOpen, + int normalBlockMs, + int largeBlockMs, + int largeThresholdBytes) { + WebSocketSession session = Mockito.mock(WebSocketSession.class); + Mockito.when(session.isOpen()).thenAnswer(inv -> isOpen.get()); + Answer sendAnswer = (InvocationOnMock invocation) -> { + int now = inFlight.incrementAndGet(); + try { + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + TextMessage msg = invocation.getArgument(0); + int sleepMs = msg.getPayloadLength() >= largeThresholdBytes + ? largeBlockMs : normalBlockMs; + Thread.sleep(sleepMs); + } finally { + inFlight.decrementAndGet(); + } + return null; + }; + try { + Mockito.doAnswer(sendAnswer).when(session).sendMessage(any(TextMessage.class)); + } catch (IOException impossible) { + throw new AssertionError(impossible); + } + return session; + } + + /** + * Run {@code n} concurrent {@code subscribe()} calls against the supplied + * client, all released by a single starting-gun, and return the list of + * exceptions raised. Caller must inspect the list — the worker pool is + * always awaited (15 s deadline) before this method returns. + */ + private static List runConcurrentSubscribes( + NostrRelayClient client, int n) throws InterruptedException { + List failures = new ArrayList<>(); + ExecutorService pool = Executors.newFixedThreadPool(n); + CountDownLatch startingGun = new CountDownLatch(1); + for (int i = 0; i < n; i++) { + final int idx = i; + pool.submit(() -> { + try { + startingGun.await(); + client.subscribe("[\"REQ\",\"sub-" + idx + "\"]", + ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + synchronized (failures) { failures.add(t); } + } + }); + } + startingGun.countDown(); + pool.shutdown(); + boolean done = pool.awaitTermination(15, TimeUnit.SECONDS); + assertTrue(done, "Worker pool did not terminate within 15 s " + + "(failures so far: " + failures.size() + ")"); + return failures; + } + + private static String makePayload(int sizeBytes) { + StringBuilder sb = new StringBuilder(sizeBytes + 32); + sb.append("[\"REQ\",\"x\",\""); + while (sb.length() < sizeBytes - 2) { + sb.append('a'); + } + sb.append("\"]"); + return sb.toString(); + } +} diff --git a/nostr-java-core/pom.xml b/nostr-java-core/pom.xml index 028f7cc7..aa188cba 100644 --- a/nostr-java-core/pom.xml +++ b/nostr-java-core/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.3 ../pom.xml diff --git a/nostr-java-event/pom.xml b/nostr-java-event/pom.xml index 63139968..8e1e5371 100644 --- a/nostr-java-event/pom.xml +++ b/nostr-java-event/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.3 ../pom.xml diff --git a/nostr-java-identity/pom.xml b/nostr-java-identity/pom.xml index 68f41a56..103a60e6 100644 --- a/nostr-java-identity/pom.xml +++ b/nostr-java-identity/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.3 ../pom.xml diff --git a/pom.xml b/pom.xml index 2e2d494e..eadff199 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ xyz.tcheeric nostr-java - 2.0.1 + 2.0.3 pom nostr-java