From 09dc40370e0ca9b8d482dd187f58795de58880ec Mon Sep 17 00:00:00 2001 From: Mikita Hradovich Date: Thu, 7 May 2026 19:53:42 +0200 Subject: [PATCH] feat: optimize system table queries with column projection (DRIVER-368) On the first query to each system table (system.local, system.peers, system.peers_v2) the driver sends SELECT * to discover the server's schema. The result is intersected with the set of columns the driver actually reads and cached in SystemColumnProjection. Subsequent queries project only those columns, reducing bytes on the wire and deserialization work. Key design decisions: - SystemColumnProjection owns a SystemTable enum (LOCAL, PEERS, PEERS_V2) and three unified methods: query(SystemTable), populate(SystemTable, ResultSet), and hook(SystemTable, DefaultResultSetFuture). - populate() is called inside if (row != null) guards for WHERE-clause single-row lookups: an empty result still carries ColumnDefinitions in the metadata, so the cache must not be warmed from it. - hook() is used for the async system.peers full-scan path where the result always reflects the server schema regardless of row count. - Column caches are reset on reconnection and on InvalidQueryException so a server schema change causes the next query to re-discover columns via SELECT *. - Projected column lists are sorted alphabetically for deterministic query strings; ScassandraCluster primes matching projected queries alongside SELECT * primes. - Unit tests in ControlConnectionUnitTest cover intersectWithNeeded(), buildProjectedQuery(), hook() success/failure, and cache field modifiers. 
--- .../driver/core/ControlConnection.java | 98 ++++- .../driver/core/SystemColumnProjection.java | 297 ++++++++++++++ .../driver/core/ControlConnectionTest.java | 3 + .../core/ControlConnectionUnitTest.java | 378 ++++++++++++++++++ .../driver/core/ScassandraCluster.java | 113 +++++- 5 files changed, 867 insertions(+), 22 deletions(-) create mode 100644 driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java create mode 100644 driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index be8c7c88068..d72b82e2052 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -102,10 +102,22 @@ class ControlConnection implements Connection.Owner { // from here on out. private volatile boolean isPeersV2 = true; + private final SystemColumnProjection projection = new SystemColumnProjection(); + public ControlConnection(Cluster.Manager manager) { this.cluster = manager; } + /** + * Resets the projected-column caches so that the next query to each system table sends {@code + * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra + * primes between driver operations. + */ + @VisibleForTesting + void resetColumnCaches() { + projection.reset(); + } + // Only for the initial connection. 
Does not schedule retries if it fails void connect() throws UnsupportedProtocolVersionException { if (isShutdown) return; @@ -326,6 +338,12 @@ private Connection tryConnect(Host host, boolean isInitialConnection) ProtocolEvent.Type.SCHEMA_CHANGE); connection.write(new Requests.Register(evs)); + // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover + // which columns this server exposes, rather than a projected query built for a + // previous connection's server. The caches are populated during the queries below + // and remain warm for the lifetime of this connection. + projection.reset(); + // We need to refresh the node list first so we know about the cassandra version of // the node we're connecting to. // This will create the token map for the first time, but it will be incomplete @@ -453,6 +471,11 @@ void refreshNodeListAndTokenMap() { } catch (ExecutionException e) { // If we're being shutdown during refresh, this can happen. That's fine so don't scare the // user. + if (e.getCause() instanceof InvalidQueryException) { + // A projected query referenced a column the server no longer exposes; reset caches so + // the next connection re-discovers columns via SELECT *. + projection.reset(); + } if (!isShutdown) logger.error( "[Control connection] Unexpected error while refreshing node list and token map", e); @@ -489,28 +512,46 @@ private Row fetchNodeInfo(Host host, Connection c) InterruptedException { boolean isConnectedHost = c.endPoint.equals(host.getEndPoint()); if (isConnectedHost || host.getBroadcastSocketAddress() != null) { + SystemColumnProjection.SystemTable table = + isConnectedHost + ? SystemColumnProjection.SystemTable.LOCAL + : (isPeersV2 + ? 
SystemColumnProjection.SystemTable.PEERS_V2 + : SystemColumnProjection.SystemTable.PEERS); String query; if (isConnectedHost) { - query = SELECT_LOCAL; + query = projection.query(table); } else { InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress(); - query = - isPeersV2 - ? SELECT_PEERS_V2 - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "' AND peer_port=" - + broadcastAddress.getPort() - : SELECT_PEERS - + " WHERE peer='" - + broadcastAddress.getAddress().getHostAddress() - + "'"; + // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for + // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and + // every node has the projected full-scan prime registered. For WHERE lookups the control + // connection may query a node that was never restarted (and therefore still carries only + // the original SELECT * prime from init time), so projecting here risks a cache miss. + if (isPeersV2) { + String whereClause = + "peer='" + + broadcastAddress.getAddress().getHostAddress() + + "' AND peer_port=" + + broadcastAddress.getPort(); + query = SELECT_PEERS_V2 + " WHERE " + whereClause; + } else { + String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'"; + query = SELECT_PEERS + " WHERE " + whereClause; + } } DefaultResultSetFuture future = new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query)); c.write(future); - Row row = future.get().one(); + ResultSet rs = future.get(); + Row row = rs.one(); if (row != null) { + // Populate the column cache only when we got a real row. WHERE-clause lookups may return + // zero rows (e.g. 
broadcast address changed), in which case the ColumnDefinitions still + // exist in the result metadata but there is nothing useful to learn — we must not warm + // the cache from an empty result, or subsequent full-table scans will send a projected + // query that the server may not recognise. + projection.populate(table, rs); return row; } else { InetSocketAddress address = host.getBroadcastSocketAddress(); @@ -582,6 +623,11 @@ boolean refreshNodeInfo(Host host) { } catch (ExecutionException e) { // If we're being shutdown during refresh, this can happen. That's fine so don't scare the // user. + if (e.getCause() instanceof InvalidQueryException) { + // A projected query referenced a column the server no longer exposes; reset caches so + // the next connection re-discovers columns via SELECT *. + projection.reset(); + } if (!isShutdown) logger.debug("[Control connection] Unexpected error while refreshing node info", e); signalError(); @@ -719,7 +765,9 @@ private ListenableFuture selectPeersFuture(final Connection connectio if (isPeersV2) { DefaultResultSetFuture peersV2Future = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2)); + null, + cluster.protocolVersion(), + new Requests.Query(projection.query(SystemColumnProjection.SystemTable.PEERS_V2))); connection.write(peersV2Future); final SettableFuture peersFuture = SettableFuture.create(); // if peers v2 query fails, query peers table instead. 
@@ -729,6 +777,7 @@ private ListenableFuture selectPeersFuture(final Connection connectio @Override public void onSuccess(ResultSet result) { + projection.populate(SystemColumnProjection.SystemTable.PEERS_V2, result); peersFuture.set(result); } @@ -742,6 +791,9 @@ public void onFailure(Throwable t) { || (t instanceof ServerError && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) { isPeersV2 = false; + // Reset all caches: peersV2Columns is now stale, and peers cache should be cleared + // so the first system.peers query re-discovers columns via SELECT *. + projection.reset(); MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection)); } else { peersFuture.setException(t); @@ -751,11 +803,13 @@ public void onFailure(Throwable t) { MoreExecutors.directExecutor()); return peersFuture; } else { - DefaultResultSetFuture peersFuture = + DefaultResultSetFuture rawFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS)); - connection.write(peersFuture); - return peersFuture; + null, + cluster.protocolVersion(), + new Requests.Query(projection.query(SystemColumnProjection.SystemTable.PEERS))); + connection.write(rawFuture); + return projection.hook(SystemColumnProjection.SystemTable.PEERS, rawFuture); } } @@ -774,7 +828,9 @@ private void refreshNodeListAndTokenMap( DefaultResultSetFuture localFuture = new DefaultResultSetFuture( - null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL)); + null, + cluster.protocolVersion(), + new Requests.Query(projection.query(SystemColumnProjection.SystemTable.LOCAL))); ListenableFuture peersFuture = selectPeersFuture(connection); connection.write(localFuture); @@ -783,7 +839,9 @@ private void refreshNodeListAndTokenMap( Map> tokenMap = new HashMap>(); // Update cluster name, DC and rack for the one node we are connected to - Row localRow = localFuture.get().one(); + ResultSet localRs = localFuture.get(); + 
projection.populate(SystemColumnProjection.SystemTable.LOCAL, localRs); + Row localRow = localRs.one(); if (localRow == null) { throw new IllegalStateException( String.format( diff --git a/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java new file mode 100644 index 00000000000..fcb9bd5f5a5 --- /dev/null +++ b/driver-core/src/main/java/com/datastax/driver/core/SystemColumnProjection.java @@ -0,0 +1,297 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright (C) 2022 ScyllaDB + * + * Modified by ScyllaDB + */ +package com.datastax.driver.core; + +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Encapsulates the column-projection state and logic for {@link ControlConnection}'s system table + * queries (DRIVER-368). + * + *

On the first query to each system table ({@code system.local}, {@code system.peers}, {@code + * system.peers_v2}) the driver sends {@code SELECT *} to discover which columns the server exposes. + * The result is intersected with the appropriate {@code *_COLUMNS_OF_INTEREST} set and cached here. + * Subsequent queries project only the cached columns, reducing bytes on the wire and + * deserialization work. + * + *

All cache fields are {@code volatile} because they are written from the control-connection I/O + * thread and read from other threads. + */ +class SystemColumnProjection { + + /** + * Identifies one of the three system tables the driver queries. Used as a parameter to {@link + * #query}, {@link #populate}, and {@link #hook} so callers dispatch by value rather than by + * choosing among three separate methods. + */ + enum SystemTable { + LOCAL("system.local", "key='local'"), + PEERS("system.peers", null), + PEERS_V2("system.peers_v2", null); + + /** Fully-qualified table name used in query strings. */ + final String tableName; + + /** + * WHERE clause appended to full-scan queries, or {@code null} for tables that are always + * scanned in full. + */ + final String defaultWhereClause; + + SystemTable(String tableName, String defaultWhereClause) { + this.tableName = tableName; + this.defaultWhereClause = defaultWhereClause; + } + } + + // IMPORTANT: Every column read from system.local rows — in updateInfo(), + // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. + // If a new column read is added anywhere that consumes a system table row, add it to the + // appropriate set below, otherwise it will be silently excluded from projected queries. + @VisibleForTesting + static final ImmutableSet LOCAL_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "cluster_name", + "partitioner", + "data_center", + "rack", + "release_version", + "native_address", + "native_port", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "rpc_address", + "broadcast_address", + "broadcast_port", + "listen_address", + "listen_port", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. 
+ // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. + @VisibleForTesting + static final ImmutableSet PEERS_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely + "rpc_address", + "data_center", + "rack", + "release_version", + "tokens", + "listen_address", + "listen_port", + "host_id", + "schema_version", + "native_address", // may appear on some server variants; guarded by contains() in code + "native_port", // same + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "workload", + "graph", + "dse_version"); + + // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. + // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), + // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. + // Columns that are absent from the actual server schema are silently excluded by + // intersectWithNeeded(), so listing extra columns here is safe. 
+ @VisibleForTesting + static final ImmutableSet PEERS_V2_COLUMNS_OF_INTEREST = + ImmutableSet.of( + "peer", + "peer_port", + "native_address", + "native_port", + "data_center", + "rack", + "release_version", + "tokens", + "host_id", + "schema_version", + "workload", + "graph", + "dse_version", + "listen_address", + "listen_port", + "rpc_address", // legacy; guarded by contains() in code — harmless if absent + "native_transport_address", // same + "native_transport_port", // same + "native_transport_port_ssl"); // same + + private volatile Set localColumns = null; + private volatile Set peersColumns = null; + private volatile Set peersV2Columns = null; + + /** + * Returns the full-scan query string for {@code table}: a projected {@code SELECT} if the cache + * is warm, otherwise {@code SELECT * FROM tableName [WHERE defaultWhereClause]}. + */ + String query(SystemTable table) { + Set cached = cachedColumns(table); + if (cached == null) { + String base = "SELECT * FROM " + table.tableName; + return table.defaultWhereClause != null ? base + " WHERE " + table.defaultWhereClause : base; + } + return buildProjectedQuery(table.tableName, cached, table.defaultWhereClause); + } + + /** + * Populates the column cache for {@code table} from the given result set, if not already + * populated. + */ + void populate(SystemTable table, ResultSet rs) { + if (cachedColumns(table) != null) return; + ImmutableSet needed = columnsOfInterest(table); + Set computed = intersectWithNeeded(rs, needed); + switch (table) { + case LOCAL: + if (localColumns == null) localColumns = computed; + break; + case PEERS: + if (peersColumns == null) peersColumns = computed; + break; + case PEERS_V2: + if (peersV2Columns == null) peersV2Columns = computed; + break; + } + } + + /** + * Attaches a callback to {@code future} that populates the column cache for {@code table} on + * success and resets all caches on {@link InvalidQueryException} failure. Returns the future + * unchanged so callers can chain it directly. + * + *

Use only for full-table scans. For single-row {@code WHERE} lookups, the result set + * may have zero rows while still carrying valid {@code ColumnDefinitions}; the callback would + * fire and warm the cache from an empty result, which is incorrect. Use {@link #populate} inside + * an {@code if (row != null)} guard for that path instead. + */ + ListenableFuture hook(final SystemTable table, DefaultResultSetFuture future) { + Futures.addCallback( + future, + new FutureCallback() { + @Override + public void onSuccess(ResultSet result) { + populate(table, result); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof InvalidQueryException) reset(); + } + }, + MoreExecutors.directExecutor()); + return future; + } + + /** + * Resets all column caches so that the next query to each system table sends {@code SELECT *} and + * re-discovers available columns. Called on reconnection and on schema errors. + */ + void reset() { + localColumns = null; + peersColumns = null; + peersV2Columns = null; + } + + /** Returns the cached column set for {@code table}, or {@code null} if not yet populated. */ + private Set cachedColumns(SystemTable table) { + switch (table) { + case LOCAL: + return localColumns; + case PEERS: + return peersColumns; + case PEERS_V2: + return peersV2Columns; + default: + throw new AssertionError("Unknown SystemTable: " + table); + } + } + + /** Returns the set of columns of interest for {@code table}. */ + private static ImmutableSet columnsOfInterest(SystemTable table) { + switch (table) { + case LOCAL: + return LOCAL_COLUMNS_OF_INTEREST; + case PEERS: + return PEERS_COLUMNS_OF_INTEREST; + case PEERS_V2: + return PEERS_V2_COLUMNS_OF_INTEREST; + default: + throw new AssertionError("Unknown SystemTable: " + table); + } + } + + /** + * Returns the intersection of the columns returned by the server (from {@code rs}) with the given + * {@code needed} set, or {@code null} if the intersection is empty. 
The result is used to cache + * projected column lists so subsequent queries fetch only what the driver actually reads. A + * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver + * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. + */ + @VisibleForTesting + static Set intersectWithNeeded(ResultSet rs, ImmutableSet needed) { + ImmutableSet.Builder result = ImmutableSet.builder(); + for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { + if (needed.contains(def.getName())) { + result.add(def.getName()); + } + } + ImmutableSet built = result.build(); + return built.isEmpty() ? null : built; + } + + /** + * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the + * given projected column set. Columns are sorted alphabetically so that the generated query + * string is deterministic regardless of the iteration order of {@code columns}. {@code + * whereClause} may be {@code null} for table-wide scans. + */ + @VisibleForTesting + static String buildProjectedQuery(String table, Set columns, String whereClause) { + List sorted = new ArrayList<>(columns); + Collections.sort(sorted); + String query = "SELECT " + String.join(", ", sorted) + " FROM " + table; + return whereClause != null ? 
query + " WHERE " + whereClause : query; + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java index 2670c2022de..f2d48352676 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java @@ -456,6 +456,9 @@ public void should_fetch_whole_peers_table_if_broadcast_address_changed() .build(); scassandras.node(1).primingClient().clearAllPrimes(); + // Reset the column caches so the driver re-discovers columns via SELECT * rather than + // sending projected queries against the now-cleared Scassandra primes. + cluster.manager.controlConnection.resetColumnCaches(); // the driver will attempt to locate host2 in system.peers by its old broadcast address, and // that will fail diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java new file mode 100644 index 00000000000..c23b9c81bf7 --- /dev/null +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionUnitTest.java @@ -0,0 +1,378 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastax.driver.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Set; +import org.testng.annotations.Test; + +/** + * Pure unit tests for the column-projection helpers and caching fields in {@link + * SystemColumnProjection} (DRIVER-368). + * + *

These tests do not require a running Cassandra/Scylla node. For integration-level tests see + * {@link ControlConnectionTest}. + */ +public class ControlConnectionUnitTest { + + // --------------------------------------------------------------------------- + // *_COLUMNS_OF_INTEREST constants + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testLocalColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "cluster_name", + "tokens", + "host_id", + "native_address", + "dse_version", + "rpc_address", + "schema_version", + "data_center", + "rack", + "release_version", + "partitioner"); + } + + @Test(groups = "unit") + public void testLocalColumnsOfInterestSize() { + // 21 columns as documented in the constant declaration + assertThat(SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST).hasSize(21); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "rpc_address", + "tokens", + "native_address", + "native_port", + "native_transport_address", + "data_center", + "rack", + "host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersColumnsOfInterestSize() { + // 19 columns: original 16 + peer_port, native_address, native_port + assertThat(SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestContainsExpectedColumns() { + ImmutableSet cols = SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST; + assertThat(cols) + .contains( + "peer", + "peer_port", + "native_address", + "native_port", + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl", + "data_center", + "rack", + "tokens", + 
"host_id", + "dse_version"); + } + + @Test(groups = "unit") + public void testPeersV2ColumnsOfInterestSize() { + // 19 columns: original 15 + rpc_address, native_transport_address/port/port_ssl + assertThat(SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST).hasSize(19); + } + + @Test(groups = "unit") + public void testPeersV2ContainsLegacyColumns() { + // rpc_address, native_transport_address/port/port_ssl are legacy columns the driver reads + // with contains() guards. They are included so they are not silently dropped if a server + // exposes them in peers_v2. + assertThat(SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST) + .contains( + "rpc_address", + "native_transport_address", + "native_transport_port", + "native_transport_port_ssl"); + } + + // --------------------------------------------------------------------------- + // intersectWithNeeded + // --------------------------------------------------------------------------- + + /** Helper: build a mock ResultSet whose column definitions contain exactly the given names. */ + private static ResultSet mockResultSetWithColumns(String... 
columnNames) { + ColumnDefinitions.Definition[] defs = new ColumnDefinitions.Definition[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + defs[i] = + new ColumnDefinitions.Definition("system", "local", columnNames[i], DataType.text()); + } + ColumnDefinitions colDefs = new ColumnDefinitions(defs, CodecRegistry.DEFAULT_INSTANCE); + + ResultSet rs = mock(ResultSet.class); + when(rs.getColumnDefinitions()).thenReturn(colDefs); + return rs; + } + + @Test(groups = "unit") + public void testIntersectWithNeededReturnsSupersetIntersection() { + // RS has all LOCAL columns plus some extras; result should be exactly LOCAL_COLUMNS_OF_INTEREST + ImmutableSet needed = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; + String[] base = needed.asList().toArray(new String[0]); + // Append two extra columns not in the interest set + String[] extended = java.util.Arrays.copyOf(base, base.length + 2); + extended[base.length] = "extra_col_1"; + extended[base.length + 1] = "extra_col_2"; + + ResultSet rs = mockResultSetWithColumns(extended); + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).isEqualTo(needed); + assertThat(result).doesNotContain("extra_col_1", "extra_col_2"); + } + + @Test(groups = "unit") + public void testIntersectWithNeededHandlesSubset() { + // RS only exposes a subset of the needed columns + ImmutableSet needed = + ImmutableSet.of("cluster_name", "tokens", "host_id", "schema_version"); + ResultSet rs = mockResultSetWithColumns("cluster_name", "tokens"); + + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).containsOnly("cluster_name", "tokens"); + assertThat(result).hasSize(2); + } + + @Test(groups = "unit") + public void testIntersectWithNeededNoOverlapReturnsNull() { + // When no server columns match the needed set, the result should be null so the cache remains + // in the uninitialized sentinel state (avoids generating an empty-column SELECT projection). 
+ ImmutableSet needed = ImmutableSet.of("cluster_name", "tokens"); + ResultSet rs = mockResultSetWithColumns("some_other_col", "another_col"); + + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + @Test(groups = "unit") + public void testIntersectWithNeededEmptyResultSetReturnsNull() { + // An empty ResultSet has no column definitions, so the intersection is empty → null. + ImmutableSet needed = SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST; + ResultSet rs = mockResultSetWithColumns(); + + Set result = SystemColumnProjection.intersectWithNeeded(rs, needed); + + assertThat(result).isNull(); + } + + // --------------------------------------------------------------------------- + // buildProjectedQuery + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testBuildProjectedQueryWithWhereClause() { + Set columns = ImmutableSet.of("cluster_name", "host_id"); + String query = + SystemColumnProjection.buildProjectedQuery("system.local", columns, "key='local'"); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("cluster_name"); + assertThat(query).contains("host_id"); + assertThat(query).contains(" FROM system.local"); + assertThat(query).contains(" WHERE key='local'"); + // Should not contain SELECT * + assertThat(query).doesNotContain("*"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryWithoutWhereClause() { + Set columns = ImmutableSet.of("peer", "rpc_address", "tokens"); + String query = SystemColumnProjection.buildProjectedQuery("system.peers", columns, null); + + assertThat(query).startsWith("SELECT "); + assertThat(query).contains("peer"); + assertThat(query).contains("rpc_address"); + assertThat(query).contains("tokens"); + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + @Test(groups = "unit") + public void 
testBuildProjectedQuerySingleColumn() { + Set columns = ImmutableSet.of("host_id"); + String query = SystemColumnProjection.buildProjectedQuery("system.local", columns, null); + + assertThat(query).isEqualTo("SELECT host_id FROM system.local"); + } + + @Test(groups = "unit") + public void testBuildProjectedQueryAllColumnsPresent() { + // Every column in the needed set must appear as an exact identifier in the projected SELECT + // list. Use exact parsing to avoid false positives where one column name is a substring of + // another (e.g. "native_port" inside "native_transport_port"). + Set columns = SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST; + String query = SystemColumnProjection.buildProjectedQuery("system.peers", columns, null); + Set selectedColumns = extractSelectedColumns(query); + + for (String col : columns) { + assertThat(selectedColumns).as("query should project column: " + col).contains(col); + } + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + } + + /** + * Parses the column identifiers from the {@code SELECT col1, col2, ... FROM ...} portion of a + * projected query string and returns them as a set of trimmed names. 
+ */ + private Set extractSelectedColumns(String query) { + int selectStart = query.indexOf("SELECT "); + int fromStart = query.indexOf(" FROM "); + assertThat(selectStart).as("query should start with SELECT").isEqualTo(0); + assertThat(fromStart).as("query should contain FROM").isGreaterThan(selectStart); + String columnList = query.substring("SELECT ".length(), fromStart); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String col : columnList.split(",")) { + builder.add(col.trim()); + } + return builder.build(); + } + + // --------------------------------------------------------------------------- + // Cache fields: declared as volatile, private, instance-level Set + // --------------------------------------------------------------------------- + + @Test(groups = "unit") + public void testCacheFieldsAreVolatilePrivateInstanceSets() throws Exception { + for (String fieldName : new String[] {"localColumns", "peersColumns", "peersV2Columns"}) { + Field field = SystemColumnProjection.class.getDeclaredField(fieldName); + int mods = field.getModifiers(); + + assertThat(Modifier.isVolatile(mods)).as(fieldName + " should be volatile").isTrue(); + assertThat(Modifier.isPrivate(mods)).as(fieldName + " should be private").isTrue(); + assertThat(Modifier.isStatic(mods)).as(fieldName + " must be an instance field").isFalse(); + assertThat(Set.class.isAssignableFrom(field.getType())) + .as(fieldName + " declared type should be Set") + .isTrue(); + } + } + + // --------------------------------------------------------------------------- + // hook: callback populates cache on success, resets all caches on failure + // --------------------------------------------------------------------------- + + /** + * A minimal subclass of {@link DefaultResultSetFuture} that exposes the protected {@code + * setException} method so tests can drive failure scenarios without a real connection. 
+ */ + private static class SettableResultSetFuture extends DefaultResultSetFuture { + SettableResultSetFuture() { + super(null, ProtocolVersion.V4, new Requests.Query("SELECT * FROM system.peers")); + } + + void failWith(Exception e) { + setException(e); + } + } + + @Test(groups = "unit") + public void testHookPopulatesCacheOnSuccess() { + SystemColumnProjection projection = new SystemColumnProjection(); + // Cache is cold — query(PEERS) should return SELECT *. + assertThat(projection.query(SystemColumnProjection.SystemTable.PEERS)) + .isEqualTo("SELECT * FROM system.peers"); + + SettableResultSetFuture future = new SettableResultSetFuture(); + projection.hook(SystemColumnProjection.SystemTable.PEERS, future); + + // Complete the future with a result set that contains known peers columns. + ResultSet rs = + mockResultSetWithColumns("peer", "rpc_address", "host_id", "data_center", "tokens"); + future.setResult(rs); + + // Cache should now be warm; query(PEERS) must return a projected query, not SELECT *. + String query = projection.query(SystemColumnProjection.SystemTable.PEERS); + assertThat(query).doesNotContain("*"); + assertThat(query).startsWith("SELECT "); + assertThat(query).contains(" FROM system.peers"); + assertThat(query).doesNotContain("WHERE"); + // All columns from the mock RS that are in PEERS_COLUMNS_OF_INTEREST must be projected. + Set selected = extractSelectedColumns(query); + assertThat(selected).containsOnly("peer", "rpc_address", "host_id", "data_center", "tokens"); + } + + @Test(groups = "unit") + public void testHookResetsCacheOnInvalidQueryException() { + SystemColumnProjection projection = new SystemColumnProjection(); + // Warm the local cache so we can verify reset() also clears tables other than PEERS. 
+ ResultSet localRs = mockResultSetWithColumns("cluster_name", "host_id", "tokens"); + projection.populate(SystemColumnProjection.SystemTable.LOCAL, localRs); + assertThat(projection.query(SystemColumnProjection.SystemTable.LOCAL)).doesNotContain("*"); + + SettableResultSetFuture future = new SettableResultSetFuture(); + projection.hook(SystemColumnProjection.SystemTable.PEERS, future); + + // Fail the future with an InvalidQueryException — hook must call reset(). + future.failWith(new InvalidQueryException(null, "Unknown column 'x'")); + + // All caches must be cleared: query(LOCAL) must return SELECT * again. + assertThat(projection.query(SystemColumnProjection.SystemTable.LOCAL)) + .isEqualTo("SELECT * FROM system.local WHERE key='local'"); + assertThat(projection.query(SystemColumnProjection.SystemTable.PEERS)) + .isEqualTo("SELECT * FROM system.peers"); + assertThat(projection.query(SystemColumnProjection.SystemTable.PEERS_V2)) + .isEqualTo("SELECT * FROM system.peers_v2"); + } + + @Test(groups = "unit") + public void testHookDoesNotResetCacheOnOtherFailure() { + SystemColumnProjection projection = new SystemColumnProjection(); + // Warm local cache. + ResultSet localRs = mockResultSetWithColumns("cluster_name", "host_id", "tokens"); + projection.populate(SystemColumnProjection.SystemTable.LOCAL, localRs); + String warmLocalQuery = projection.query(SystemColumnProjection.SystemTable.LOCAL); + assertThat(warmLocalQuery).doesNotContain("*"); + + SettableResultSetFuture future = new SettableResultSetFuture(); + projection.hook(SystemColumnProjection.SystemTable.PEERS, future); + + // Fail with a non-InvalidQueryException — reset() must NOT be called. + future.failWith(new RuntimeException("connection lost")); + + // Local cache must be untouched. 
+ assertThat(projection.query(SystemColumnProjection.SystemTable.LOCAL)) + .isEqualTo(warmLocalQuery); + } +} diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 83078670d7e..341c9401e74 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -82,6 +82,13 @@ public class ScassandraCluster { private final boolean peersV2; + /** + * One stable UUID per node (keyed by 1-based nodeCount), computed once at construction so that + * system.local and system.peers rows for the same node always carry the same host_id regardless + * of which Scassandra process is being primed or how many times primeMetadata() is called. + */ + private final Map hostIdByNodeCount; + ScassandraCluster( Integer[] nodes, String ipPrefix, @@ -148,6 +155,17 @@ public class ScassandraCluster { instances = instanceListBuilder.build(); dcNodeMap = dcNodeMapBuilder.build(); + // Compute stable host_id UUIDs once so every primeMetadata() call uses the same values. + Map hostIds = new HashMap<>(); + int tempCount = 1; + for (Integer dc : new TreeSet(dcNodeMap.keySet())) { + for (int n = 0; n < dcNodeMap.get(dc).size(); n++) { + hostIds.put(tempCount, UUIDs.random()); + tempCount++; + } + } + this.hostIdByNodeCount = hostIds; + // Prime correct keyspace table based on C* version. String[] versionArray = this.cassandraVersion.split("\\.|-"); double major = Double.parseDouble(versionArray[0] + "." + versionArray[1]); @@ -357,6 +375,11 @@ public void start(Cluster cluster, int node) { logger.debug("Starting node {}.", node); Scassandra scassandra = node(node); scassandra.start(); + // Re-prime after restart: Scassandra loses all primes when its process restarts. + // Without re-priming, the driver may query an unprimed node (e.g. 
if the control + // connection temporarily reconnects to this host), get empty system table responses, + // and fail to bring the node back up within the allowed window. + primeMetadata(scassandra); assertThat(cluster).host(node).comesUpWithin(10, TimeUnit.SECONDS); } @@ -386,6 +409,7 @@ public List getTokensForDC(int dc) { private void primeMetadata(Scassandra node) { PrimingClient client = node.primingClient(); + int nodeCount = 1; ImmutableList.Builder> rows = ImmutableList.builder(); @@ -396,6 +420,7 @@ private void primeMetadata(Scassandra node) { for (int n = 0; n < nodesInDc.size(); n++) { InetSocketAddress binaryAddress = address(nodeCount); InetSocketAddress listenAddress = listenAddress(nodeCount); + java.util.UUID hostId = hostIdByNodeCount.get(nodeCount); nodeCount++; Scassandra peer = nodesInDc.get(n); if (node == peer) { // prime system.local. @@ -423,7 +448,7 @@ private void primeMetadata(Scassandra node) { "release_version", getPeerInfo(dc, n + 1, "release_version", cassandraVersion)); addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(tokens.get(n))); - addPeerInfo(row, dc, n + 1, "host_id", UUIDs.random()); + addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(row, dc, n + 1, "schema_version", schemaVersion); addPeerInfo(row, dc, n + 1, "graph", false); @@ -444,6 +469,19 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. 
+ ColumnMetadata[] projectedLocal = + projectedColumnMetadata( + SELECT_LOCAL, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } else { addPeerInfo(row, dc, n + 1, "broadcast_port", listenAddress.getPort()); addPeerInfo(row, dc, n + 1, "listen_port", listenAddress.getPort()); @@ -456,6 +494,20 @@ private void primeMetadata(Scassandra node) { .withRows(Collections.>singletonList(row)) .build()) .build()); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocalV2 = + projectedColumnMetadata( + SELECT_LOCAL_V2, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery( + projectedQueryString(projectedLocalV2, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocalV2) + .withRows(Collections.>singletonList(row)) + .build()) + .build()); } } else { // prime system.peers. Map row = Maps.newHashMap(); @@ -489,7 +541,6 @@ private void primeMetadata(Scassandra node) { addPeerInfo(row, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); addPeerInfo(rowV2, dc, n + 1, "tokens", ImmutableSet.of(Long.toString(tokens.get(n)))); - java.util.UUID hostId = UUIDs.random(); addPeerInfo(row, dc, n + 1, "host_id", hostId); addPeerInfo(rowV2, dc, n + 1, "host_id", hostId); @@ -546,6 +597,14 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers") .withThen(then().withColumnTypes(SELECT_PEERS).withRows(rows.build()).build()) .build()); + // Also prime the projected full-scan that the driver sends after the cache is warm. 
+ ColumnMetadata[] projectedPeersFullScan = + projectedColumnMetadata(SELECT_PEERS, SystemColumnProjection.PEERS_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersFullScan, "system.peers", null)) + .withThen(then().withColumnTypes(projectedPeersFullScan).withRows(rows.build()).build()) + .build()); // return invalid error for peers_v2, indicating the table doesn't exist. if (!peersV2) { @@ -560,6 +619,16 @@ private void primeMetadata(Scassandra node) { .withQuery("SELECT * FROM system.peers_v2") .withThen(then().withColumnTypes(SELECT_PEERS_V2).withRows(rowsV2.build()).build()) .build()); + // Also prime the projected full-scan for peers_v2. + ColumnMetadata[] projectedPeersV2FullScan = + projectedColumnMetadata( + SELECT_PEERS_V2, SystemColumnProjection.PEERS_V2_COLUMNS_OF_INTEREST); + client.prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedPeersV2FullScan, "system.peers_v2", null)) + .withThen( + then().withColumnTypes(projectedPeersV2FullScan).withRows(rowsV2.build()).build()) + .build()); } // Needed to ensure cluster_name matches what we expect on connection. @@ -751,6 +820,34 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu column("validator", TEXT), }; + /** Returns the subset of {@code full} whose names are in {@code interest}, preserving order. */ + private static ColumnMetadata[] projectedColumnMetadata( + ColumnMetadata[] full, Set interest) { + List result = new ArrayList<>(); + for (ColumnMetadata col : full) { + if (interest.contains(col.getName())) result.add(col); + } + return result.toArray(new ColumnMetadata[0]); + } + + /** Builds a projected SELECT query string from a ColumnMetadata array. */ + private static String projectedQueryString( + ColumnMetadata[] cols, String table, String whereClause) { + // Sort alphabetically to match the order produced by + // SystemColumnProjection.buildProjectedQuery. 
+ List names = new ArrayList<>(); + for (ColumnMetadata col : cols) names.add(col.getName()); + Collections.sort(names); + StringBuilder sb = new StringBuilder("SELECT "); + for (int i = 0; i < names.size(); i++) { + if (i > 0) sb.append(", "); + sb.append(names.get(i)); + } + sb.append(" FROM ").append(table); + if (whereClause != null) sb.append(" WHERE ").append(whereClause); + return sb.toString(); + } + // Primes a minimal system.local row on an Scassandra node. // We need a host_id so that the driver can store it in Metadata.hosts public static void primeSystemLocalRow(Scassandra scassandra) { @@ -767,6 +864,18 @@ public static void primeSystemLocalRow(Scassandra scassandra) { .withColumnTypes( localMetadata.toArray(new ColumnMetadata[localMetadata.size()])) .withRows(Collections.>singletonList(row)))); + // Also prime the projected query that the driver sends after the cache is warm. + ColumnMetadata[] projectedLocal = + projectedColumnMetadata(SELECT_LOCAL, SystemColumnProjection.LOCAL_COLUMNS_OF_INTEREST); + scassandra + .primingClient() + .prime( + PrimingRequest.queryBuilder() + .withQuery(projectedQueryString(projectedLocal, "system.local", "key='local'")) + .withThen( + then() + .withColumnTypes(projectedLocal) + .withRows(Collections.>singletonList(row)))); } public static ScassandraClusterBuilder builder() {