Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ The format is inspired by Keep a Changelog, and this project adheres to semantic
- `BaseKey` now directly implements `Serializable` (previously implemented the now-deleted `IKey` interface).
- `Nip05Validator` now creates `HttpClient` instances directly via a `Function<Duration, HttpClient>` factory (previously used deleted `HttpClientProvider`/`DefaultHttpClientProvider` interface).

## [2.0.3] - 2026-05-08

### Fixed
- Explicit close on `OverflowStrategy.TERMINATE`; restores upstream `isOpen()==false` reconnect contract. Spring's `ConcurrentWebSocketSessionDecorator` under `OverflowStrategy.TERMINATE` only sets a private `limitExceeded` flag and throws `SessionLimitExceededException` from `limitExceeded()` — it does **not** close the delegate session. As a result, after 2.0.2 the upstream caller's `clientSession.isOpen()==false` → reconnect contract did not hold (the session stayed open after overflow). `NostrRelayClient.subscribe()` and `NostrRelayClient.send()` now detect a `SessionLimitExceededException` cause in their respective catch blocks and call `clientSession.close(CloseStatus.SESSION_NOT_RELIABLE)` explicitly before rewrapping the exception. Non-overflow `RuntimeException`s continue to flow through the existing wrap-as-`IOException` path unchanged. The §6.7d concurrency test now strictly asserts `clientSession.isOpen()==false` after overflow, matching the spec's original contract.

## [2.0.2] - 2026-05-08

### Fixed
- `NostrRelayClient.subscribe()` was calling `clientSession.sendMessage(...)` without holding the existing `sendLock`, while the same class's `send()` path correctly held it. Two threads racing inside `subscribe()` (or one in `subscribe()` racing one in `send()`) could trigger the underlying writer's `IllegalStateException("The remote endpoint was in state [TEXT_FULL_WRITING]")` (Tomcat / Spring `StandardWebSocketSession`) or `Blocking message pending` (Jetty), which the `catch (RuntimeException)` block at the bottom of `subscribe()` rewrapped as `IOException("Failed to send subscription payload", e)`. Downstream consumer (`imani-gateway-core` `account-app`) observed ~70 `RelaySubscribeException` per 60 minutes once a circuit-breaker tuning fix unmasked the underlying race. Fixed by wrapping the underlying `WebSocketSession` returned by `connectSession(...)` in Spring's `ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE)` at construction time, so concurrent `sendMessage()` calls from any send-path are serialised at the session layer.

### Added
- New canonical four-arg public constructor `NostrRelayClient(String relayUri, long awaitTimeoutMs, int sendBufferLimit, int sendTimeLimitMs)` annotated `@Autowired` for Spring constructor-injection. Spring binds against this overload via the new `@Value("${nostr.websocket.send-buffer-limit:262144}")` and `@Value("${nostr.websocket.send-time-limit-ms:10000}")` keys (also reachable via the `NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT` / `NOSTR_WEBSOCKET_SEND_TIME_LIMIT_MS` env-vars under Spring relaxed binding). The previously-existing one-arg `(String)` and two-arg `(String, long)` public constructors are retained as delegating overloads (binary-compatible) and now also benefit from the decorator wrap by way of the canonical ctor.
- Test-only static factories `forTestWithRawSession(...)` and `forTestWithDecoratedSession(...)` (two overloads — defaults and explicit) plus a package-private four-arg test constructor, so the new `NostrRelayClientConcurrencyTest` can assert both the regression (raw-session reproduction of the `IllegalStateException` race) and the resolution (decorator-wrapped session serialises sends).

### Changed
- `awaitTimeoutMs` is now a `final` field assigned once via constructor injection (previously `@Value`-annotated field). Spring's constructor-injection ordering guarantees the value reaches the constructor body before the decorator is constructed, which is required for the explicit overflow strategy to be applied. The system property / env-var key (`nostr.websocket.await-timeout-ms`) and its default (60 000 ms) are unchanged.

## [2.0.1] - 2026-05-06

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion nostr-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>xyz.tcheeric</groupId>
<artifactId>nostr-java</artifactId>
<version>2.0.1</version>
<version>2.0.3</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import org.springframework.web.socket.handler.SessionLimitExceededException;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import jakarta.websocket.ContainerProvider;
Expand Down Expand Up @@ -53,6 +56,26 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea
private static final int DEFAULT_MAX_TEXT_MESSAGE_BUFFER_SIZE = 1048576;
private static final int DEFAULT_MAX_BINARY_MESSAGE_BUFFER_SIZE = 1048576;
private static final int DEFAULT_MAX_EVENTS_PER_REQUEST = 10_000;
/**
* Default {@code ConcurrentWebSocketSessionDecorator} send-buffer size — 256 KiB.
*
* <p>Sized at ~2.5× the largest expected single payload (a chunked kind-37375
* EVENT can reach ~100 KiB) so that a large {@code send()} co-existing with
* a burst of subscribe REQs does not overflow the decorator's buffer. Tunable
* via {@code nostr.websocket.send-buffer-limit} ({@code @Value}) or
* {@code NOSTR_WEBSOCKET_SEND_BUFFER_LIMIT} (env-var, via Spring relaxed
* binding).
*/
private static final int DEFAULT_SEND_BUFFER_LIMIT = 256 * 1024;
/**
* Default {@code ConcurrentWebSocketSessionDecorator} per-send time limit — 10 s.
*
* <p>Above any p99.9 healthy-state subscribe REQ ({@literal <}100 ms) and well
* below {@link #DEFAULT_AWAIT_TIMEOUT_MS} so the writer-side timeout fires
* first if the underlying writer is genuinely stuck. Tunable via
* {@code nostr.websocket.send-time-limit-ms}.
*/
private static final int DEFAULT_SEND_TIME_LIMIT_MS = 10_000;
private static final ThreadFactory RELAY_IO_THREAD_FACTORY =
Thread.ofVirtual().name("nostr-relay-io-", 0).factory();
private static final ThreadFactory LISTENER_THREAD_FACTORY =
Expand All @@ -62,15 +85,28 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea
private static final Executor LISTENER_EXECUTOR =
command -> LISTENER_THREAD_FACTORY.newThread(command).start();

@Value("${nostr.websocket.await-timeout-ms:60000}")
private long awaitTimeoutMs;
private final long awaitTimeoutMs;

@Value("${nostr.websocket.max-idle-timeout-ms:3600000}")
private long maxIdleTimeoutMs;

@Value("${nostr.websocket.max-events-per-request:10000}")
private int maxEventsPerRequest = DEFAULT_MAX_EVENTS_PER_REQUEST;

/**
* Decorator buffer-size limit captured at construction. Exposed as a
* package-private field so unit tests can assert that constructor arguments
* are reflected here rather than overridden by static defaults (the
* field-injection regression assertion described in the spec).
*/
private final int sendBufferLimit;

/**
* Decorator per-send time-limit captured at construction. See
* {@link #sendBufferLimit} for the regression-assertion rationale.
*/
private final int sendTimeLimitMs;

private final WebSocketSession clientSession;
/**
* Relay URI captured at construction time. Used for logging so that
Expand Down Expand Up @@ -124,34 +160,130 @@ int eventCount() {
}
}

/**
* Back-compat constructor — delegates to the canonical four-arg constructor
* with default await-timeout / send-buffer-limit / send-time-limit. Retained
* for direct {@code new}-callers (e.g. the {@code connectAsync(String)}
* factory). Spring will <em>not</em> select this overload because the
* canonical four-arg constructor is annotated {@link Autowired}.
*/
public NostrRelayClient(@Value("${nostr.relay.uri}") String relayUri)
throws java.util.concurrent.ExecutionException, InterruptedException {
this.relayUri = relayUri;
this.clientSession = connectSession(relayUri);
connectionState.set(ConnectionState.CONNECTED);
this(relayUri, DEFAULT_AWAIT_TIMEOUT_MS, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS);
}

/**
* Back-compat constructor — delegates to the canonical four-arg constructor
* with default send-buffer-limit / send-time-limit. Retained for direct
* {@code new}-callers (e.g. the {@code connectAsync(String, long)} factory
* and the {@link NostrRelayClient(WebSocketSession, long)} test factory).
*/
public NostrRelayClient(String relayUri, long awaitTimeoutMs)
throws java.util.concurrent.ExecutionException, InterruptedException {
this(relayUri, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS);
}

/**
* Canonical constructor — the one Spring autowires against. Wraps the
* underlying {@link WebSocketSession} returned by {@link #connectSession}
* in a {@link ConcurrentWebSocketSessionDecorator} so concurrent
* {@code sendMessage()} calls from {@code subscribe()} and {@code send()}
* are serialised. Without the wrap, two threads racing inside
* {@code subscribe()} can trigger a Tomcat
* {@code IllegalStateException("The remote endpoint was in state
* [TEXT_FULL_WRITING]")} (or the Jetty / Undertow equivalent) which is
* caught by the existing {@code catch (RuntimeException)} block at the
* bottom of {@code subscribe()} and rewrapped as
* {@code IOException("Failed to send subscription payload", …)}.
*
* <p>The decorator is constructed with
* {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy#TERMINATE}
* (made explicit rather than relying on the default): if the buffer fills,
* the underlying session is closed, the calling thread sees a
* {@code SessionLimitExceededException}, and the next call lands on the
* reconnect path in the upstream {@code NostrJavaRelayClient} caller.
* {@code OverflowStrategy.DROP} is explicitly avoided because dropping
* outbound REQ / EVENT frames would silently make callers believe a
* subscribe / publish succeeded when the relay never received it.
*/
@Autowired
public NostrRelayClient(
@Value("${nostr.relay.uri}") String relayUri,
@Value("${nostr.websocket.await-timeout-ms:60000}") long awaitTimeoutMs,
@Value("${nostr.websocket.send-buffer-limit:262144}") int sendBufferLimit,
@Value("${nostr.websocket.send-time-limit-ms:10000}") int sendTimeLimitMs)
Comment on lines +210 to +214
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Document the new constructor and config properties

When this public four-argument constructor and the nostr.websocket.send-buffer-limit / nostr.websocket.send-time-limit-ms settings are added, the repo guideline to update documentation for modified features applies; however, the API reference still lists only the one- and two-argument constructors in docs/reference/nostr-java-api.md:174-178 and does not mention the new tuning properties. Consumers relying on the reference docs will miss the new configuration surface, so please update the relevant docs alongside this API change.

Useful? React with 👍 / 👎.

throws java.util.concurrent.ExecutionException, InterruptedException {
if (awaitTimeoutMs <= 0) {
throw new IllegalArgumentException("awaitTimeoutMs must be positive");
}
this.awaitTimeoutMs = awaitTimeoutMs;
if (sendBufferLimit <= 0) {
throw new IllegalArgumentException("sendBufferLimit must be positive");
}
if (sendTimeLimitMs <= 0) {
throw new IllegalArgumentException("sendTimeLimitMs must be positive");
}
this.relayUri = relayUri;
log.info("NostrRelayClient created for {} with awaitTimeoutMs={}", relayUri, awaitTimeoutMs);
this.clientSession = connectSession(relayUri);
this.awaitTimeoutMs = awaitTimeoutMs;
this.sendBufferLimit = sendBufferLimit;
this.sendTimeLimitMs = sendTimeLimitMs;
log.info(
"NostrRelayClient created for {} with awaitTimeoutMs={} sendBufferLimit={} sendTimeLimitMs={}",
relayUri, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs);
WebSocketSession raw = connectSession(relayUri);
this.clientSession =
new ConcurrentWebSocketSessionDecorator(
raw, sendTimeLimitMs, sendBufferLimit,
ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE);
connectionState.set(ConnectionState.CONNECTED);
}

/**
* Test-only constructor — the supplied {@link WebSocketSession} is stored
* <em>as-is</em>; no decorator wrap is applied. Production code paths must
* <em>not</em> reach this constructor — they go through the public
* {@code (String, …)} constructors that perform the wrap.
*
* <p>The §6.7a reproduction step (regression test for the concurrent-send
* race) needs a raw, unwrapped session to deterministically reproduce the
* {@code IllegalStateException}. Use
* {@link #forTestWithRawSession(WebSocketSession, long)} for that path. The
* §6.7b–d resolution steps wrap the session via
* {@link #forTestWithDecoratedSession(WebSocketSession, long)} and then
* reach this constructor with the wrapped session as the argument.
*/
NostrRelayClient(WebSocketSession clientSession, long awaitTimeoutMs) {
this(clientSession, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS);
}

/**
* Test-only constructor — four-arg form taking explicit decorator parameters
* so unit tests can assert that the constructor arguments are reflected in
* {@link #sendBufferLimit} and {@link #sendTimeLimitMs} (the field-injection
* regression assertion described in spec §4.1). The supplied session is
* stored as-is — wrapping is the caller's responsibility, see
* {@link #forTestWithDecoratedSession(WebSocketSession, long, int, int)}.
*/
NostrRelayClient(
WebSocketSession clientSession,
long awaitTimeoutMs,
int sendBufferLimit,
int sendTimeLimitMs) {
if (clientSession == null) {
throw new NullPointerException("clientSession must not be null");
}
if (awaitTimeoutMs <= 0) {
throw new IllegalArgumentException("awaitTimeoutMs must be positive");
}
if (sendBufferLimit <= 0) {
throw new IllegalArgumentException("sendBufferLimit must be positive");
}
if (sendTimeLimitMs <= 0) {
throw new IllegalArgumentException("sendTimeLimitMs must be positive");
}
this.clientSession = clientSession;
this.awaitTimeoutMs = awaitTimeoutMs;
this.sendBufferLimit = sendBufferLimit;
this.sendTimeLimitMs = sendTimeLimitMs;
URI sessionUri = null;
try {
sessionUri = clientSession.getUri();
Expand All @@ -163,6 +295,64 @@ public NostrRelayClient(String relayUri, long awaitTimeoutMs)
connectionState.set(ConnectionState.CONNECTED);
}

/**
* Test-only factory — REGRESSION reproduction path. Bypasses the decorator
* so the §6.7a reproduction can deterministically reproduce the
* concurrent-send {@code IllegalStateException} against a raw session.
* <em>Not</em> used by production code.
*/
static NostrRelayClient forTestWithRawSession(WebSocketSession session, long awaitTimeoutMs) {
return new NostrRelayClient(
session, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS);
}

/**
* Test-only factory — production path with default buffer / time-limit.
* Wraps the supplied session in a {@link ConcurrentWebSocketSessionDecorator}
* (using static defaults) and forwards through the package-private four-arg
* test ctor.
*/
static NostrRelayClient forTestWithDecoratedSession(
WebSocketSession session, long awaitTimeoutMs) {
return forTestWithDecoratedSession(
session, awaitTimeoutMs, DEFAULT_SEND_BUFFER_LIMIT, DEFAULT_SEND_TIME_LIMIT_MS);
}

/**
* Test-only factory — production path with custom buffer / time-limit. Used
* by the §6.7c (mixed-workload, default buffer) and §6.7d (overflow-and-close,
* small buffer) sub-cases.
*/
static NostrRelayClient forTestWithDecoratedSession(
WebSocketSession session,
long awaitTimeoutMs,
int sendBufferLimit,
int sendTimeLimitMs) {
WebSocketSession wrapped =
(session instanceof ConcurrentWebSocketSessionDecorator)
? session
: new ConcurrentWebSocketSessionDecorator(
session, sendTimeLimitMs, sendBufferLimit,
ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE);
return new NostrRelayClient(wrapped, awaitTimeoutMs, sendBufferLimit, sendTimeLimitMs);
}

/**
* Package-private accessor — exposes {@link #sendBufferLimit} for the
* field-injection regression unit test (§6.7b assertion (iv)).
*/
int sendBufferLimitForTest() {
return sendBufferLimit;
}

/**
* Package-private accessor — exposes {@link #sendTimeLimitMs} for the
* field-injection regression unit test (§6.7b assertion (iv)).
*/
int sendTimeLimitMsForTest() {
return sendTimeLimitMs;
}

/**
* Connect to a relay asynchronously on a Virtual Thread.
*
Expand Down Expand Up @@ -287,7 +477,21 @@ public List<String> send(String json) throws IOException {
request = new PendingRequest(maxEventsPerRequest);
pendingRequest = request;
log.info("Sending request to relay {}: {}", relayUri, json);
clientSession.sendMessage(new TextMessage(json));
try {
clientSession.sendMessage(new TextMessage(json));
} catch (SessionLimitExceededException e) {
// OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws;
// it does NOT close the delegate session. Close it explicitly here so
// upstream callers' isOpen()==false reconnect contract holds.
pendingRequest = null;
try {
clientSession.close(CloseStatus.SESSION_NOT_RELIABLE);
} catch (IOException closeEx) {
// Logged but not propagated — the original cause is the signal.
log.warn("Failed to close session after overflow: {}", closeEx.getMessage());
}
throw new IOException("Failed to send relay payload", e);
}
} finally {
sendLock.unlock();
}
Expand Down Expand Up @@ -397,6 +601,17 @@ public AutoCloseable subscribe(
throw e;
} catch (RuntimeException e) {
listeners.remove(listenerId);
// OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws;
// it does NOT close the delegate session. Close it explicitly here so
// upstream callers' isOpen()==false reconnect contract holds.
if (e instanceof SessionLimitExceededException) {
try {
clientSession.close(CloseStatus.SESSION_NOT_RELIABLE);
} catch (IOException closeEx) {
// Logged but not propagated — the original cause is the signal.
log.warn("Failed to close session after overflow: {}", closeEx.getMessage());
}
}
throw new IOException("Failed to send subscription payload", e);
}

Expand Down
Loading