From fb6bdea16f22e910c62f7c0d0a49e1b088373fa3 Mon Sep 17 00:00:00 2001 From: "shenwen.yin" Date: Fri, 8 May 2026 18:15:01 +0800 Subject: [PATCH] poc: java sdk support multiple ip - introduce active passivce client - fix imcompatibile surefire plugin for testng 7.4.0 --- pom.xml | 4 +- .../smartx/tower/ActivePassiveApiClient.java | 584 ++++++++++++++++++ .../tower/ActivePassiveApiException.java | 206 ++++++ .../tower/ActivePassiveFailoverStrategy.java | 13 + .../tower/ActivePassiveRequestTags.java | 11 + src/main/java/com/smartx/tower/ApiClient.java | 58 ++ .../tower/ActivePassiveApiClientTest.java | 438 +++++++++++++ 7 files changed, 1312 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/smartx/tower/ActivePassiveApiClient.java create mode 100644 src/main/java/com/smartx/tower/ActivePassiveApiException.java create mode 100644 src/main/java/com/smartx/tower/ActivePassiveFailoverStrategy.java create mode 100644 src/main/java/com/smartx/tower/ActivePassiveRequestTags.java create mode 100644 src/test/java/com/smartx/tower/ActivePassiveApiClientTest.java diff --git a/pom.xml b/pom.xml index dc406f2f..4e349c4b 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ - 2.2.0 + 3.6.3 @@ -70,7 +70,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.5.5 diff --git a/src/main/java/com/smartx/tower/ActivePassiveApiClient.java b/src/main/java/com/smartx/tower/ActivePassiveApiClient.java new file mode 100644 index 00000000..cf447a16 --- /dev/null +++ b/src/main/java/com/smartx/tower/ActivePassiveApiClient.java @@ -0,0 +1,584 @@ +package com.smartx.tower; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Type; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.net.ssl.KeyManager; +import okhttp3.Call; +import okhttp3.HttpUrl; +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.threeten.bp.format.DateTimeFormatter; + +/** ApiClient variant that routes CloudTower requests to the active endpoint in active/passive HA. */ +public class ActivePassiveApiClient extends ApiClient { + public static final int DEFAULT_PROBE_TIMEOUT_MILLIS = 10_000; + + private static final int HTTP_TEMPORARY_REDIRECT = 307; + + private final Object activeEndpointLock = new Object(); + private final List endpointBaseUrls = new ArrayList(); + private final ActivePassiveFailoverStrategy failoverStrategy; + private volatile int probeTimeoutMillis = DEFAULT_PROBE_TIMEOUT_MILLIS; + + private String currentActiveBaseUrl; + + public ActivePassiveApiClient(String... basePaths) throws ApiException { + this(ActivePassiveFailoverStrategy.DEFAULT, basePaths); + } + + public ActivePassiveApiClient(ActivePassiveFailoverStrategy failoverStrategy, String... basePaths) + throws ApiException { + super(); + this.failoverStrategy = + failoverStrategy == null ? ActivePassiveFailoverStrategy.DEFAULT : failoverStrategy; + setHttpClient(super.getHttpClient()); + setBasePaths(basePaths); + } + + public ActivePassiveApiClient(List basePaths) throws ApiException { + this(ActivePassiveFailoverStrategy.DEFAULT, basePaths); + } + + public ActivePassiveApiClient( + ActivePassiveFailoverStrategy failoverStrategy, List basePaths) + throws ApiException { + super(); + this.failoverStrategy = + failoverStrategy == null ? ActivePassiveFailoverStrategy.DEFAULT : failoverStrategy; + setHttpClient(super.getHttpClient()); + setBasePaths(basePaths); + } + + public List getBasePaths() { + synchronized (activeEndpointLock) { + return Collections.unmodifiableList(new ArrayList(endpointBaseUrls)); + } + } + + public ActivePassiveApiClient setBasePaths(String... basePaths) throws ApiException { + return setBasePaths(basePaths == null ? null : Arrays.asList(basePaths)); + } + + public ActivePassiveApiClient setBasePaths(List basePaths) throws ApiException { + EndpointBaseUrls parsed = parseEndpointBaseUrls(basePaths); + synchronized (activeEndpointLock) { + endpointBaseUrls.clear(); + endpointBaseUrls.addAll(parsed.baseUrls); + currentActiveBaseUrl = null; + super.setBasePath(endpointBaseUrls.get(0)); + } + return this; + } + + public int getProbeTimeoutMillis() { + return probeTimeoutMillis; + } + + public ActivePassiveApiClient setProbeTimeoutMillis(int probeTimeoutMillis) { + this.probeTimeoutMillis = + probeTimeoutMillis > 0 ? probeTimeoutMillis : DEFAULT_PROBE_TIMEOUT_MILLIS; + return this; + } + + public ActivePassiveFailoverStrategy getFailoverStrategy() { + return failoverStrategy; + } + + @Override + public ActivePassiveApiClient setHttpClient(OkHttpClient newHttpClient) { + super.setHttpClient(withActivePassiveInterceptor(newHttpClient)); + return this; + } + + @Override + public ApiClient setBasePath(String basePath) { + throw new UnsupportedOperationException( + "Use setBasePaths to configure ActivePassiveApiClient endpoints"); + } + + @Override + public ActivePassiveApiClient setVerifyingSsl(boolean verifyingSsl) { + super.setVerifyingSsl(verifyingSsl); + return this; + } + + @Override + public ActivePassiveApiClient setJSON(JSON json) { + super.setJSON(json); + return this; + } + + @Override + public ActivePassiveApiClient setSslCaCert(InputStream sslCaCert) { + super.setSslCaCert(sslCaCert); + return this; + } + + @Override + public ActivePassiveApiClient setKeyManagers(KeyManager[] managers) { + super.setKeyManagers(managers); + return this; + } + + @Override + public ActivePassiveApiClient setDateFormat(DateFormat dateFormat) { + super.setDateFormat(dateFormat); + return this; + } + + @Override + public ActivePassiveApiClient setSqlDateFormat(DateFormat dateFormat) { + super.setSqlDateFormat(dateFormat); + return this; + } + + @Override + public ActivePassiveApiClient setOffsetDateTimeFormat(DateTimeFormatter dateFormat) { + super.setOffsetDateTimeFormat(dateFormat); + return this; + } + + @Override + public ActivePassiveApiClient setLocalDateFormat(DateTimeFormatter dateFormat) { + super.setLocalDateFormat(dateFormat); + return this; + } + + @Override + public ActivePassiveApiClient setLenientOnJson(boolean lenientOnJson) { + super.setLenientOnJson(lenientOnJson); + return this; + } + + @Override + public ActivePassiveApiClient setUserAgent(String userAgent) { + super.setUserAgent(userAgent); + return this; + } + + @Override + public ActivePassiveApiClient addDefaultHeader(String key, String value) { + super.addDefaultHeader(key, value); + return this; + } + + @Override + public ActivePassiveApiClient removeDefaultHeader(String key, String value) { + super.removeDefaultHeader(key, value); + return this; + } + + @Override + public ActivePassiveApiClient addDefaultCookie(String key, String value) { + super.addDefaultCookie(key, value); + return this; + } + + @Override + public ActivePassiveApiClient setDebugging(boolean debugging) { + super.setDebugging(debugging); + return this; + } + + @Override + public ActivePassiveApiClient setTempFolderPath(String tempFolderPath) { + super.setTempFolderPath(tempFolderPath); + return this; + } + + @Override + public ActivePassiveApiClient setConnectTimeout(int connectionTimeout) { + super.setConnectTimeout(connectionTimeout); + return this; + } + + @Override + public ActivePassiveApiClient setReadTimeout(int readTimeout) { + super.setReadTimeout(readTimeout); + return this; + } + + @Override + public ActivePassiveApiClient setWriteTimeout(int writeTimeout) { + super.setWriteTimeout(writeTimeout); + return this; + } + + @Override + public ApiResponse execute(Call call, Type returnType) throws ApiException { + try { + return super.execute(call, returnType); + } catch (ApiException e) { + ApiException unwrapped = unwrapActivePassiveException(e); + if (unwrapped != null) { + throw unwrapped; + } + throw e; + } + } + + @Override + public void executeAsync( + Call call, final Type returnType, final ApiCallback callback) { + super.executeAsync( + call, + returnType, + new ApiCallback() { + @Override + public void onFailure( + ApiException e, int statusCode, Map> responseHeaders) { + ApiException unwrapped = unwrapActivePassiveException(e); + if (unwrapped != null) { + callback.onFailure( + unwrapped, + unwrapped.getCode(), + unwrapped.getResponseHeaders()); + return; + } + callback.onFailure(e, statusCode, responseHeaders); + } + + @Override + public void onSuccess( + T result, int statusCode, Map> responseHeaders) { + callback.onSuccess(result, statusCode, responseHeaders); + } + + @Override + public void onUploadProgress(long bytesWritten, long contentLength, boolean done) { + callback.onUploadProgress(bytesWritten, contentLength, done); + } + + @Override + public void onDownloadProgress(long bytesRead, long contentLength, boolean done) { + callback.onDownloadProgress(bytesRead, contentLength, done); + } + }); + } + + @Override + public Request buildRequest( + String baseUrl, + String path, + String method, + List queryParams, + List collectionQueryParams, + Object body, + Map headerParams, + Map cookieParams, + Map formParams, + String[] authNames, + ApiCallback callback) + throws ApiException { + if (baseUrl != null) { + throw ActivePassiveApiException.unsupportedDomainOverride(); + } + Request request = + super.buildRequest( + null, + path, + method, + queryParams, + collectionQueryParams, + body, + headerParams, + cookieParams, + formParams, + authNames, + callback); + return request; + } + + /** Probe the first configured endpoint and return whether it is currently active. */ + public boolean probeActivePassive() throws ApiException { + return probeActivePassive(firstEndpointBaseUrl(), probeTimeoutMillis); + } + + /** Returns the last discovered active endpoint base URL. */ + public String getCurrentActiveBaseUrl() { + synchronized (activeEndpointLock) { + return currentActiveBaseUrl; + } + } + + private Response interceptActivePassive(Interceptor.Chain chain) throws IOException { + Request request = chain.request(); + if (request.tag(ActivePassiveRequestTags.Bypass.class) != null) { + return chain.proceed(request); + } + + ActivePassiveFailoverStrategy strategy = failoverStrategy; + String baseUrl = activeBaseUrlForRequest(strategy); + Response response; + try { + response = chain.proceed(routeRequestForTransport(request, baseUrl)); + } catch (IOException e) { + clearCurrentActiveBaseUrlIf(baseUrl); + throw e; + } + + if (response.code() != HTTP_TEMPORARY_REDIRECT) { + return response; + } + + ActivePassiveApiException switchSignal = ActivePassiveApiException.switchSignal(response); + if (response.body() != null) { + response.body().close(); + } + clearCurrentActiveBaseUrlIf(baseUrl); + if (strategy == ActivePassiveFailoverStrategy.MANUAL_FAILOVER + || strategy == ActivePassiveFailoverStrategy.ALWAYS_PROBE) { + throw new ActivePassiveTransportException( + ActivePassiveApiException.failoverRequired(switchSignal)); + } + + String retryBaseUrl = activeBaseUrlForRequest(strategy); + Response retry; + try { + retry = chain.proceed(routeRequestForTransport(request, retryBaseUrl)); + } catch (IOException e) { + clearCurrentActiveBaseUrlIf(retryBaseUrl); + throw e; + } + + if (retry.code() != HTTP_TEMPORARY_REDIRECT) { + return retry; + } + + ActivePassiveApiException retrySignal = ActivePassiveApiException.switchSignal(retry); + if (retry.body() != null) { + retry.body().close(); + } + clearCurrentActiveBaseUrlIf(retryBaseUrl); + throw new ActivePassiveTransportException( + ActivePassiveApiException.retryExhausted(retrySignal)); + } + + private String activeBaseUrlForRequest(ActivePassiveFailoverStrategy strategy) throws IOException { + try { + if (strategy == ActivePassiveFailoverStrategy.ALWAYS_PROBE) { + clearCurrentActiveBaseUrl(); + } + return ensureActiveBaseUrl(); + } catch (ApiException e) { + throw new ActivePassiveTransportException(e); + } + } + + private String firstEndpointBaseUrl() { + synchronized (activeEndpointLock) { + return endpointBaseUrls.get(0); + } + } + + private String ensureActiveBaseUrl() throws ApiException { + while (true) { + List endpoints; + synchronized (activeEndpointLock) { + if (currentActiveBaseUrl != null) { + return currentActiveBaseUrl; + } + endpoints = new ArrayList(endpointBaseUrls); + } + + String activeBaseUrl = discoverActiveBaseUrl(endpoints); + synchronized (activeEndpointLock) { + if (currentActiveBaseUrl != null) { + return currentActiveBaseUrl; + } + if (endpointBaseUrls.equals(endpoints)) { + currentActiveBaseUrl = activeBaseUrl; + return activeBaseUrl; + } + } + } + } + + private String discoverActiveBaseUrl(List endpoints) throws ApiException { + List activeBaseUrls = new ArrayList(); + List failures = new ArrayList(); + + for (int i = 0; i < endpoints.size(); i++) { + String endpointBaseUrl = endpoints.get(i); + try { + if (probeActivePassive(endpointBaseUrl, probeTimeoutMillis)) { + activeBaseUrls.add(endpointBaseUrl); + } + } catch (ApiException e) { + failures.add(endpointBaseUrl + ": " + e.getMessage()); + } + } + + if (activeBaseUrls.size() == 1) { + return activeBaseUrls.get(0); + } + if (activeBaseUrls.isEmpty()) { + throw ActivePassiveApiException.noActiveEndpoint(failures); + } + throw ActivePassiveApiException.multipleActiveEndpoints(activeBaseUrls); + } + + private Request routeRequest(Request request, String baseUrl) throws ApiException { + HttpUrl targetBaseUrl = parseEndpointBaseUrl(baseUrl); + HttpUrl firstBaseUrl = parseEndpointBaseUrl(firstEndpointBaseUrl()); + String suffix = pathSuffix(request.url().encodedPath(), firstBaseUrl.encodedPath()); + HttpUrl routedUrl = + request.url() + .newBuilder() + .scheme(targetBaseUrl.scheme()) + .host(targetBaseUrl.host()) + .port(targetBaseUrl.port()) + .encodedPath(joinEncodedPath(targetBaseUrl.encodedPath(), suffix)) + .build(); + return request.newBuilder().url(routedUrl).build(); + } + + private Request routeRequestForTransport(Request request, String baseUrl) throws IOException { + try { + return routeRequest(request, baseUrl); + } catch (ApiException e) { + throw new ActivePassiveTransportException(e); + } + } + + private void clearCurrentActiveBaseUrl() { + synchronized (activeEndpointLock) { + currentActiveBaseUrl = null; + } + } + + private void clearCurrentActiveBaseUrlIf(String baseUrl) { + synchronized (activeEndpointLock) { + if (baseUrl != null && baseUrl.equals(currentActiveBaseUrl)) { + currentActiveBaseUrl = null; + } + } + } + + private EndpointBaseUrls parseEndpointBaseUrls(List basePaths) throws ApiException { + if (basePaths == null || basePaths.isEmpty()) { + throw ActivePassiveApiException.noEndpoints(); + } + + EndpointBaseUrls parsed = new EndpointBaseUrls(); + HashSet seen = new HashSet(); + for (String endpoint : basePaths) { + HttpUrl url = parseEndpointBaseUrl(endpoint); + String baseUrl = trimTrailingSlash(url.toString()); + if (!seen.add(baseUrl)) { + throw ActivePassiveApiException.duplicateEndpoint(baseUrl); + } + parsed.baseUrls.add(baseUrl); + } + return parsed; + } + + private HttpUrl parseEndpointBaseUrl(String endpoint) throws ApiException { + if (endpoint == null || endpoint.trim().isEmpty()) { + throw ActivePassiveApiException.noEndpoints(); + } + + String raw = endpoint.trim(); + HttpUrl parsed = HttpUrl.parse(raw); + if (parsed == null) { + throw ActivePassiveApiException.invalidEndpoint(endpoint); + } + return parsed; + } + + private OkHttpClient withActivePassiveInterceptor(OkHttpClient client) { + OkHttpClient safeClient = Objects.requireNonNull(client, "HttpClient must not be null!"); + OkHttpClient.Builder builder = + safeClient.newBuilder().followRedirects(false).followSslRedirects(false); + List interceptors = builder.interceptors(); + for (int i = interceptors.size() - 1; i >= 0; i--) { + if (interceptors.get(i) instanceof ActivePassiveInterceptor) { + interceptors.remove(i); + } + } + interceptors.add(0, new ActivePassiveInterceptor()); + return builder.build(); + } + + private static ApiException unwrapActivePassiveException(ApiException e) { + Throwable cause = e.getCause(); + if (cause instanceof ActivePassiveTransportException) { + return ((ActivePassiveTransportException) cause).getApiException(); + } + return null; + } + + private static String trimTrailingSlash(String value) { + HttpUrl url = HttpUrl.parse(value); + if (url == null) { + return value; + } + if ("/".equals(url.encodedPath())) { + return url.toString(); + } + String result = url.toString(); + while (result.endsWith("/")) { + result = result.substring(0, result.length() - 1); + } + return result; + } + + private static String pathSuffix(String path, String basePath) { + if ("/".equals(basePath)) { + return path; + } + if (path.equals(basePath)) { + return ""; + } + String basePathWithSlash = basePath.endsWith("/") ? basePath : basePath + "/"; + if (path.startsWith(basePathWithSlash)) { + return path.substring(basePath.length()); + } + return path; + } + + private static String joinEncodedPath(String basePath, String suffix) { + if (suffix == null || suffix.isEmpty()) { + return basePath; + } + if ("/".equals(basePath)) { + return suffix.startsWith("/") ? suffix : "/" + suffix; + } + return suffix.startsWith("/") ? basePath + suffix : basePath + "/" + suffix; + } + + private static class EndpointBaseUrls { + private final List baseUrls = new ArrayList(); + } + + private static class ActivePassiveTransportException extends IOException { + private final ApiException apiException; + + private ActivePassiveTransportException(ApiException apiException) { + super(apiException); + this.apiException = apiException; + } + + private ApiException getApiException() { + return apiException; + } + } + + private class ActivePassiveInterceptor implements Interceptor { + @Override + public Response intercept(Chain chain) throws IOException { + return interceptActivePassive(chain); + } + } +} diff --git a/src/main/java/com/smartx/tower/ActivePassiveApiException.java b/src/main/java/com/smartx/tower/ActivePassiveApiException.java new file mode 100644 index 00000000..6a5dde6d --- /dev/null +++ b/src/main/java/com/smartx/tower/ActivePassiveApiException.java @@ -0,0 +1,206 @@ +package com.smartx.tower; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import okhttp3.Response; + +/** Exception raised by active/passive discovery and routing logic. */ +@SuppressWarnings("serial") +public class ActivePassiveApiException extends ApiException { + public static final String CODE_NO_ENDPOINTS = "NO_ENDPOINTS"; + public static final String CODE_DUPLICATE_ENDPOINT = "DUPLICATE_ENDPOINT"; + public static final String CODE_NO_ACTIVE_ENDPOINT = "NO_ACTIVE_ENDPOINT"; + public static final String CODE_MULTIPLE_ACTIVE_ENDPOINTS = "MULTIPLE_ACTIVE_ENDPOINTS"; + public static final String CODE_INVALID_ENDPOINT = "INVALID_ENDPOINT"; + public static final String CODE_FAILOVER_REQUIRED = "FAILOVER_REQUIRED"; + public static final String CODE_RETRY_EXHAUSTED = "RETRY_EXHAUSTED"; + public static final String CODE_UNEXPECTED_PROBE_STATUS = "UNEXPECTED_PROBE_STATUS"; + public static final String CODE_UNSUPPORTED_DOMAIN_OVERRIDE = "UNSUPPORTED_DOMAIN_OVERRIDE"; + + private final String activePassiveCode; + private Map props; + private boolean sealed; + + public ActivePassiveApiException(String activePassiveCode, String message) { + super(message); + this.activePassiveCode = activePassiveCode; + seal(); + } + + private ActivePassiveApiException( + String activePassiveCode, + String message, + int code, + Map> responseHeaders, + String responseBody) { + super(message, code, responseHeaders, responseBody); + this.activePassiveCode = activePassiveCode; + } + + public String getActivePassiveCode() { + return activePassiveCode; + } + + public Map getProps() { + return props; + } + + static ActivePassiveApiException unsupportedDomainOverride() { + return new ActivePassiveApiException( + CODE_UNSUPPORTED_DOMAIN_OVERRIDE, + "ActivePassiveApiClient does not support per-request domain overrides"); + } + + static ActivePassiveApiException noEndpoints() { + return new ActivePassiveApiException( + CODE_NO_ENDPOINTS, + "active-passive client requires at least one endpoint"); + } + + static ActivePassiveApiException duplicateEndpoint(String baseUrl) { + return new ActivePassiveApiException( + CODE_DUPLICATE_ENDPOINT, + "active-passive client endpoints must be unique: " + baseUrl, + 0, + null, + null) + .prop("baseUrl", baseUrl) + .seal(); + } + + static ActivePassiveApiException invalidEndpoint(String endpoint) { + return new ActivePassiveApiException( + CODE_INVALID_ENDPOINT, + "Invalid active-passive endpoint: " + endpoint, + 0, + null, + null) + .prop("endpoint", endpoint) + .seal(); + } + + static ActivePassiveApiException noActiveEndpoint(List failures) { + String message = "active-passive discover found no active endpoint"; + String joinedFailures = null; + if (failures != null && !failures.isEmpty()) { + joinedFailures = String.join("; ", failures); + message = message + ": " + joinedFailures; + } + return new ActivePassiveApiException( + CODE_NO_ACTIVE_ENDPOINT, + message, + 0, + null, + null) + .prop("failures", joinedFailures) + .seal(); + } + + static ActivePassiveApiException multipleActiveEndpoints(List activeBaseUrls) { + String joinedActiveBaseUrls = String.join(", ", activeBaseUrls); + return new ActivePassiveApiException( + CODE_MULTIPLE_ACTIVE_ENDPOINTS, + "active-passive discover found multiple active endpoints: " + + joinedActiveBaseUrls, + 0, + null, + null) + .prop("activeBaseUrls", joinedActiveBaseUrls) + .seal(); + } + + static ActivePassiveApiException unexpectedProbeStatus(Response response) { + return fromResponse( + CODE_UNEXPECTED_PROBE_STATUS, + "probe active-passive returned unexpected status", + response); + } + + static ActivePassiveApiException switchSignal(Response response) { + return fromResponse(CODE_FAILOVER_REQUIRED, "active-passive switch signal", response); + } + + static ActivePassiveApiException failoverRequired(ApiException cause) { + return new ActivePassiveApiException( + CODE_FAILOVER_REQUIRED, + "active-passive failover required: " + cause.getMessage(), + cause.getCode(), + cause.getResponseHeaders(), + cause.getResponseBody()) + .seal(); + } + + static ActivePassiveApiException retryExhausted(ApiException cause) { + return new ActivePassiveApiException( + CODE_RETRY_EXHAUSTED, + "active-passive request retry exhausted after discover", + cause.getCode(), + cause.getResponseHeaders(), + cause.getResponseBody()) + .seal(); + } + + private static ActivePassiveApiException fromResponse( + String activePassiveCode, String message, Response response) { + ResponseSnapshot snapshot = snapshot(response); + return new ActivePassiveApiException( + activePassiveCode, + message, + snapshot.statusCode, + snapshot.headers, + snapshot.body) + .seal(); + } + + private static ResponseSnapshot snapshot(Response response) { + String body = null; + if (response.body() != null) { + try { + body = response.body().string(); + } catch (IOException ignored) { + body = null; + } + } + return new ResponseSnapshot(response.code(), response.headers().toMultimap(), body); + } + + private ActivePassiveApiException prop(String key, String value) { + if (sealed) { + throw new IllegalStateException("props have already been sealed"); + } + if (value != null) { + if (props == null) { + props = new HashMap(); + } + props.put(key, value); + } + return this; + } + + private ActivePassiveApiException seal() { + if (sealed) { + return this; + } + if (props == null) { + props = Collections.emptyMap(); + } + props = Collections.unmodifiableMap(props); + sealed = true; + return this; + } + + private static class ResponseSnapshot { + private final int statusCode; + private final Map> headers; + private final String body; + + private ResponseSnapshot(int statusCode, Map> headers, String body) { + this.statusCode = statusCode; + this.headers = headers; + this.body = body; + } + } +} diff --git a/src/main/java/com/smartx/tower/ActivePassiveFailoverStrategy.java b/src/main/java/com/smartx/tower/ActivePassiveFailoverStrategy.java new file mode 100644 index 00000000..33b03d5c --- /dev/null +++ b/src/main/java/com/smartx/tower/ActivePassiveFailoverStrategy.java @@ -0,0 +1,13 @@ +package com.smartx.tower; + +/** Controls how {@link ActivePassiveApiClient} discovers and retries endpoints. */ +public enum ActivePassiveFailoverStrategy { + /** Cache the discovered active endpoint and retry once after a 307 switch signal. */ + DEFAULT, + + /** Return a failover-required error after a 307 switch signal without rediscovery. */ + MANUAL_FAILOVER, + + /** Probe before every request and return a failover-required error after a fresh 307. */ + ALWAYS_PROBE +} diff --git a/src/main/java/com/smartx/tower/ActivePassiveRequestTags.java b/src/main/java/com/smartx/tower/ActivePassiveRequestTags.java new file mode 100644 index 00000000..364ad3ab --- /dev/null +++ b/src/main/java/com/smartx/tower/ActivePassiveRequestTags.java @@ -0,0 +1,11 @@ +package com.smartx.tower; + +final class ActivePassiveRequestTags { + static final Bypass BYPASS = new Bypass(); + + private ActivePassiveRequestTags() {} + + static final class Bypass { + private Bypass() {} + } +} diff --git a/src/main/java/com/smartx/tower/ApiClient.java b/src/main/java/com/smartx/tower/ApiClient.java index 04697fcf..68324942 100644 --- a/src/main/java/com/smartx/tower/ApiClient.java +++ b/src/main/java/com/smartx/tower/ApiClient.java @@ -53,6 +53,8 @@ /** ApiClient class. */ public class ApiClient { + private static final int DEFAULT_ACTIVE_PASSIVE_PROBE_TIMEOUT_MILLIS = 10_000; + private static final int HTTP_TEMPORARY_REDIRECT = 307; private String basePath = "http://localhost"; private boolean debugging = false; @@ -1080,6 +1082,62 @@ public T handleResponse(Response response, Type returnType) throws ApiExcept } } + /** + * Probe this client's current base path and return whether it is the active endpoint. + * + *

The probe sends GET /api/healthz on the endpoint root. HTTP 200 means active, HTTP 307 + * means passive, and any other status is treated as an error. + */ + public boolean probeActivePassive() throws ApiException { + return probeActivePassive(basePath, DEFAULT_ACTIVE_PASSIVE_PROBE_TIMEOUT_MILLIS); + } + + protected boolean probeActivePassive(String basePath, int timeoutMillis) throws ApiException { + HttpUrl endpoint = HttpUrl.parse(basePath); + if (endpoint == null) { + throw new ApiException("Invalid base path: " + basePath); + } + + HttpUrl probeUrl = + endpoint.newBuilder().encodedPath("/api/healthz").query(null).fragment(null).build(); + Request request = + new Request.Builder() + .url(probeUrl) + .tag(ActivePassiveRequestTags.Bypass.class, ActivePassiveRequestTags.BYPASS) + .get() + .build(); + OkHttpClient probeClient = + httpClient + .newBuilder() + .followRedirects(false) + .followSslRedirects(false) + .callTimeout( + timeoutMillis > 0 + ? timeoutMillis + : DEFAULT_ACTIVE_PASSIVE_PROBE_TIMEOUT_MILLIS, + TimeUnit.MILLISECONDS) + .build(); + + try { + Response response = probeClient.newCall(request).execute(); + try { + if (response.code() == 200) { + return true; + } + if (response.code() == HTTP_TEMPORARY_REDIRECT) { + return false; + } + throw ActivePassiveApiException.unexpectedProbeStatus(response); + } finally { + if (response.body() != null) { + response.body().close(); + } + } + } catch (IOException e) { + throw new ApiException(e); + } + } + /** * Build HTTP call with the given options. * diff --git a/src/test/java/com/smartx/tower/ActivePassiveApiClientTest.java b/src/test/java/com/smartx/tower/ActivePassiveApiClientTest.java new file mode 100644 index 00000000..e0e0a7c8 --- /dev/null +++ b/src/test/java/com/smartx/tower/ActivePassiveApiClientTest.java @@ -0,0 +1,438 @@ +package com.smartx.tower; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import okhttp3.Call; +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +public class ActivePassiveApiClientTest { + private final ThreadLocal> servers = + new ThreadLocal>() { + @Override + protected List initialValue() { + return new ArrayList(); + } + }; + + @AfterMethod + public void stopServers() { + List currentServers = servers.get(); + for (HttpServer server : currentServers) { + server.stop(0); + } + currentServers.clear(); + servers.remove(); + } + + @Test + public void defaultStrategyDiscoversAndCachesActiveEndpoint() throws Exception { + CountingHandler passiveHandler = new CountingHandler(307, 500, "passive"); + String passive = startServer(passiveHandler) + "/v2/api"; + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(passive, active); + + ApiResponse first = client.execute(testCall(client), String.class); + ApiResponse second = client.execute(testCall(client), String.class); + + assertThat(first.getData()).isEqualTo("active"); + assertThat(second.getData()).isEqualTo("active"); + assertThat(client.getCurrentActiveBaseUrl()).isEqualTo(active); + assertThat(passiveHandler.probeCount.get()).isEqualTo(1); + assertThat(activeHandler.probeCount.get()).isEqualTo(1); + assertThat(passiveHandler.requestCount.get()).isZero(); + assertThat(activeHandler.requestCount.get()).isEqualTo(2); + } + + @Test + public void defaultStrategyReprobesAndRetriesOnceOn307() throws Exception { + AtomicBoolean switched = new AtomicBoolean(false); + String stale = + startServer( + new HttpHandler() { + @Override + public void handle(HttpExchange exchange) throws IOException { + if ("/api/healthz".equals(exchange.getRequestURI().getPath())) { + write(exchange, switched.get() ? 307 : 200, ""); + return; + } + if ("/v2/api/test".equals(exchange.getRequestURI().getPath())) { + switched.set(true); + exchange.getResponseHeaders().set("Location", "/ignored"); + write(exchange, 307, ""); + return; + } + write(exchange, 404, ""); + } + }) + + "/v2/api"; + CountingHandler activeHandler = new CountingHandler(307, 200, "active") { + @Override + int healthzStatus() { + return switched.get() ? 200 : 307; + } + }; + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(stale, active); + + ApiResponse response = client.execute(testCall(client), String.class); + + assertThat(response.getData()).isEqualTo("active"); + assertThat(client.getCurrentActiveBaseUrl()).isEqualTo(active); + assertThat(activeHandler.requestCount.get()).isEqualTo(1); + } + + @Test + public void activePassiveInterceptorRunsBeforeUserInterceptorsOnRetry() throws Exception { + AtomicBoolean switched = new AtomicBoolean(false); + String stale = + startServer( + new HttpHandler() { + @Override + public void handle(HttpExchange exchange) throws IOException { + if ("/api/healthz".equals(exchange.getRequestURI().getPath())) { + write(exchange, switched.get() ? 307 : 200, ""); + return; + } + if ("/v2/api/test".equals(exchange.getRequestURI().getPath())) { + switched.set(true); + write(exchange, 307, ""); + return; + } + write(exchange, 404, ""); + } + }) + + "/v2/api"; + CountingHandler activeHandler = new CountingHandler(307, 200, "active") { + @Override + int healthzStatus() { + return switched.get() ? 200 : 307; + } + }; + String active = startServer(activeHandler) + "/v2/api"; + final List seenRequestEndpoints = new ArrayList(); + OkHttpClient httpClient = + new OkHttpClient.Builder() + .addInterceptor( + new Interceptor() { + @Override + public Response intercept(Chain chain) throws IOException { + if ("/v2/api/test" + .equals(chain.request().url().encodedPath())) { + seenRequestEndpoints.add(endpointOf(chain.request().url())); + } + return chain.proceed(chain.request()); + } + }) + .build(); + ActivePassiveApiClient client = new ActivePassiveApiClient(stale, active); + client.setHttpClient(httpClient); + + ApiResponse response = client.execute(testCall(client), String.class); + + assertThat(response.getData()).isEqualTo("active"); + assertThat(seenRequestEndpoints).containsExactly(endpointOf(stale), endpointOf(active)); + } + + @Test + public void manualFailoverStrategyReturnsSignalWithoutRediscoveryOn307() throws Exception { + CountingHandler staleHandler = new CountingHandler(200, 307, ""); + String stale = startServer(staleHandler) + "/v2/api"; + CountingHandler activeHandler = new CountingHandler(307, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = + new ActivePassiveApiClient( + ActivePassiveFailoverStrategy.MANUAL_FAILOVER, stale, active); + + assertThatExceptionOfType(ActivePassiveApiException.class) + .isThrownBy(() -> client.execute(testCall(client), String.class)) + .satisfies( + e -> { + assertThat(e.getActivePassiveCode()) + .isEqualTo(ActivePassiveApiException.CODE_FAILOVER_REQUIRED); + }); + assertThat(client.getCurrentActiveBaseUrl()).isNull(); + assertThat(staleHandler.probeCount.get()).isEqualTo(1); + assertThat(staleHandler.requestCount.get()).isEqualTo(1); + assertThat(activeHandler.probeCount.get()).isEqualTo(1); + assertThat(activeHandler.requestCount.get()).isZero(); + } + + @Test + public void alwaysProbeStrategyProbesBeforeEveryRequest() throws Exception { + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = + new ActivePassiveApiClient(ActivePassiveFailoverStrategy.ALWAYS_PROBE, active); + + client.execute(testCall(client), String.class); + client.execute(testCall(client), String.class); + + assertThat(activeHandler.probeCount.get()).isEqualTo(2); + assertThat(activeHandler.requestCount.get()).isEqualTo(2); + } + + @Test + public void probeActivePassiveUsesRootHealthzAndDoesNotFollowRedirects() throws Exception { + CountingHandler passiveHandler = new CountingHandler(307, 200, "unexpected"); + String passive = startServer(passiveHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(passive); + + boolean active = client.probeActivePassive(); + + assertThat(active).isFalse(); + assertThat(passiveHandler.probeCount.get()).isEqualTo(1); + assertThat(passiveHandler.requestCount.get()).isZero(); + } + + @Test + public void singleEndpointApiClientCanProbeWhetherItsBasePathIsActive() throws Exception { + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + CountingHandler passiveHandler = new CountingHandler(307, 200, "passive"); + String passive = startServer(passiveHandler) + "/v2/api"; + + assertThat(new ApiClient().setBasePath(active).probeActivePassive()).isTrue(); + assertThat(new ApiClient().setBasePath(passive).probeActivePassive()).isFalse(); + assertThat(activeHandler.probeCount.get()).isEqualTo(1); + assertThat(passiveHandler.probeCount.get()).isEqualTo(1); + assertThat(activeHandler.requestCount.get()).isZero(); + assertThat(passiveHandler.requestCount.get()).isZero(); + } + + @Test + public void setBasePathIsNotSupportedAfterConstruction() throws Exception { + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(active); + + assertThatExceptionOfType(UnsupportedOperationException.class) + .isThrownBy(() -> client.setBasePath(active)); + } + + @Test + public void perRequestBaseUrlOverrideIsNotSupported() throws Exception { + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(active); + + assertThatExceptionOfType(ApiException.class) + .isThrownBy( + () -> + client.buildCall( + "http://override.example.com/v2/api", + "/test", + "GET", + Collections.emptyList(), + Collections.emptyList(), + null, + new HashMap(), + new HashMap(), + new HashMap(), + new String[] {}, + null)) + .satisfies( + e -> { + assertThat(e) + .isInstanceOf(ActivePassiveApiException.class) + .hasMessageContaining("per-request domain overrides"); + ActivePassiveApiException activePassiveError = + (ActivePassiveApiException) e; + assertThat(activePassiveError.getActivePassiveCode()) + .isEqualTo( + ActivePassiveApiException + .CODE_UNSUPPORTED_DOMAIN_OVERRIDE); + }); + } + + @Test + public void setBasePathsReplacesEndpointsAndClearsCurrentActiveEndpoint() throws Exception { + CountingHandler oldActiveHandler = new CountingHandler(200, 200, "old"); + String oldActive = startServer(oldActiveHandler) + "/v2/api"; + CountingHandler newActiveHandler = new CountingHandler(200, 200, "new"); + String newActive = startServer(newActiveHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(oldActive); + + assertThat(client.execute(testCall(client), String.class).getData()).isEqualTo("old"); + assertThat(client.getCurrentActiveBaseUrl()).isEqualTo(oldActive); + + client.setBasePaths(newActive); + + assertThat(client.getBasePaths()).containsExactly(newActive); + assertThat(client.getCurrentActiveBaseUrl()).isNull(); + assertThat(client.execute(testCall(client), String.class).getData()).isEqualTo("new"); + assertThat(client.getCurrentActiveBaseUrl()).isEqualTo(newActive); + } + + @Test + public void activePassivePropertiesHaveJavaBeanAccessors() throws Exception { + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = + new ActivePassiveApiClient(ActivePassiveFailoverStrategy.ALWAYS_PROBE, active) + .setProbeTimeoutMillis(2345); + + assertThat(client.getBasePaths()).containsExactly(active); + assertThat(client.getProbeTimeoutMillis()).isEqualTo(2345); + assertThat(client.getFailoverStrategy()) + .isEqualTo(ActivePassiveFailoverStrategy.ALWAYS_PROBE); + assertThat(client.getCurrentActiveBaseUrl()).isNull(); + } + + @Test + public void setHttpClientRejectsNullLikeBaseApiClient() throws Exception { + CountingHandler activeHandler = new CountingHandler(200, 200, "active"); + String active = startServer(activeHandler) + "/v2/api"; + ActivePassiveApiClient client = new ActivePassiveApiClient(active); + + assertThatExceptionOfType(NullPointerException.class) + .isThrownBy(() -> client.setHttpClient((OkHttpClient) null)) + .withMessage("HttpClient must not be null!"); + } + + @Test + public void activePassiveExceptionPropsAreSealed() { + List failures = new ArrayList(); + failures.add("first"); + + ActivePassiveApiException exception = ActivePassiveApiException.noActiveEndpoint(failures); + + assertThat(exception.getProps()).containsEntry("failures", "first"); + assertThatExceptionOfType(UnsupportedOperationException.class) + .isThrownBy(() -> exception.getProps().put("new", "value")); + } + + @Test + public void activePassiveExceptionPropsAreSealedWhenEmpty() { + ActivePassiveApiException exception = + new ActivePassiveApiException("TEST", "test message"); + + assertThat(exception.getProps()).isEmpty(); + assertThatExceptionOfType(UnsupportedOperationException.class) + .isThrownBy(() -> exception.getProps().put("new", "value")); + } + + @Test + public void activePassiveExceptionSealIsIdempotentAndRejectsLaterProps() throws Exception { + ActivePassiveApiException exception = + new ActivePassiveApiException("TEST", "test message"); + Method seal = ActivePassiveApiException.class.getDeclaredMethod("seal"); + Method prop = + ActivePassiveApiException.class.getDeclaredMethod( + "prop", String.class, String.class); + seal.setAccessible(true); + prop.setAccessible(true); + + seal.invoke(exception); + + assertThatExceptionOfType(InvocationTargetException.class) + .isThrownBy(() -> prop.invoke(exception, "key", "value")) + .satisfies( + e -> + assertThat(e.getCause()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("props have already been sealed")); + } + + private Call testCall(ApiClient client) throws ApiException { + HashMap headers = new HashMap(); + headers.put("Accept", "text/plain"); + return client.buildCall( + null, + "/test", + "GET", + Collections.emptyList(), + Collections.emptyList(), + null, + headers, + new HashMap(), + new HashMap(), + new String[] {}, + null); + } + + private String startServer(HttpHandler handler) throws IOException { + HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0); + server.createContext("/", handler); + server.start(); + servers.get().add(server); + return "http://127.0.0.1:" + server.getAddress().getPort(); + } + + private static String endpointOf(String baseUrl) { + return endpointOf(okhttp3.HttpUrl.parse(baseUrl)); + } + + private static String endpointOf(okhttp3.HttpUrl url) { + return url.host() + ":" + url.port(); + } + + private static void write(HttpExchange exchange, int status, String body) throws IOException { + byte[] bytes = body.getBytes(StandardCharsets.UTF_8); + if (!body.isEmpty()) { + exchange.getResponseHeaders().set("Content-Type", "text/plain"); + } + exchange.sendResponseHeaders(status, bytes.length); + OutputStream output = exchange.getResponseBody(); + try { + output.write(bytes); + } finally { + output.close(); + } + } + + private static class CountingHandler implements HttpHandler { + private final int healthzStatus; + private final int requestStatus; + private final String responseBody; + private final AtomicInteger probeCount = new AtomicInteger(); + private final AtomicInteger requestCount = new AtomicInteger(); + + private CountingHandler(int healthzStatus, int requestStatus, String responseBody) { + this.healthzStatus = healthzStatus; + this.requestStatus = requestStatus; + this.responseBody = responseBody; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + String path = exchange.getRequestURI().getPath(); + if ("/api/healthz".equals(path)) { + probeCount.incrementAndGet(); + write(exchange, healthzStatus(), ""); + return; + } + if ("/v2/api/test".equals(path)) { + requestCount.incrementAndGet(); + write(exchange, requestStatus, responseBody); + return; + } + write(exchange, 404, ""); + } + + int healthzStatus() { + return healthzStatus; + } + } +}