This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 273975ef26 Prioritize built indexes in IndexStatusManager
273975ef26 is described below
commit 273975ef26e5de1a10a98c0df364cb67fa7830e3
Author: Arun Ganesh <[email protected]>
AuthorDate: Tue Jul 23 20:55:13 2024 -0700
Prioritize built indexes in IndexStatusManager
Patch by Arun Ganesh; reviewed by Caleb Rackliffe and Zhao Yang for
CASSANDRA-19400
---
CHANGES.txt | 1 +
.../apache/cassandra/index/IndexStatusManager.java | 16 +
.../cassandra/index/internal/CassandraIndex.java | 7 +
.../cassandra/index/IndexStatusManagerTest.java | 450 +++++++++++++++++++++
4 files changed, 474 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index 48ef4700da..fbb989e13b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.1
+ * Prioritize built indexes in IndexStatusManager (CASSANDRA-19400)
* Add java.base/java.lang.reflect among opens for jvm11-client.options (CASSANDRA-19780)
diff --git a/src/java/org/apache/cassandra/index/IndexStatusManager.java b/src/java/org/apache/cassandra/index/IndexStatusManager.java
index 13a51831f9..1c0f5887db 100644
--- a/src/java/org/apache/cassandra/index/IndexStatusManager.java
+++ b/src/java/org/apache/cassandra/index/IndexStatusManager.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.index;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -84,18 +86,32 @@ public class IndexStatusManager
*/
public <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspace
keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel level)
{
+ // UNKNOWN states are transient/rare; only a few replicas should have
this state at any time. See CASSANDRA-19400
+ Set<Replica> queryableNonSucceeded = new HashSet<>(4);
+
E queryableEndpoints = liveEndpoints.filter(replica -> {
+ boolean allBuilt = true;
for (Index index : indexQueryPlan.getIndexes())
{
Index.Status status = getIndexStatus(replica.endpoint(),
keyspace.getName(), index.getIndexMetadata().name);
if (!index.isQueryable(status))
return false;
+
+ if (status != Index.Status.BUILD_SUCCEEDED)
+ allBuilt = false;
}
+ if (!allBuilt)
+ queryableNonSucceeded.add(replica);
+
return true;
});
+ // deprioritize replicas with queryable but non-succeeded indexes
+ if (!queryableNonSucceeded.isEmpty() && queryableNonSucceeded.size()
!= queryableEndpoints.size())
+ queryableEndpoints =
queryableEndpoints.sorted(Comparator.comparingInt(e ->
queryableNonSucceeded.contains(e) ? 1 : -1));
+
int initial = liveEndpoints.size();
int filtered = queryableEndpoints.size();
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 1f39ea502b..20c1a05328 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -207,6 +207,13 @@ public abstract class CassandraIndex implements Index
};
}
+ @Override
+ public boolean isQueryable(Status status)
+ {
+ // consider unknown status as queryable, because gossip may not be
up-to-date for newly joining nodes.
+ return status == Status.BUILD_SUCCEEDED || status == Status.UNKNOWN ||
status == Status.FULL_REBUILD_STARTED;
+ }
+
@Override
public void validate(ReadCommand command) throws InvalidRequestException
{
diff --git a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
new file mode 100644
index 0000000000..d08fec974a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.JsonUtils;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertArrayEquals;
+
+public class IndexStatusManagerTest
+{
+ static class Testcase
+ {
+ String keyspace;
+ int numRequired;
+ AbstractReplicationStrategy replicationStrategy;
+ Map<InetAddressAndPort, Map<String, Index.Status>> indexStatus;
+ EndpointsForRange expected;
+
+ Testcase(Builder builder)
+ {
+ keyspace = builder.keyspace;
+ numRequired = builder.numRequired;
+ replicationStrategy = builder.replicationStrategy;
+ indexStatus = builder.indexStatus;
+ expected = builder.expected;
+ }
+
+ static class Builder
+ {
+ String keyspace;
+ int numRequired;
+ AbstractReplicationStrategy replicationStrategy;
+ Map<InetAddressAndPort, Map<String, Index.Status>> indexStatus;
+ EndpointsForRange expected;
+
+ Builder keyspace(String ks)
+ {
+ keyspace = ks;
+ return this;
+ }
+
+ Builder numRequired(int required)
+ {
+ numRequired = required;
+ return this;
+ }
+
+ Builder replicationStrategy(AbstractReplicationStrategy strategy)
+ {
+ replicationStrategy = strategy;
+ return this;
+ }
+
+ Builder indexStatus(Map<InetAddressAndPort, Map<String,
Index.Status>> status)
+ {
+ indexStatus = status;
+ return this;
+ }
+
+ Builder expected(EndpointsForRange endpoints)
+ {
+ expected = endpoints;
+ return this;
+ }
+
+ Testcase build()
+ {
+ return new Testcase(this);
+ }
+ }
+ }
+
+ IEndpointSnitch snitch = new AbstractNetworkTopologySnitch()
+ {
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return "rack";
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ return "DC";
+ }
+ };
+
+ @Test
+ public void shouldPrioritizeSuccessfulEndpoints() throws
UnknownHostException
+ {
+ runTest(new Testcase.Builder()
+ .keyspace("ks1")
+ .replicationStrategy(
+ new NetworkTopologyStrategy("ks1", new
TokenMetadata(), snitch, Map.of("DC", "5"))
+ )
+ .indexStatus(Map.of(
+ InetAddressAndPort.getByName("127.0.0.251"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.252"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.253"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.254"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.255"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.UNKNOWN
+ )
+ ))
+ .numRequired(2)
+ .expected(EndpointsForRange.of(
+ // successful
+ full(InetAddressAndPort.getByName("127.0.0.251")),
+ full(InetAddressAndPort.getByName("127.0.0.252")),
+ full(InetAddressAndPort.getByName("127.0.0.254")),
+
+ // queryable, but unknown
+ full(InetAddressAndPort.getByName("127.0.0.253")),
+ full(InetAddressAndPort.getByName("127.0.0.255"))
+ ))
+ .build()
+ );
+ }
+
+ @Test
+ public void shouldNotPrioritizeWhenNoSuccessfulEndpoints() throws
UnknownHostException
+ {
+ runTest(new Testcase.Builder()
+ .keyspace("ks1")
+ .replicationStrategy(
+ new NetworkTopologyStrategy("ks1", new
TokenMetadata(), snitch, Map.of("DC", "5"))
+ )
+ .indexStatus(Map.of(
+ InetAddressAndPort.getByName("127.0.0.251"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.UNKNOWN
+ ),
+ InetAddressAndPort.getByName("127.0.0.252"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.UNKNOWN,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.253"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.254"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.UNKNOWN
+ ),
+ InetAddressAndPort.getByName("127.0.0.255"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2", Index.Status.UNKNOWN,
+ "ks1.idx3", Index.Status.UNKNOWN
+ )
+ ))
+ .numRequired(2)
+ .expected(EndpointsForRange.of(
+ // unmodified order
+ full(InetAddressAndPort.getByName("127.0.0.251")),
+ full(InetAddressAndPort.getByName("127.0.0.252")),
+ full(InetAddressAndPort.getByName("127.0.0.253")),
+ full(InetAddressAndPort.getByName("127.0.0.254")),
+ full(InetAddressAndPort.getByName("127.0.0.255"))
+ ))
+ .build()
+ );
+ }
+
+ @Test
+ public void shouldFilterOutNonQueryableEndpoints() throws
UnknownHostException
+ {
+ runTest(new Testcase.Builder()
+ .keyspace("ks1")
+ .replicationStrategy(
+ new NetworkTopologyStrategy("ks1", new
TokenMetadata(), snitch, Map.of("DC", "5"))
+ )
+ .indexStatus(Map.of(
+ InetAddressAndPort.getByName("127.0.0.251"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.FULL_REBUILD_STARTED
+ ),
+ InetAddressAndPort.getByName("127.0.0.252"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.UNKNOWN,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.253"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.254"),
+ Map.of(
+ "ks1.idx1", Index.Status.BUILD_FAILED,
+ "ks1.idx2", Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.UNKNOWN
+ ),
+ InetAddressAndPort.getByName("127.0.0.255"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2", Index.Status.UNKNOWN,
+ "ks1.idx3", Index.Status.UNKNOWN
+ )
+ ))
+ .numRequired(1)
+ .expected(EndpointsForRange.of(
+ // successful
+ full(InetAddressAndPort.getByName("127.0.0.253")),
+
+ // queryable, but unknown
+ full(InetAddressAndPort.getByName("127.0.0.252")),
+ full(InetAddressAndPort.getByName("127.0.0.255"))
+ ))
+ .build()
+ );
+ }
+
+ @Test
+ public void shouldThrowWhenNotEnoughQueryableEndpoints()
+ {
+ assertThatThrownBy(() ->
+ runTest(new Testcase.Builder()
+ .keyspace("ks1")
+ .replicationStrategy(
+ new NetworkTopologyStrategy("ks1", new
TokenMetadata(), snitch, Map.of("DC", "5"))
+ )
+ .indexStatus(Map.of(
+ InetAddressAndPort.getByName("127.0.0.251"),
+ Map.of(
+ "ks1.idx1",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.UNKNOWN,
+ "ks1.idx3",
Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.252"),
+ Map.of(
+ "ks1.idx1",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2", Index.Status.BUILD_FAILED,
+ "ks1.idx3",
Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.253"),
+ Map.of(
+ "ks1.idx1",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3",
Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.254"),
+ Map.of(
+ "ks1.idx1",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_FAILED
+ ),
+ InetAddressAndPort.getByName("127.0.0.255"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3",
Index.Status.BUILD_SUCCEEDED
+ )
+ ))
+ .numRequired(4)
+ .build()))
+ .isInstanceOf(ReadFailureException.class)
+ .hasMessageStartingWith("Operation failed")
+ .hasMessageContaining("INDEX_NOT_AVAILABLE from
/127.0.0.252:7000")
+ .hasMessageContaining("INDEX_NOT_AVAILABLE from
/127.0.0.254:7000");
+ }
+
+ @Test
+ public void shouldThrowWhenNoQueryableEndpoints()
+ {
+ assertThatThrownBy(() ->
+ runTest(new Testcase.Builder()
+ .keyspace("ks1")
+ .replicationStrategy(
+ new NetworkTopologyStrategy("ks1", new
TokenMetadata(), snitch, Map.of("DC", "3"))
+ )
+ .indexStatus(Map.of(
+ InetAddressAndPort.getByName("127.0.0.253"),
+ Map.of(
+ "ks1.idx1", Index.Status.DROPPED,
+ "ks1.idx2",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3",
Index.Status.BUILD_SUCCEEDED
+ ),
+ InetAddressAndPort.getByName("127.0.0.254"),
+ Map.of(
+ "ks1.idx1",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx2",
Index.Status.BUILD_SUCCEEDED,
+ "ks1.idx3", Index.Status.BUILD_FAILED
+ ),
+ InetAddressAndPort.getByName("127.0.0.255"),
+ Map.of(
+ "ks1.idx1", Index.Status.UNKNOWN,
+ "ks1.idx2",
Index.Status.FULL_REBUILD_STARTED,
+ "ks1.idx3",
Index.Status.BUILD_SUCCEEDED
+ )
+ ))
+ .numRequired(1)
+ .build()))
+ .isInstanceOf(ReadFailureException.class)
+ .hasMessageStartingWith("Operation failed")
+ .hasMessageContaining("INDEX_NOT_AVAILABLE from
/127.0.0.253:7000")
+ .hasMessageContaining("INDEX_NOT_AVAILABLE from
/127.0.0.254:7000")
+ .hasMessageContaining("INDEX_NOT_AVAILABLE from
/127.0.0.255:7000");
+ }
+
+ void runTest(Testcase testcase)
+ {
+ // get all endpoints from indexStatus
+ Set<Replica> replicas = testcase.indexStatus.keySet()
+ .stream()
+ .map(ReplicaUtils::full)
+ .collect(Collectors.toSet());
+
+ // get all indexes from one of the entries of indexStatus
+ Set<Index> indexes = testcase.indexStatus.entrySet().iterator().next()
+ .getValue()
+ .keySet()
+ .stream()
+ .map(identifier -> mockedIndex(identifier.split("\\.")[1]))
+ .collect(Collectors.toSet());
+
+ // send indexStatus for each endpoint
+ testcase.indexStatus.forEach((endpoint, indexStatus) ->
+ IndexStatusManager.instance.receivePeerIndexStatus(
+ endpoint,
+
VersionedValue.unsafeMakeVersionedValue(JsonUtils.writeAsJsonString(indexStatus),
1)
+ ));
+
+ // sort the replicas here, so that we can assert the order later
+ EndpointsForRange endpoints = EndpointsForRange.copyOf(new
TreeSet<>(replicas));
+ Keyspace ks = mockedKeyspace(testcase.keyspace,
testcase.replicationStrategy);
+ Index.QueryPlan qp = mockedQueryPlan(indexes);
+ ConsistencyLevel cl = mockedConsistencyLevel(testcase.numRequired);
+
+ EndpointsForRange actual =
IndexStatusManager.instance.filterForQuery(endpoints, ks, qp, cl);
+
+ assertArrayEquals(
+ testcase.expected.stream().toArray(),
+ actual.stream().toArray()
+ );
+ }
+
+ Keyspace mockedKeyspace(String name, AbstractReplicationStrategy
replicationStrategy)
+ {
+ Keyspace mock = Mockito.mock(Keyspace.class);
+ Mockito.when(mock.getName()).thenReturn(name);
+
Mockito.when(mock.getReplicationStrategy()).thenReturn(replicationStrategy);
+ return mock;
+ }
+
+ Index mockedIndex(String name)
+ {
+ Index mock = Mockito.mock(Index.class);
+
+ Mockito.when(mock.getIndexMetadata())
+ .thenReturn(IndexMetadata.fromSchemaMetadata(name,
IndexMetadata.Kind.KEYS, null));
+
+ Mockito.when(mock.isQueryable(Mockito.any()))
+ .thenAnswer(invocationOnMock ->
+ {
+ Index.Status status = invocationOnMock.getArgument(0);
+ return (status == Index.Status.BUILD_SUCCEEDED || status
== Index.Status.UNKNOWN);
+ });
+
+ return mock;
+ }
+
+ Index.QueryPlan mockedQueryPlan(Set<Index> indexes)
+ {
+ Index.QueryPlan mock = Mockito.mock(Index.QueryPlan.class);
+ Mockito.when(mock.getIndexes()).thenReturn(indexes);
+ return mock;
+ }
+
+ ConsistencyLevel mockedConsistencyLevel(int required)
+ {
+ ConsistencyLevel mock = Mockito.mock(ConsistencyLevel.class);
+ Mockito.when(mock.blockFor(Mockito.any())).thenReturn(required);
+ return mock;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]