This is an automated email from the ASF dual-hosted git repository.
jberragan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98a767cc CASSANALYTICS-101: Add EACH_QUORUM consistency level support in bulk reader (#153)
98a767cc is described below
commit 98a767ccf91ee698a7f94bf8490eb595b1572db3
Author: Sudipta <[email protected]>
AuthorDate: Fri Dec 19 06:46:26 2025 -0800
CASSANALYTICS-101: Add EACH_QUORUM consistency level support in bulk reader (#153)
Add EACH_QUORUM consistency level support in bulk reader
---
.../spark/data/partitioner/ConsistencyLevel.java | 32 ++
.../data/partitioner/ConsistencyLevelTests.java | 97 +++++
.../cassandra/spark/data/PartitionedDataLayer.java | 72 +++-
.../spark/data/partitioner/MultiDCReplicas.java | 57 +++
.../spark/data/PartitionedDataLayerTests.java | 10 +
.../data/partitioner/MultiDCReplicasTest.java | 230 ++++++++++++
.../data/partitioner/MultipleReplicasTests.java | 9 +
.../distributed/impl/CassandraCluster.java | 15 +-
.../testing/ClusterBuilderConfiguration.java | 16 +
.../BulkReaderMultiDCConsistencyTest.java | 391 +++++++++++++++++++++
.../apache/cassandra/analytics/SparkTestUtils.java | 1 +
11 files changed, 914 insertions(+), 16 deletions(-)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
index 529d9fdc..6c464442 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.spark.data.partitioner;
+import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -63,6 +66,35 @@ public enum ConsistencyLevel
: quorumFor(replicationFactor);
}
+ /**
+ * Calculates the number of replicas that must acknowledge the operation in each data center
+ * for EACH_QUORUM consistency level.
+ *
+ * @param replicationFactor the replication factor configuration containing data center information
+ * @return a map where keys are data center names and values are the number of replicas
+ * required to achieve local quorum in each data center
+ * @throws IllegalArgumentException if the consistency level is not EACH_QUORUM or if the
+ * replication strategy is not NetworkTopologyStrategy
+ */
+ public Map<String, Integer> eachQuorumBlockFor(@NotNull ReplicationFactor replicationFactor)
+ {
+ if (this != EACH_QUORUM)
+ {
+ throw new IllegalArgumentException(String.format("Consistency level needed is EACH_QUORUM, provided is: %s",
+ this.name()));
+ }
+ if (replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy)
+ {
+ throw new IllegalArgumentException(String.format("Invalid Replication Strategy: %s for EACH_QUORUM consistency read.",
+ replicationFactor.getReplicationStrategy().name()));
+ }
+ return replicationFactor.getOptions().keySet().stream()
+ .collect(Collectors.toMap(
+ dataCenter -> dataCenter,
+ dataCenter -> localQuorumFor(replicationFactor, dataCenter)
+ ));
+ }
+
private int getNetworkTopologyRf(@NotNull ReplicationFactor replicationFactor, @Nullable String dataCenter)
{
int rf;
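For reference, the per-DC arithmetic that eachQuorumBlockFor performs (a local quorum of rf/2 + 1 in every data center) can be sketched in isolation. The class below is illustrative only and not part of this commit's API:

    import java.util.Map;
    import java.util.stream.Collectors;

    // Illustrative sketch: EACH_QUORUM requires a local quorum (rf/2 + 1) independently in every DC
    public final class EachQuorumMathSketch
    {
        static int localQuorum(int rf)
        {
            return rf / 2 + 1;
        }

        public static void main(String[] args)
        {
            Map<String, Integer> rfPerDc = Map.of("DC1", 3, "DC2", 5, "DC3", 2);
            Map<String, Integer> blockFor = rfPerDc.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> localQuorum(e.getValue())));
            System.out.println(blockFor); // prints entries equivalent to {DC1=2, DC2=3, DC3=2}
        }
    }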
diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
index 9058bd2c..523b2da6 100644
--- a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
+++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
@@ -19,12 +19,15 @@
package org.apache.cassandra.spark.data.partitioner;
+import java.util.Map;
+
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.apache.cassandra.spark.data.ReplicationFactor;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class ConsistencyLevelTests
{
@@ -109,4 +112,98 @@ public class ConsistencyLevelTests
new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
ImmutableMap.of("DC1", 5, "DC2", 5)),
null)).isEqualTo(10);
}
+
+ @Test
+ void testEachQuorumBlockForWithNetworkTopologyStrategy()
+ {
+ // Create a NetworkTopologyStrategy with multiple data centers
+ Map<String, Integer> options = Map.of(
+ "DC1", 3,
+ "DC2", 5,
+ "DC3", 2
+ );
+ ReplicationFactor replicationFactor = new ReplicationFactor(
+ ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+ options
+ );
+
+ Map<String, Integer> result = ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor);
+
+ // Expected: (3/2)+1=2, (5/2)+1=3, (2/2)+1=2
+ assertThat(result).hasSize(3)
+ .containsEntry("DC1", 2)
+ .containsEntry("DC2", 3)
+ .containsEntry("DC3", 2);
+ }
+
+ @Test
+ void testEachQuorumBlockForWithSingleDataCenter()
+ {
+ Map<String, Integer> options = Map.of("DC1", 6);
+ ReplicationFactor replicationFactor = new ReplicationFactor(
+ ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+ options
+ );
+
+ Map<String, Integer> result = ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor);
+
+ // Expected: (6/2)+1=4
+ assertThat(result).containsEntry("DC1", 4);
+ assertThat(result).hasSize(1);
+ }
+
+ @Test
+ void testEachQuorumBlockForWithOddReplicationFactors()
+ {
+ Map<String, Integer> options = Map.of(
+ "DC1", 1,
+ "DC2", 3,
+ "DC3", 5
+ );
+ ReplicationFactor replicationFactor = new ReplicationFactor(
+ ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+ options
+ );
+
+ Map<String, Integer> result = ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor);
+
+ // Expected: (1/2)+1=1, (3/2)+1=2, (5/2)+1=3
+ assertThat(result).containsEntry("DC1", 1);
+ assertThat(result).containsEntry("DC2", 2);
+ assertThat(result).containsEntry("DC3", 3);
+ assertThat(result).hasSize(3);
+ }
+
+ @Test
+ void testEachQuorumBlockForThrowsExceptionForSimpleStrategy()
+ {
+ ReplicationFactor replicationFactor = ReplicationFactor.simpleStrategy(3);
+
+ assertThatThrownBy(() -> ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid Replication Strategy: SimpleStrategy for EACH_QUORUM consistency read.");
+ }
+
+ @Test
+ void testEachQuorumBlockForAllOtherConsistencyLevelsThrowException()
+ {
+ Map<String, Integer> options = Map.of("DC1", 3);
+ ReplicationFactor replicationFactor = new ReplicationFactor(
+ ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+ options
+ );
+
+ // Test a few other consistency levels to ensure they all throw exceptions
+ assertThatThrownBy(() -> ConsistencyLevel.ONE.eachQuorumBlockFor(replicationFactor))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Consistency level needed is EACH_QUORUM, provided is: ONE");
+
+ assertThatThrownBy(() -> ConsistencyLevel.LOCAL_QUORUM.eachQuorumBlockFor(replicationFactor))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Consistency level needed is EACH_QUORUM, provided is: LOCAL_QUORUM");
+
+ assertThatThrownBy(() -> ConsistencyLevel.ALL.eachQuorumBlockFor(replicationFactor))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Consistency level needed is EACH_QUORUM, provided is: ALL");
+ }
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
index d3200166..72ee3d7c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.data;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,11 +43,12 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.bridge.TokenRange;
-import org.apache.cassandra.spark.utils.RangeUtils;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.CassandraRing;
import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
+import org.apache.cassandra.spark.data.partitioner.MultiDCReplicas;
import org.apache.cassandra.spark.data.partitioner.MultipleReplicas;
import org.apache.cassandra.spark.data.partitioner.NotEnoughReplicasException;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
@@ -55,7 +57,7 @@ import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
import org.apache.cassandra.spark.sparksql.NoMatchFoundException;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.utils.RangeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -134,10 +136,6 @@ public abstract class PartitionedDataLayer extends DataLayer
{
throw new IllegalArgumentException("SERIAL or LOCAL_SERIAL are invalid consistency levels for the Bulk Reader");
}
- if (consistencyLevel == ConsistencyLevel.EACH_QUORUM)
- {
- throw new UnsupportedOperationException("EACH_QUORUM has not been implemented yet");
- }
}
protected void validateReplicationFactor(@NotNull ReplicationFactor replicationFactor)
@@ -165,6 +163,10 @@ public abstract class PartitionedDataLayer extends DataLayer
{
return;
}
+
+ Preconditions.checkArgument(ConsistencyLevel.EACH_QUORUM != consistencyLevel,
+ "A DC should not be specified for EACH_QUORUM consistency level. Provided DC: " + dc);
+
Preconditions.checkArgument(replicationFactor.getOptions().containsKey(dc),
"DC %s not found in replication factor %s",
dc,
replicationFactor.getOptions().keySet());
@@ -279,12 +281,53 @@ public abstract class PartitionedDataLayer extends DataLayer
LOGGER.info("Creating partitioned SSTablesSupplier for Spark partition partitionId={} rangeLower={} rangeUpper={} numReplicas={}",
partitionId, range.lowerEndpoint(), range.upperEndpoint(), replicas.size());
+ return constructSSTablesSupplier(partitionId, replicationFactor, instRanges, replicas, range);
+ }
+
+ @NotNull
+ private SSTablesSupplier constructSSTablesSupplier(
+ int partitionId,
+ ReplicationFactor replicationFactor,
+ Map<Range<BigInteger>, List<CassandraInstance>> instRanges,
+ Set<CassandraInstance> replicas,
+ Range<BigInteger> range)
+ {
+ if (consistencyLevel == ConsistencyLevel.EACH_QUORUM)
+ {
+ Map<String, Integer> minReplicasPerDC = consistencyLevel.eachQuorumBlockFor(replicationFactor);
+ LOGGER.debug("Reading with EACH_QUORUM consistency level for partitionId={}, minReplicasPerDC={}",
+ partitionId, minReplicasPerDC);
+ Map<String, Set<CassandraInstance>> replicasByDC = replicas.stream()
+ .collect(Collectors.groupingBy(
+ CassandraInstance::dataCenter,
+ Collectors.toSet()
+ ));
+
+ Map<String, SSTablesSupplier> perDCSSTablesSupplier = new HashMap<>(minReplicasPerDC.size());
+ for (Map.Entry<String, Integer> entry : minReplicasPerDC.entrySet())
+ {
+ Set<CassandraInstance> replicasInDC = replicasByDC.getOrDefault(entry.getKey(), Collections.emptySet());
+ MultipleReplicas multipleReplicas = constructSSTablesSupplierSingleDC(partitionId, instRanges, replicasInDC, range, entry.getValue());
+ perDCSSTablesSupplier.put(entry.getKey(), multipleReplicas);
+ }
+ return new MultiDCReplicas(perDCSSTablesSupplier);
+ }
+ int minReplicas = consistencyLevel.blockFor(replicationFactor, datacenter);
+ return constructSSTablesSupplierSingleDC(partitionId, instRanges, replicas, range, minReplicas);
+ }
+
+ @NotNull
+ private MultipleReplicas constructSSTablesSupplierSingleDC(
+ int partitionId,
+ Map<Range<BigInteger>, List<CassandraInstance>> instRanges,
+ Set<CassandraInstance> replicas,
+ Range<BigInteger> range, int minReplicas)
+ {
// Use consistency level and replication factor to calculate min number of replicas required
// to satisfy consistency level; split replicas into 'primary' and 'backup' replicas,
// attempt on primary replicas and use backups to retry in the event of a failure
- int minReplicas = consistencyLevel.blockFor(replicationFactor, datacenter);
ReplicaSet replicaSet = PartitionedDataLayer.splitReplicas(
- consistencyLevel, datacenter, instRanges, replicas, this::getAvailability, minReplicas, partitionId);
+ consistencyLevel, datacenter, instRanges, replicas, this::getAvailability, minReplicas, partitionId);
if (replicaSet.primary().size() < minReplicas)
{
// Could not find enough primary replicas to meet consistency level
@@ -295,11 +338,16 @@ public abstract class PartitionedDataLayer extends DataLayer
ExecutorService executor = executorService();
Stats stats = stats();
Set<SingleReplica> primaryReplicas = replicaSet.primary().stream()
- .map(instance -> new SingleReplica(instance, this, range, partitionId, executor, stats, replicaSet.isRepairPrimary(instance)))
- .collect(Collectors.toSet());
+ .map(instance ->
+ new SingleReplica(instance, this, range,
+ partitionId, executor, stats,
+ replicaSet.isRepairPrimary(instance)))
+ .collect(Collectors.toSet());
Set<SingleReplica> backupReplicas = replicaSet.backup().stream()
- .map(instance -> new SingleReplica(instance, this, range, partitionId, executor, stats, true))
- .collect(Collectors.toSet());
+ .map(instance ->
+ new SingleReplica(instance, this, range,
+ partitionId, executor, stats, true))
+ .collect(Collectors.toSet());
return new MultipleReplicas(primaryReplicas, backupReplicas, stats);
}
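As a standalone illustration of the EACH_QUORUM branch added above (group replicas by data center, then require a local quorum from each group), the following sketch uses a plain name-to-DC map instead of CassandraInstance; the node names and topology are hypothetical:

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public final class GroupReplicasByDcSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical topology: node name -> data center
            Map<String, String> replicaToDc = Map.of("n1", "DC1", "n2", "DC1", "n3", "DC1",
                                                     "n4", "DC2", "n5", "DC2", "n6", "DC2");
            // Group replicas by DC, mirroring Collectors.groupingBy(CassandraInstance::dataCenter, ...)
            Map<String, Set<String>> byDc = replicaToDc.entrySet().stream()
                    .collect(Collectors.groupingBy(Map.Entry::getValue,
                                                   Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
            // Each DC must independently satisfy its local quorum (rf/2 + 1)
            byDc.forEach((dc, nodes) -> System.out.printf("%s: need %d of %d replicas%n",
                                                          dc, nodes.size() / 2 + 1, nodes.size()));
        }
    }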
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicas.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicas.java
new file mode 100644
index 00000000..dae06fa9
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicas.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.partitioner;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.spark.data.SSTablesSupplier;
+import org.apache.cassandra.spark.reader.SparkSSTableReader;
+
+/**
+ * This class is an SSTables supplier backed by replicas in multiple data centers.
+ * This type of supplier is useful for the EACH_QUORUM consistency level.
+ */
+public class MultiDCReplicas extends SSTablesSupplier
+{
+ private final Map<String, SSTablesSupplier> replicasPerDC;
+
+ public MultiDCReplicas(Map<String, SSTablesSupplier> replicasPerDC)
+ {
+ if (replicasPerDC == null || replicasPerDC.isEmpty())
+ {
+ throw new IllegalArgumentException("replicasPerDC cannot be null or empty");
+ }
+ this.replicasPerDC = replicasPerDC;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public <T extends SparkSSTableReader> Set<T> openAll(ReaderOpener<T> readerOpener)
+ {
+ Set<T> combinedReaders = new HashSet<>();
+ replicasPerDC.values()
+ .forEach(supplier -> combinedReaders.addAll(supplier.openAll(readerOpener)));
+ return combinedReaders;
+ }
+}
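A minimal sketch of the openAll contract implemented by MultiDCReplicas: flatten one supplier per data center into a single reader set. A generic java.util.function.Supplier stands in for SSTablesSupplier here, so the types are illustrative:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Supplier;

    public final class MultiDcOpenAllSketch
    {
        // Combine per-DC results into one set, as MultiDCReplicas.openAll does
        public static <T> Set<T> openAll(Map<String, Supplier<Set<T>>> perDc)
        {
            Set<T> combined = new HashSet<>();
            perDc.values().forEach(supplier -> combined.addAll(supplier.get()));
            return combined;
        }

        public static void main(String[] args)
        {
            Map<String, Supplier<Set<String>>> perDc = Map.of("DC1", () -> Set.of("reader-1", "reader-2"),
                                                              "DC2", () -> Set.of("reader-3"));
            // Prints three readers; duplicates across DCs would collapse because the result is a Set
            System.out.println(openAll(perDc));
        }
    }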
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
index 2768b146..7ccb879b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
@@ -234,6 +234,16 @@ public class PartitionedDataLayerTests extends VersionRunner
.isInstanceOf(IllegalArgumentException.class);
}
+ @Test
+ public void testReplicationFactorEachQuorum()
+ {
+ assertThatThrownBy(() -> PartitionedDataLayer
+ .validateReplicationFactor(EACH_QUORUM,
+ TestUtils.networkTopologyStrategy(ImmutableMap.of("PV", 3, "MR", 3)),
+ "MR"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
@ParameterizedTest
@MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
public void testSSTableSupplier(CassandraBridge bridge)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicasTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicasTest.java
new file mode 100644
index 00000000..5575ff7a
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicasTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.partitioner;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.spark.data.SSTablesSupplier;
+import org.apache.cassandra.spark.reader.SparkSSTableReader;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link MultiDCReplicas} class
+ */
+public class MultiDCReplicasTest
+{
+ @Mock
+ private SSTablesSupplier dc1Supplier;
+
+ @Mock
+ private SSTablesSupplier dc2Supplier;
+
+ @Mock
+ private SSTablesSupplier.ReaderOpener<TestSparkSSTableReader> readerOpener;
+
+ private MultiDCReplicas multiDCReplicas;
+
+ @BeforeEach
+ void setUp()
+ {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ void testConstructorWithValidMap()
+ {
+ Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+ replicasMap.put("dc1", dc1Supplier);
+ replicasMap.put("dc2", dc2Supplier);
+
+ multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+ assertThat(multiDCReplicas).isNotNull();
+ }
+
+ @Test
+ void testConstructorWithNullMap()
+ {
+ assertThatThrownBy(() -> new MultiDCReplicas(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("replicasPerDC cannot be null or empty");
+ }
+
+ @Test
+ void testConstructorWithEmptyMap()
+ {
+ assertThatThrownBy(() -> new MultiDCReplicas(Map.of()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("replicasPerDC cannot be null or empty");
+ }
+
+ @Test
+ void testOpenAllWithMultipleDCs()
+ {
+ Map<String, SSTablesSupplier> replicasMap = Map.of("dc1", dc1Supplier, "dc2", dc2Supplier);
+
+ TestSparkSSTableReader reader1 = new TestSparkSSTableReader(BigInteger.ONE, BigInteger.TEN);
+ TestSparkSSTableReader reader2 = new TestSparkSSTableReader(BigInteger.valueOf(11), BigInteger.valueOf(20));
+ TestSparkSSTableReader reader3 = new TestSparkSSTableReader(BigInteger.valueOf(21), BigInteger.valueOf(30));
+
+ Set<TestSparkSSTableReader> dc1Readers = Set.of(reader1, reader2);
+ Set<TestSparkSSTableReader> dc2Readers = Set.of(reader3);
+
+ when(dc1Supplier.openAll(readerOpener)).thenReturn(dc1Readers);
+ when(dc2Supplier.openAll(readerOpener)).thenReturn(dc2Readers);
+
+ multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+ Set<TestSparkSSTableReader> result = multiDCReplicas.openAll(readerOpener);
+
+ assertThat(result).hasSize(3);
+ assertThat(result).containsExactlyInAnyOrder(reader1, reader2, reader3);
+ }
+
+ @Test
+ void testOpenAllWithSingleDC()
+ {
+ Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+ replicasMap.put("dc1", dc1Supplier);
+
+ TestSparkSSTableReader reader1 = new TestSparkSSTableReader(BigInteger.ONE, BigInteger.TEN);
+ TestSparkSSTableReader reader2 = new TestSparkSSTableReader(BigInteger.valueOf(11), BigInteger.valueOf(20));
+
+ Set<TestSparkSSTableReader> dc1Readers = Set.of(reader1, reader2);
+
+ when(dc1Supplier.openAll(readerOpener)).thenReturn(dc1Readers);
+
+ multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+ Set<TestSparkSSTableReader> result = multiDCReplicas.openAll(readerOpener);
+
+ assertThat(result).hasSize(2);
+ assertThat(result).containsExactlyInAnyOrder(reader1, reader2);
+ }
+
+ @Test
+ void testOpenAllWithEmptySupplierResults()
+ {
+ // Arrange
+ Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+ replicasMap.put("dc1", dc1Supplier);
+ replicasMap.put("dc2", dc2Supplier);
+
+ when(dc1Supplier.openAll(readerOpener)).thenReturn(new HashSet<>());
+ when(dc2Supplier.openAll(readerOpener)).thenReturn(new HashSet<>());
+
+ multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+ Set<TestSparkSSTableReader> result = multiDCReplicas.openAll(readerOpener);
+
+ assertThat(result).isNotNull();
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ void testOpenAllWithDuplicateReaders()
+ {
+ Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+ replicasMap.put("dc1", dc1Supplier);
+ replicasMap.put("dc2", dc2Supplier);
+
+ TestSparkSSTableReader reader1 = new TestSparkSSTableReader(BigInteger.ONE, BigInteger.TEN);
+ TestSparkSSTableReader reader2 = new TestSparkSSTableReader(BigInteger.valueOf(11), BigInteger.valueOf(20));
+
+ Set<TestSparkSSTableReader> dc1Readers = Set.of(reader1, reader2);
+ Set<TestSparkSSTableReader> dc2Readers = Set.of(reader1, reader2);
+
+ when(dc1Supplier.openAll(readerOpener)).thenReturn(dc1Readers);
+ when(dc2Supplier.openAll(readerOpener)).thenReturn(dc2Readers);
+
+ multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+ Set<TestSparkSSTableReader> result = multiDCReplicas.openAll(readerOpener);
+
+ assertThat(result).hasSize(2);
+ assertThat(result).containsExactlyInAnyOrder(reader1, reader2);
+ }
+
+ /**
+ * Test implementation of SparkSSTableReader for testing purposes
+ */
+ private static class TestSparkSSTableReader implements SparkSSTableReader
+ {
+ private final BigInteger firstToken;
+ private final BigInteger lastToken;
+
+ TestSparkSSTableReader(BigInteger firstToken, BigInteger lastToken)
+ {
+ this.firstToken = firstToken;
+ this.lastToken = lastToken;
+ }
+
+ @Override
+ public BigInteger firstToken()
+ {
+ return firstToken;
+ }
+
+ @Override
+ public BigInteger lastToken()
+ {
+ return lastToken;
+ }
+
+ public boolean ignore()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (!(obj instanceof TestSparkSSTableReader))
+ {
+ return false;
+ }
+ TestSparkSSTableReader that = (TestSparkSSTableReader) obj;
+ return firstToken.equals(that.firstToken) && lastToken.equals(that.lastToken);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return firstToken.hashCode() + lastToken.hashCode();
+ }
+ }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
index 3d5dfb0a..91a0c22e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
@@ -189,8 +189,11 @@ public class MultipleReplicasTests
public static class TestSSTableReader implements SparkSSTableReader
{
+ private final SSTable ssTable;
+
public TestSSTableReader(SSTable ssTable)
{
+ this.ssTable = ssTable;
}
public BigInteger firstToken()
@@ -207,5 +210,11 @@ public class MultipleReplicasTests
{
return false;
}
+
+ @Override
+ public String toString()
+ {
+ return ssTable.getDataFileName();
+ }
}
}
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index ab8524dc..42bac995 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -132,10 +132,17 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension<
if (dcCount > 1)
{
- clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
- (nodeId) -> nodeId % 2 != 0 ?
- dcAndRack("datacenter1", "rack1") :
- dcAndRack("datacenter2", "rack2")));
+ if (configuration.dcAndRackSupplier != null)
+ {
+ clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, configuration.dcAndRackSupplier));
+ }
+ else
+ {
+ clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
+ (nodeId) -> nodeId % 2 != 0 ?
+ dcAndRack("datacenter1", "rack1") :
+ dcAndRack("datacenter2", "rack2")));
+ }
}
if (configuration.instanceInitializer != null)
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
index a08fe932..4a0d6448 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
@@ -22,10 +22,12 @@ package org.apache.cassandra.testing;
import java.util.EnumSet;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.IntFunction;
import com.google.common.base.Preconditions;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
/**
* Defines the configuration to build the {@link IClusterExtension} cluster
@@ -42,6 +44,7 @@ public class ClusterBuilderConfiguration
public String partitioner;
public Map<String, Object> additionalInstanceConfig = null;
public int tokenCount = 1;
+ public IntFunction<NetworkTopology.DcAndRack> dcAndRackSupplier;
/**
* Adds a features to the list of default features.
@@ -176,4 +179,17 @@ public class ClusterBuilderConfiguration
this.tokenCount = tokenCount;
return this;
}
+
+ /**
+ * Sets a supplier function that provides datacenter and rack information for each node in the cluster.
+ *
+ * @param dcAndRackSupplier a function that takes a node index and returns the corresponding
+ * datacenter and rack configuration for that node
+ * @return this configuration instance for method chaining
+ */
+ public ClusterBuilderConfiguration dcAndRackSupplier(IntFunction<NetworkTopology.DcAndRack> dcAndRackSupplier)
+ {
+ this.dcAndRackSupplier = dcAndRackSupplier;
+ return this;
+ }
}
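The new dcAndRackSupplier hook can be exercised as shown below. The fragment uses only calls visible in this commit (dcCount, nodesPerDc, dcAndRackSupplier, NetworkTopology.dcAndRack), but the wrapper class and the three-nodes-per-DC split are illustrative:

    import java.util.function.IntFunction;

    import org.apache.cassandra.distributed.shared.NetworkTopology;
    import org.apache.cassandra.testing.ClusterBuilderConfiguration;

    import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;

    public final class TwoDcTopologySketch
    {
        // Assign nodes 1-3 to datacenter1 and the rest to datacenter2
        public static ClusterBuilderConfiguration twoDcs(ClusterBuilderConfiguration configuration)
        {
            IntFunction<NetworkTopology.DcAndRack> supplier =
                    nodeId -> nodeId <= 3 ? dcAndRack("datacenter1", "rack1")
                                          : dcAndRack("datacenter2", "rack1");
            return configuration.dcCount(2)
                                .nodesPerDc(3)
                                .dcAndRackSupplier(supplier);
        }
    }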
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
new file mode 100644
index 00000000..b4bf2065
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.agent.ByteBuddyAgent;
+import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.data.CassandraDataLayer;
+import org.apache.cassandra.spark.data.PartitionedDataLayer;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.data.partitioner.NotEnoughReplicasException;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BulkReaderMultiDCConsistencyTest extends SharedClusterSparkIntegrationTestBase
+{
+ static final List<String> OG_DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
+ static final int TEST_KEY = 1;
+ static final String TEST_VAL = "C*";
+ QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .dcCount(2)
+ .nodesPerDc(3)
+ .dcAndRackSupplier((nodeId) -> {
+ switch (nodeId)
+ {
+ case 1:
+ case 2:
+ case 3:
+ return dcAndRack("datacenter1", "rack1");
+ case 4:
+ case 5:
+ case 6:
+ return dcAndRack("datacenter2", "rack1");
+ default:
+ return dcAndRack("", "");
+ }
+ });
+ }
+
+ /**
+ * Happy path test. All nodes have the updated values.
+ * QUORUM == EACH_QUORUM == ALL == driver read
+ */
+ @Test
+ void happyPathTest()
+ {
+ List<String> testDataSet = new ArrayList<>(OG_DATASET);
+ testDataSet.set(TEST_KEY, TEST_VAL);
+
+ // Set value=TEST_VAL for key=TEST_KEY for all nodes
+ setValueForALL(TEST_KEY, TEST_VAL);
+
+ // Bulk read with ALL consistency
+ List<Row> rowList = bulkRead(ConsistencyLevel.ALL.name());
+ validateBulkReadRows(rowList, testDataSet);
+
+ // Bulk read with QUORUM consistency
+ rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ validateBulkReadRows(rowList, testDataSet);
+
+ // Bulk read with EACH_QUORUM consistency
+ rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ validateBulkReadRows(rowList, testDataSet);
+
+ // Read the value for the test key using driver for different consistency levels
+ String valAll = readValueForKey(TEST_KEY, ConsistencyLevel.ALL);
+ String valQuorum = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
+ String valEachQuorum = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ assertThat(valAll).isEqualTo(valQuorum).isEqualTo(valEachQuorum).isEqualTo(rowList.get(1).getString(1));
+
+ // Revert the value update for all nodes
+ setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ }
+
+ public static PartitionedDataLayer.AvailabilityHint getAvailability(CassandraInstance instance)
+ {
+ if (instance.nodeName().equals("localhost5") || instance.nodeName().equals("localhost6"))
+ {
+ return PartitionedDataLayer.AvailabilityHint.MOVING;
+ }
+ return PartitionedDataLayer.AvailabilityHint.UP;
+ }
+
+ /**
+ * This test creates a scenario where the bulk reader reads the most recently updated value with EACH_QUORUM
+ * but reads a stale value with QUORUM. This shows that QUORUM is different from EACH_QUORUM in multi-DC settings.
+ * Here node5(DC2) and node6(DC2) have the updated value for TEST_KEY.
+ *
+ * @throws NoSuchMethodException
+ */
+ @Test
+ void eachQuorumIsNotQuorum() throws NoSuchMethodException
+ {
+ List<String> updatedDataSet = new ArrayList<>(OG_DATASET);
+ updatedDataSet.set(1, TEST_VAL);
+
+ // Internally update value for TEST_KEY for node5 and node6. This update doesn't propagate to other nodes.
+ updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
+ updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
+
+ // Bytecode injection to simulate a scenario where node5 and node6 are at the end of the replica list for bulk reader.
+ // This simulation mimics a real world scenario.
+ // With this arrangement PartitionedDataLayer.splitReplicas method for QUORUM will split the replicas like below:
+ // primaryReplicas: [Node1, Node2, Node3, Node4]
+ // secondaryReplicas: [Node5, Node6]
+ // Number of nodes required for QUORUM read is 6/2 + 1 = 4. Bulk reader will read from [Node1, Node2, Node3, Node4] only.
+ ByteBuddyAgent.install();
+ new ByteBuddy()
+ .redefine(CassandraDataLayer.class)
+ .method(ElementMatchers.named("getAvailability"))
+ .intercept(
+ MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability", CassandraInstance.class))
+ .withAllArguments()
+ )
+ .make()
+ .load(
+ CassandraDataLayer.class.getClassLoader(),
+ ClassReloadingStrategy.fromInstalledAgent()
+ );
+
+ // Bulk read with QUORUM consistency
+ List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ // Validate that the result doesn't have the updated data.
+ validateBulkReadRows(rowList, OG_DATASET);
+
+ // Message filter to mimic message drops from Node5 and Node6 to Node1.
+ // We are setting this up to simulate a scenario where reading values with QUORUM consistency with driver
+ // and using Node1 as the coordinator doesn't get the values from Node5 and Node6.
+ cluster.filters().allVerbs().from(5).to(1).drop();
+ cluster.filters().allVerbs().from(6).to(1).drop();
+
+ // Read value for TEST_KEY with driver using Node1 as coordinator
+ String quorumVal = readValueForKey(cluster.get(1).coordinator(), TEST_KEY, ConsistencyLevel.QUORUM);
+ // Validate that the updated value is not read
+ assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
+
+ // Cleanup message filter
+ cluster.filters().reset();
+
+ // Bulk read with EACH_QUORUM consistency
+ rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ // Validate that bulk reader was able to read the updated value
+ validateBulkReadRows(rowList, updatedDataSet);
+ // Read value using driver with EACH_QUORUM
+ String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ // Validate that EACH_QUORUM read using driver and the bulk reader are the same
+ assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+ // Revert the value update for all nodes
+ setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ }
+
+ /**
+ * Tests that EACH_QUORUM read succeeds with one node down in each DC.
+ * Tests that value read using driver is the same as the value read using bulk reader.
+ *
+ * @throws Exception
+ */
+ @Test
+ void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception
+ {
+ // Stop Node1(DC1)
+ cluster.stopUnchecked(cluster.get(1));
+ // Stop Node4(DC2)
+ cluster.stopUnchecked(cluster.get(4));
+
+ // Bulk read with EACH_QUORUM consistency
+ List<Row> rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ validateBulkReadRows(rowList, OG_DATASET);
+
+ // Read TEST_KEY using driver
+ String eachQuorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ // Validate that data from driver and bulk reader are the same
+ assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+ // Tear down and re-create the cluster
+ tearDown();
+ setup();
+ }
+
+ /**
+ * Tests that:
+ * QUORUM read succeeds with two nodes down in a single DC.
+ * QUORUM read value using bulk reader equals QUORUM read value using driver.
+ * EACH_QUORUM read with bulk reader fails with cause as NotEnoughReplicasException.
+ * EACH_QUORUM read with driver fails.
+ *
+ * @throws Exception
+ */
+ @Test
+ void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception
+ {
+ // Stop Node4(DC2)
+ cluster.stopUnchecked(cluster.get(4));
+ // Stop Node5(DC2)
+ cluster.stopUnchecked(cluster.get(5));
+
+ // Bulk read with QUORUM
+ List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+ validateBulkReadRows(rowList, OG_DATASET);
+ // Driver read with QUORUM
+ String quorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
+ // Bulk read and driver read values are the same
+ assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+ // Try bulk reading with EACH_QUORUM consistency. Assert that it fails with the correct cause.
+ try
+ {
+ bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+ }
+ catch (Exception ex)
+ {
+ assertThat(ex).isNotNull();
+ assertThat(ex).isInstanceOf(SparkException.class);
+ assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
+ assertThat(ex.getCause().getMessage()).isEqualTo("Required 2 replicas but only 1 responded");
+ }
+
+ // Try driver reading with EACH_QUORUM consistency. Assert that it fails with the correct error.
+ try
+ {
+ readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+ }
+ catch (Exception ex)
+ {
+ assertThat(ex).isNotNull();
+ assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency level EACH_QUORUM in DC datacenter2");
+ }
+
+ // Tear down and re-create the cluster
+ tearDown();
+ setup();
+ }
+
+ /**
+ * Validates that read repair is disabled.
+ */
+ private void validateReadRepairIsDisabled()
+ {
+ // Update value for Node1 only
+ updateValueNodeInternal(1, TEST_KEY, TEST_VAL);
+ // Validate only Node1 has the updated value
+ validateNodeInternalValue(1, TEST_KEY, TEST_VAL);
+ validateNodeInternalValue(2, TEST_KEY, OG_DATASET.get(1));
+ validateNodeInternalValue(5, TEST_KEY, OG_DATASET.get(1));
+
+ // Read with ALL consistency using coordinator.
+ // If read repair is enabled this should update the value for all nodes.
+ readValueForKey(TEST_KEY, ConsistencyLevel.ALL);
+
+ // Validate only Node1 has the updated value
+ validateNodeInternalValue(1, TEST_KEY, TEST_VAL);
+ validateNodeInternalValue(2, TEST_KEY, OG_DATASET.get(1));
+ validateNodeInternalValue(5, TEST_KEY, OG_DATASET.get(1));
+
+ // Revert the value update for all nodes
+ setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+ }
+
+ @NotNull
+ private List<Row> bulkRead(String consistency)
+ {
+ List<Row> rowList;
+ Dataset<Row> dataForTable1;
+ dataForTable1 = bulkReaderDataFrame(table1)
+ .option("consistencyLevel", consistency)
+ .option("dc", null)
+ .option("maxRetries", 1)
+ .option("maxMillisToSleep", 50)
+ .option("defaultMillisToSleep", 50)
+ .load();
+
+ rowList = dataForTable1.collectAsList().stream()
+ .sorted(Comparator.comparing(row -> row.getInt(0)))
+ .collect(Collectors.toList());
+ return rowList;
+ }
+
+ private static void validateBulkReadRows(List<Row> rowList, List<String> dataSet)
+ {
+ for (int i = 0; i < dataSet.size(); i++)
+ {
+ assertThat(rowList.get(i).getInt(0)).isEqualTo(i);
+ assertThat(rowList.get(i).getString(1)).isEqualTo(dataSet.get(i));
+ }
+ }
+
+ protected void initializeSchemaForTest()
+ {
+ createTestKeyspace(TEST_KEYSPACE, DC1_RF3_DC2_RF3);
+ // Read repair disabled: https://cassandra.apache.org/doc/latest/cassandra/managing/operating/read_repair.html#none
+ createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY KEY, name text) with read_repair='NONE';");
+
+ IInstance firstRunningInstance = cluster.getFirstRunningInstance();
+ for (int i = 0; i < OG_DATASET.size(); i++)
+ {
+ String value = OG_DATASET.get(i);
+ String query1 = String.format("INSERT INTO %s (id, name) VALUES (%d, '%s');", table1, i, value);
+
+ firstRunningInstance.coordinator().execute(query1, ConsistencyLevel.ALL);
+ }
+ validateReadRepairIsDisabled();
+ }
+
+ private void updateValueNodeInternal(int node, int key, String value)
+ {
+ cluster.get(node).executeInternal(String.format("UPDATE %s SET name='%s' WHERE id=%d", table1, value, key));
+ }
+
+ private void validateNodeInternalValue(int node, int key, String val)
+ {
+ assertThat(getNodeInternalValue(node, key)).isEqualTo(val);
+ }
+
+ private String getNodeInternalValue(int node, int key)
+ {
+ Object[][] result = cluster.get(node)
+ .executeInternal(String.format("SELECT name FROM %s WHERE id=%d", table1, key));
+ return (String) result[0][0];
+ }
+
+ private String readValueForKey(int key, ConsistencyLevel consistencyLevel)
+ {
+ return readValueForKey(cluster.getFirstRunningInstance().coordinator(), key, consistencyLevel);
+ }
+
+ private String readValueForKey(ICoordinator coordinator, int key, ConsistencyLevel consistencyLevel)
+ {
+ Object[][] result = coordinator
+ .execute(String.format("SELECT name FROM %s WHERE id=%d", table1, key), consistencyLevel);
+ return (String) result[0][0];
+ }
+
+ /**
+ * Sets value for a key with consistency level ALL.
+ *
+ * @param key
+ * @param value
+ */
+ private void setValueForALL(int key, String value)
+ {
+ cluster.getFirstRunningInstance()
+ .coordinator()
+ .execute(String.format("UPDATE %s SET name='%s' WHERE id=%d", table1, value, key), ConsistencyLevel.ALL);
+ }
+}
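For context, a bulk read at EACH_QUORUM is just a Spark data-source read with the consistencyLevel option set and no dc option (the new validation rejects a DC for EACH_QUORUM). The sketch below is illustrative; the data-source format name and SparkSession wiring are assumptions, not taken from this diff:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class EachQuorumBulkReadSketch
    {
        public static Dataset<Row> read(SparkSession spark, String keyspace, String table)
        {
            return spark.read()
                        // Assumed data-source name for the Cassandra Analytics bulk reader
                        .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                        .option("keyspace", keyspace)
                        .option("table", table)
                        .option("consistencyLevel", "EACH_QUORUM") // no "dc" option with EACH_QUORUM
                        .load();
        }
    }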
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 08f3d94b..a3f6384a 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -266,6 +266,7 @@ public class SparkTestUtils
public static Stream<String> sidecarInstancesOptionStream(ICluster<? extends IInstance> cluster, DnsResolver dnsResolver)
{
return IntStream.rangeClosed(1, cluster.size())
+ .filter(i -> !cluster.get(i).isShutdown())
.mapToObj(i -> {
String ipAddress = JMXUtil.getJmxHost(cluster.get(i).config());
try
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]