This is an automated email from the ASF dual-hosted git repository.

jberragan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98a767cc CASSANALYTICS-101: Add EACH_QUORUM consistency level support in bulk reader (#153)
98a767cc is described below

commit 98a767ccf91ee698a7f94bf8490eb595b1572db3
Author: Sudipta <[email protected]>
AuthorDate: Fri Dec 19 06:46:26 2025 -0800

    CASSANALYTICS-101: Add EACH_QUORUM consistency level support in bulk reader (#153)
    
    Add EACH_QUORUM consistency level support in bulk reader
---
 .../spark/data/partitioner/ConsistencyLevel.java   |  32 ++
 .../data/partitioner/ConsistencyLevelTests.java    |  97 +++++
 .../cassandra/spark/data/PartitionedDataLayer.java |  72 +++-
 .../spark/data/partitioner/MultiDCReplicas.java    |  57 +++
 .../spark/data/PartitionedDataLayerTests.java      |  10 +
 .../data/partitioner/MultiDCReplicasTest.java      | 230 ++++++++++++
 .../data/partitioner/MultipleReplicasTests.java    |   9 +
 .../distributed/impl/CassandraCluster.java         |  15 +-
 .../testing/ClusterBuilderConfiguration.java       |  16 +
 .../BulkReaderMultiDCConsistencyTest.java          | 391 +++++++++++++++++++++
 .../apache/cassandra/analytics/SparkTestUtils.java |   1 +
 11 files changed, 914 insertions(+), 16 deletions(-)

diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
index 529d9fdc..6c464442 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevel.java
@@ -19,6 +19,9 @@
 
 package org.apache.cassandra.spark.data.partitioner;
 
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -63,6 +66,35 @@ public enum ConsistencyLevel
                : quorumFor(replicationFactor);
     }
 
+    /**
+     * Calculates the number of replicas that must acknowledge the operation 
in each data center
+     * for EACH_QUORUM consistency level.
+     *
+     * @param replicationFactor the replication factor configuration 
containing data center information
+     * @return a map where keys are data center names and values are the 
number of replicas
+     * required to achieve local quorum in each data center
+     * @throws IllegalArgumentException if the consistency level is not 
EACH_QUORUM or if the
+     *                                  replication strategy is not 
NetworkTopologyStrategy
+     */
+    public Map<String, Integer> eachQuorumBlockFor(@NotNull ReplicationFactor 
replicationFactor)
+    {
+        if (this != EACH_QUORUM)
+        {
+            throw new IllegalArgumentException(String.format("Consistency 
level needed is EACH_QUORUM, provided is: %s",
+                                                             this.name()));
+        }
+        if (replicationFactor.getReplicationStrategy() != 
ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy)
+        {
+            throw new IllegalArgumentException(String.format("Invalid 
Replication Strategy: %s for EACH_QUORUM consistency read.",
+                                                             
replicationFactor.getReplicationStrategy().name()));
+        }
+        return replicationFactor.getOptions().keySet().stream()
+                                .collect(Collectors.toMap(
+                                dataCenter -> dataCenter,
+                                dataCenter -> 
localQuorumFor(replicationFactor, dataCenter)
+                                ));
+    }
+
     private int getNetworkTopologyRf(@NotNull ReplicationFactor 
replicationFactor, @Nullable String dataCenter)
     {
         int rf;
diff --git 
a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
 
b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
index 9058bd2c..523b2da6 100644
--- 
a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
+++ 
b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/ConsistencyLevelTests.java
@@ -19,12 +19,15 @@
 
 package org.apache.cassandra.spark.data.partitioner;
 
+import java.util.Map;
+
 import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 
 import org.apache.cassandra.spark.data.ReplicationFactor;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class ConsistencyLevelTests
 {
@@ -109,4 +112,98 @@ public class ConsistencyLevelTests
         new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
                               ImmutableMap.of("DC1", 5, "DC2", 5)), 
null)).isEqualTo(10);
     }
+
+    @Test
+    void testEachQuorumBlockForWithNetworkTopologyStrategy()
+    {
+        // Create a NetworkTopologyStrategy with multiple data centers
+        Map<String, Integer> options = Map.of(
+        "DC1", 3,
+        "DC2", 5,
+        "DC3", 2
+        );
+        ReplicationFactor replicationFactor = new ReplicationFactor(
+        ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+        options
+        );
+
+        Map<String, Integer> result = 
ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor);
+
+        // Expected: (3/2)+1=2, (5/2)+1=3, (2/2)+1=2
+        assertThat(result).hasSize(3)
+                          .containsEntry("DC1", 2)
+                          .containsEntry("DC2", 3)
+                          .containsEntry("DC3", 2);
+    }
+
+    @Test
+    void testEachQuorumBlockForWithSingleDataCenter()
+    {
+        Map<String, Integer> options = Map.of("DC1", 6);
+        ReplicationFactor replicationFactor = new ReplicationFactor(
+        ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+        options
+        );
+
+        Map<String, Integer> result = 
ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor);
+
+        // Expected: (6/2)+1=4
+        assertThat(result).containsEntry("DC1", 4);
+        assertThat(result).hasSize(1);
+    }
+
+    @Test
+    void testEachQuorumBlockForWithOddReplicationFactors()
+    {
+        Map<String, Integer> options = Map.of(
+        "DC1", 1,
+        "DC2", 3,
+        "DC3", 5
+        );
+        ReplicationFactor replicationFactor = new ReplicationFactor(
+        ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+        options
+        );
+
+        Map<String, Integer> result = 
ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor);
+
+        // Expected: (1/2)+1=1, (3/2)+1=2, (5/2)+1=3
+        assertThat(result).containsEntry("DC1", 1);
+        assertThat(result).containsEntry("DC2", 2);
+        assertThat(result).containsEntry("DC3", 3);
+        assertThat(result).hasSize(3);
+    }
+
+    @Test
+    void testEachQuorumBlockForThrowsExceptionForSimpleStrategy()
+    {
+        ReplicationFactor replicationFactor = 
ReplicationFactor.simpleStrategy(3);
+
+        assertThatThrownBy(() -> 
ConsistencyLevel.EACH_QUORUM.eachQuorumBlockFor(replicationFactor))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid Replication Strategy: SimpleStrategy 
for EACH_QUORUM consistency read.");
+    }
+
+    @Test
+    void testEachQuorumBlockForAllOtherConsistencyLevelsThrowException()
+    {
+        Map<String, Integer> options = Map.of("DC1", 3);
+        ReplicationFactor replicationFactor = new ReplicationFactor(
+        ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+        options
+        );
+
+        // Levels other than EACH_QUORUM must be rejected with the 
"provided is: %s" message
+        assertThatThrownBy(() -> 
ConsistencyLevel.ONE.eachQuorumBlockFor(replicationFactor))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Consistency level needed is EACH_QUORUM, 
provided is: ONE");
+
+        assertThatThrownBy(() -> 
ConsistencyLevel.LOCAL_QUORUM.eachQuorumBlockFor(replicationFactor))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Consistency level needed is EACH_QUORUM, 
provided is: LOCAL_QUORUM");
+
+        assertThatThrownBy(() -> 
ConsistencyLevel.ALL.eachQuorumBlockFor(replicationFactor))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Consistency level needed is EACH_QUORUM, 
provided is: ALL");
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
index d3200166..72ee3d7c 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.data;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,11 +43,12 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.bridge.TokenRange;
-import org.apache.cassandra.spark.utils.RangeUtils;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.data.partitioner.CassandraRing;
 import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel;
+import org.apache.cassandra.spark.data.partitioner.MultiDCReplicas;
 import org.apache.cassandra.spark.data.partitioner.MultipleReplicas;
 import org.apache.cassandra.spark.data.partitioner.NotEnoughReplicasException;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
@@ -55,7 +57,7 @@ import 
org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
 import org.apache.cassandra.spark.sparksql.NoMatchFoundException;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-import org.apache.cassandra.analytics.stats.Stats;
+import org.apache.cassandra.spark.utils.RangeUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -134,10 +136,6 @@ public abstract class PartitionedDataLayer extends 
DataLayer
         {
             throw new IllegalArgumentException("SERIAL or LOCAL_SERIAL are 
invalid consistency levels for the Bulk Reader");
         }
-        if (consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-        {
-            throw new UnsupportedOperationException("EACH_QUORUM has not been 
implemented yet");
-        }
     }
 
     protected void validateReplicationFactor(@NotNull ReplicationFactor 
replicationFactor)
@@ -165,6 +163,10 @@ public abstract class PartitionedDataLayer extends 
DataLayer
         {
             return;
         }
+
+        Preconditions.checkArgument(ConsistencyLevel.EACH_QUORUM != 
consistencyLevel,
+                                    "A DC should not be specified for 
EACH_QUORUM consistency level. Provided DC: " + dc);
+
         
Preconditions.checkArgument(replicationFactor.getOptions().containsKey(dc),
                                     "DC %s not found in replication factor %s",
                                     dc, 
replicationFactor.getOptions().keySet());
@@ -279,12 +281,53 @@ public abstract class PartitionedDataLayer extends 
DataLayer
         LOGGER.info("Creating partitioned SSTablesSupplier for Spark partition 
partitionId={} rangeLower={} rangeUpper={} numReplicas={}",
                     partitionId, range.lowerEndpoint(), range.upperEndpoint(), 
replicas.size());
 
+        return constructSSTablesSupplier(partitionId, replicationFactor, 
instRanges, replicas, range);
+    }
+
+    @NotNull
+    private SSTablesSupplier constructSSTablesSupplier(
+    int partitionId,
+    ReplicationFactor replicationFactor,
+    Map<Range<BigInteger>, List<CassandraInstance>> instRanges,
+    Set<CassandraInstance> replicas,
+    Range<BigInteger> range)
+    {
+        if (consistencyLevel == ConsistencyLevel.EACH_QUORUM)
+        {
+            Map<String, Integer> minReplicasPerDC = 
consistencyLevel.eachQuorumBlockFor(replicationFactor);
+            LOGGER.debug("Reading with EACH_QUORUM consistency level for 
partitionId={}, minReplicasPerDC={}",
+                         partitionId, minReplicasPerDC);
+            Map<String, Set<CassandraInstance>> replicasByDC = 
replicas.stream()
+                                                                       
.collect(Collectors.groupingBy(
+                                                                       
CassandraInstance::dataCenter,
+                                                                       
Collectors.toSet()
+                                                                       ));
+
+            Map<String, SSTablesSupplier> perDCSSTablesSupplier = new 
HashMap<>(minReplicasPerDC.size());
+            for (Map.Entry<String, Integer> entry : 
minReplicasPerDC.entrySet())
+            {
+                Set<CassandraInstance> replicasInDC = 
replicasByDC.getOrDefault(entry.getKey(), Collections.emptySet());
+                MultipleReplicas multipleReplicas = 
constructSSTablesSupplierSingleDC(partitionId, instRanges, replicasInDC, range, 
entry.getValue());
+                perDCSSTablesSupplier.put(entry.getKey(), multipleReplicas);
+            }
+            return new MultiDCReplicas(perDCSSTablesSupplier);
+        }
+        int minReplicas = consistencyLevel.blockFor(replicationFactor, 
datacenter);
+        return constructSSTablesSupplierSingleDC(partitionId, instRanges, 
replicas, range, minReplicas);
+    }
+
+    @NotNull
+    private MultipleReplicas constructSSTablesSupplierSingleDC(
+    int partitionId,
+    Map<Range<BigInteger>, List<CassandraInstance>> instRanges,
+    Set<CassandraInstance> replicas,
+    Range<BigInteger> range, int minReplicas)
+    {
         // Use consistency level and replication factor to calculate min 
number of replicas required
         // to satisfy consistency level; split replicas into 'primary' and 
'backup' replicas,
         // attempt on primary replicas and use backups to retry in the event 
of a failure
-        int minReplicas = consistencyLevel.blockFor(replicationFactor, 
datacenter);
         ReplicaSet replicaSet = PartitionedDataLayer.splitReplicas(
-                consistencyLevel, datacenter, instRanges, replicas, 
this::getAvailability, minReplicas, partitionId);
+        consistencyLevel, datacenter, instRanges, replicas, 
this::getAvailability, minReplicas, partitionId);
         if (replicaSet.primary().size() < minReplicas)
         {
             // Could not find enough primary replicas to meet consistency level
@@ -295,11 +338,16 @@ public abstract class PartitionedDataLayer extends 
DataLayer
         ExecutorService executor = executorService();
         Stats stats = stats();
         Set<SingleReplica> primaryReplicas = replicaSet.primary().stream()
-                .map(instance -> new SingleReplica(instance, this, range, 
partitionId, executor, stats, replicaSet.isRepairPrimary(instance)))
-                .collect(Collectors.toSet());
+                                                       .map(instance ->
+                                                            new 
SingleReplica(instance, this, range,
+                                                                              
partitionId, executor, stats,
+                                                                              
replicaSet.isRepairPrimary(instance)))
+                                                       
.collect(Collectors.toSet());
         Set<SingleReplica> backupReplicas = replicaSet.backup().stream()
-                .map(instance -> new SingleReplica(instance, this, range, 
partitionId, executor, stats, true))
-                .collect(Collectors.toSet());
+                                                      .map(instance ->
+                                                           new 
SingleReplica(instance, this, range,
+                                                                             
partitionId, executor, stats, true))
+                                                      
.collect(Collectors.toSet());
 
         return new MultipleReplicas(primaryReplicas, backupReplicas, stats);
     }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicas.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicas.java
new file mode 100644
index 00000000..dae06fa9
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicas.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.partitioner;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.spark.data.SSTablesSupplier;
+import org.apache.cassandra.spark.reader.SparkSSTableReader;
+
+/**
+ * SSTables supplier that combines one underlying supplier per data center.
+ * This type of supplier is useful for EACH_QUORUM consistency level.
+ */
+public class MultiDCReplicas extends SSTablesSupplier
+{
+    private final Map<String, SSTablesSupplier> replicasPerDC;
+
+    public MultiDCReplicas(Map<String, SSTablesSupplier> replicasPerDC)
+    {
+        if (replicasPerDC == null || replicasPerDC.isEmpty())
+        {
+            throw new IllegalArgumentException("replicasPerDC cannot be null 
or empty");
+        }
+        this.replicasPerDC = replicasPerDC;
+    }
+
+    /**
+     * {@inheritDoc} Returns the union of the readers opened by every per-DC supplier.
+     */
+    @Override
+    public <T extends SparkSSTableReader> Set<T> openAll(ReaderOpener<T> 
readerOpener)
+    {
+        Set<T> combinedReaders = new HashSet<>();
+        replicasPerDC.values()
+                     .forEach(supplier -> 
combinedReaders.addAll(supplier.openAll(readerOpener)));
+        return combinedReaders;
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
index 2768b146..7ccb879b 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/PartitionedDataLayerTests.java
@@ -234,6 +234,16 @@ public class PartitionedDataLayerTests extends 
VersionRunner
             .isInstanceOf(IllegalArgumentException.class);
     }
 
+    @Test
+    public void testReplicationFactorEachQuorum()
+    {
+        assertThatThrownBy(() -> PartitionedDataLayer
+                                 .validateReplicationFactor(EACH_QUORUM,
+                                                            
TestUtils.networkTopologyStrategy(ImmutableMap.of("PV", 3, "MR", 3)),
+                                                            "MR"))
+        .isInstanceOf(IllegalArgumentException.class);
+    }
+
     @ParameterizedTest
     @MethodSource("org.apache.cassandra.spark.data.VersionRunner#bridges")
     public void testSSTableSupplier(CassandraBridge bridge)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicasTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicasTest.java
new file mode 100644
index 00000000..5575ff7a
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultiDCReplicasTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.partitioner;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.spark.data.SSTablesSupplier;
+import org.apache.cassandra.spark.reader.SparkSSTableReader;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link MultiDCReplicas} class
+ */
+public class MultiDCReplicasTest
+{
+    @Mock
+    private SSTablesSupplier dc1Supplier;
+
+    @Mock
+    private SSTablesSupplier dc2Supplier;
+
+    @Mock
+    private SSTablesSupplier.ReaderOpener<TestSparkSSTableReader> readerOpener;
+
+    private MultiDCReplicas multiDCReplicas;
+
+    @BeforeEach
+    void setUp()
+    {
+        MockitoAnnotations.openMocks(this);
+    }
+
+    @Test
+    void testConstructorWithValidMap()
+    {
+        Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+        replicasMap.put("dc1", dc1Supplier);
+        replicasMap.put("dc2", dc2Supplier);
+
+        multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+        assertThat(multiDCReplicas).isNotNull();
+    }
+
+    @Test
+    void testConstructorWithNullMap()
+    {
+        assertThatThrownBy(() -> new MultiDCReplicas(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("replicasPerDC cannot be null or empty");
+    }
+
+    @Test
+    void testConstructorWithEmptyMap()
+    {
+        assertThatThrownBy(() -> new MultiDCReplicas(Map.of()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("replicasPerDC cannot be null or empty");
+    }
+
+    @Test
+    void testOpenAllWithMultipleDCs()
+    {
+        Map<String, SSTablesSupplier> replicasMap = Map.of("dc1", dc1Supplier,
+                                                           "dc2", dc2Supplier);
+
+        TestSparkSSTableReader reader1 = new 
TestSparkSSTableReader(BigInteger.ONE, BigInteger.TEN);
+        TestSparkSSTableReader reader2 = new 
TestSparkSSTableReader(BigInteger.valueOf(11), BigInteger.valueOf(20));
+        TestSparkSSTableReader reader3 = new 
TestSparkSSTableReader(BigInteger.valueOf(21), BigInteger.valueOf(30));
+
+        Set<TestSparkSSTableReader> dc1Readers = Set.of(reader1, reader2);
+        Set<TestSparkSSTableReader> dc2Readers = Set.of(reader3);
+
+        when(dc1Supplier.openAll(readerOpener)).thenReturn(dc1Readers);
+        when(dc2Supplier.openAll(readerOpener)).thenReturn(dc2Readers);
+
+        multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+        Set<TestSparkSSTableReader> result = 
multiDCReplicas.openAll(readerOpener);
+
+        assertThat(result).hasSize(3);
+        assertThat(result).containsExactlyInAnyOrder(reader1, reader2, 
reader3);
+    }
+
+    @Test
+    void testOpenAllWithSingleDC()
+    {
+        Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+        replicasMap.put("dc1", dc1Supplier);
+
+        TestSparkSSTableReader reader1 = new 
TestSparkSSTableReader(BigInteger.ONE, BigInteger.TEN);
+        TestSparkSSTableReader reader2 = new 
TestSparkSSTableReader(BigInteger.valueOf(11), BigInteger.valueOf(20));
+
+        Set<TestSparkSSTableReader> dc1Readers = Set.of(reader1, reader2);
+
+        when(dc1Supplier.openAll(readerOpener)).thenReturn(dc1Readers);
+
+        multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+        Set<TestSparkSSTableReader> result = 
multiDCReplicas.openAll(readerOpener);
+
+        assertThat(result).hasSize(2);
+        assertThat(result).containsExactlyInAnyOrder(reader1, reader2);
+    }
+
+    @Test
+    void testOpenAllWithEmptySupplierResults()
+    {
+        // Arrange
+        Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+        replicasMap.put("dc1", dc1Supplier);
+        replicasMap.put("dc2", dc2Supplier);
+
+        when(dc1Supplier.openAll(readerOpener)).thenReturn(new HashSet<>());
+        when(dc2Supplier.openAll(readerOpener)).thenReturn(new HashSet<>());
+
+        multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+        Set<TestSparkSSTableReader> result = 
multiDCReplicas.openAll(readerOpener);
+
+        assertThat(result).isNotNull();
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testOpenAllWithDuplicateReaders()
+    {
+        Map<String, SSTablesSupplier> replicasMap = new HashMap<>();
+        replicasMap.put("dc1", dc1Supplier);
+        replicasMap.put("dc2", dc2Supplier);
+
+        TestSparkSSTableReader reader1 = new 
TestSparkSSTableReader(BigInteger.ONE, BigInteger.TEN);
+        TestSparkSSTableReader reader2 = new 
TestSparkSSTableReader(BigInteger.valueOf(11), BigInteger.valueOf(20));
+
+        Set<TestSparkSSTableReader> dc1Readers = Set.of(reader1, reader2);
+        Set<TestSparkSSTableReader> dc2Readers = Set.of(reader1, reader2);
+
+        when(dc1Supplier.openAll(readerOpener)).thenReturn(dc1Readers);
+        when(dc2Supplier.openAll(readerOpener)).thenReturn(dc2Readers);
+
+        multiDCReplicas = new MultiDCReplicas(replicasMap);
+
+        Set<TestSparkSSTableReader> result = 
multiDCReplicas.openAll(readerOpener);
+
+        assertThat(result).hasSize(2);
+        assertThat(result).containsExactlyInAnyOrder(reader1, reader2);
+    }
+
+    /**
+     * Test implementation of SparkSSTableReader for testing purposes
+     */
+    private static class TestSparkSSTableReader implements SparkSSTableReader
+    {
+        private final BigInteger firstToken;
+        private final BigInteger lastToken;
+
+        TestSparkSSTableReader(BigInteger firstToken, BigInteger lastToken)
+        {
+            this.firstToken = firstToken;
+            this.lastToken = lastToken;
+        }
+
+        @Override
+        public BigInteger firstToken()
+        {
+            return firstToken;
+        }
+
+        @Override
+        public BigInteger lastToken()
+        {
+            return lastToken;
+        }
+
+        public boolean ignore()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj)
+            {
+                return true;
+            }
+            if (!(obj instanceof TestSparkSSTableReader))
+            {
+                return false;
+            }
+            TestSparkSSTableReader that = (TestSparkSSTableReader) obj;
+            return firstToken.equals(that.firstToken) && 
lastToken.equals(that.lastToken);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return firstToken.hashCode() + lastToken.hashCode();
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
index 3d5dfb0a..91a0c22e 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/partitioner/MultipleReplicasTests.java
@@ -189,8 +189,11 @@ public class MultipleReplicasTests
 
     public static class TestSSTableReader implements SparkSSTableReader
     {
+        private final SSTable ssTable;
+
         public TestSSTableReader(SSTable ssTable)
         {
+            this.ssTable = ssTable;
         }
 
         public BigInteger firstToken()
@@ -207,5 +210,11 @@ public class MultipleReplicasTests
         {
             return false;
         }
+
+        @Override
+        public String toString()
+        {
+            return ssTable.getDataFileName();
+        }
     }
 }
diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index ab8524dc..42bac995 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -132,10 +132,17 @@ public class CassandraCluster<I extends IInstance> 
implements IClusterExtension<
 
         if (dcCount > 1)
         {
-            clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
-                                                              (nodeId) -> 
nodeId % 2 != 0 ?
-                                                                          
dcAndRack("datacenter1", "rack1") :
-                                                                          
dcAndRack("datacenter2", "rack2")));
+            if (configuration.dcAndRackSupplier != null)
+            {
+                
clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, 
configuration.dcAndRackSupplier));
+            }
+            else
+            {
+                
clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
+                                                                  (nodeId) -> 
nodeId % 2 != 0 ?
+                                                                              
dcAndRack("datacenter1", "rack1") :
+                                                                              
dcAndRack("datacenter2", "rack2")));
+            }
         }
 
         if (configuration.instanceInitializer != null)
diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
index a08fe932..4a0d6448 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/ClusterBuilderConfiguration.java
@@ -22,10 +22,12 @@ package org.apache.cassandra.testing;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.IntFunction;
 
 import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 
 /**
  * Defines the configuration to build the {@link IClusterExtension} cluster
@@ -42,6 +44,7 @@ public class ClusterBuilderConfiguration
     public String partitioner;
     public Map<String, Object> additionalInstanceConfig = null;
     public int tokenCount = 1;
+    public IntFunction<NetworkTopology.DcAndRack> dcAndRackSupplier;
 
     /**
      * Adds a features to the list of default features.
@@ -176,4 +179,17 @@ public class ClusterBuilderConfiguration
         this.tokenCount = tokenCount;
         return this;
     }
+
+    /**
+     * Sets a supplier function that provides datacenter and rack information 
for each node in the cluster.
+     *
+     * @param dcAndRackSupplier a function that takes a node index and returns 
the corresponding
+     *                          datacenter and rack configuration for that node
+     * @return this configuration instance for method chaining
+     */
+    public ClusterBuilderConfiguration 
dcAndRackSupplier(IntFunction<NetworkTopology.DcAndRack> dcAndRackSupplier)
+    {
+        this.dcAndRackSupplier = dcAndRackSupplier;
+        return this;
+    }
 }
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
new file mode 100644
index 00000000..b4bf2065
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderMultiDCConsistencyTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.agent.ByteBuddyAgent;
+import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.data.CassandraDataLayer;
+import org.apache.cassandra.spark.data.PartitionedDataLayer;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.data.partitioner.NotEnoughReplicasException;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BulkReaderMultiDCConsistencyTest extends 
SharedClusterSparkIntegrationTestBase
+{
+    static final List<String> OG_DATASET = Arrays.asList("a", "b", "c", "d", 
"e", "f", "g");
+    static final int TEST_KEY = 1;
+    static final String TEST_VAL = "C*";
+    QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .dcCount(2)
+                    .nodesPerDc(3)
+                    .dcAndRackSupplier((nodeId) -> {
+                        switch (nodeId)
+                        {
+                            case 1:
+                            case 2:
+                            case 3:
+                                return dcAndRack("datacenter1", "rack1");
+                            case 4:
+                            case 5:
+                            case 6:
+                                return dcAndRack("datacenter2", "rack1");
+                            default:
+                                return dcAndRack("", "");
+                        }
+                    });
+    }
+
+    /**
+     * Happy path test. All nodes have the updated values.
+     * QUORUM == EACH_QUORUM == ALL == driver read
+     */
+    @Test
+    void happyPathTest()
+    {
+        List<String> testDataSet = new ArrayList<>(OG_DATASET);
+        testDataSet.set(TEST_KEY, TEST_VAL);
+
+        // Set value=TEST_VAL for key=TEST_KEY for all nodes
+        setValueForALL(TEST_KEY, TEST_VAL);
+
+        // Bulk read with ALL consistency
+        List<Row> rowList = bulkRead(ConsistencyLevel.ALL.name());
+        validateBulkReadRows(rowList, testDataSet);
+
+        // Bulk read with QUORUM consistency
+        rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+        validateBulkReadRows(rowList, testDataSet);
+
+        // Bulk read with EACH_QUORUM consistency
+        rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+        validateBulkReadRows(rowList, testDataSet);
+
+        // Read the value for the test key using driver for different 
consistency levels
+        String valAll = readValueForKey(TEST_KEY, ConsistencyLevel.ALL);
+        String valQuorum = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
+        String valEachQuorum = readValueForKey(TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
+        
assertThat(valAll).isEqualTo(valQuorum).isEqualTo(valEachQuorum).isEqualTo(rowList.get(1).getString(1));
+
+        // Revert the value update for all nodes
+        setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+    }
+
+    public static PartitionedDataLayer.AvailabilityHint 
getAvailability(CassandraInstance instance)
+    {
+        if (instance.nodeName().equals("localhost5") || 
instance.nodeName().equals("localhost6"))
+        {
+            return PartitionedDataLayer.AvailabilityHint.MOVING;
+        }
+        return PartitionedDataLayer.AvailabilityHint.UP;
+    }
+
+    /**
+     * This test creates a scenario where bulk reader reads the most recently 
updated value with EACH_QUORUM
+     * but reads stale value with QUORUM. This shows that QUORUM is different 
from EACH_QUORUM in multi-dc settings.
+     * Here node5(DC2) and node6(DC2) have the updated value for TEST_KEY.
+     *
+     * @throws NoSuchMethodException
+     */
+    @Test
+    void eachQuorumIsNotQuorum() throws NoSuchMethodException
+    {
+        List<String> updatedDataSet = new ArrayList<>(OG_DATASET);
+        updatedDataSet.set(1, TEST_VAL);
+
+        // Internally update value for TEST_KEY for node5 and node6. This 
update doesn't propagate to other nodes.
+        updateValueNodeInternal(5, TEST_KEY, TEST_VAL);
+        updateValueNodeInternal(6, TEST_KEY, TEST_VAL);
+
+        // Bytecode injection to simulate a scenario where node5 and node6 are 
at the end of the replica list for bulk reader.
+        // This simulation mimics a real world scenario.
+        // With this arrangement PartitionedDataLayer.splitReplicas method for 
QUORUM will split the replicas like below:
+        // primaryReplicas: [Node1, Node2, Node3, Node4]
+        // secondaryReplicas: [Node5, Node6]
+        // Number of nodes required for QUORUM read is 6/2 + 1 = 4. Bulk 
reader will read from [Node1, Node2, Node3, Node4] only.
+        ByteBuddyAgent.install();
+        new ByteBuddy()
+        .redefine(CassandraDataLayer.class)
+        .method(ElementMatchers.named("getAvailability"))
+        .intercept(
+        
MethodCall.invoke(BulkReaderMultiDCConsistencyTest.class.getMethod("getAvailability",
 CassandraInstance.class))
+                  .withAllArguments()
+        )
+        .make()
+        .load(
+        CassandraDataLayer.class.getClassLoader(),
+        ClassReloadingStrategy.fromInstalledAgent()
+        );
+
+        // Bulk read with QUORUM consistency
+        List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+        // Validate that the result doesn't have the updated data.
+        validateBulkReadRows(rowList, OG_DATASET);
+
+        // Message filter to mimic message drops from Node5 and Node6 to Node1.
+        // We are setting this up to simulate a scenario where reading values 
with QUORUM consistency with driver
+        // and using Node1 as the coordinator doesn't get the values from 
Node5 and Node6.
+        cluster.filters().allVerbs().from(5).to(1).drop();
+        cluster.filters().allVerbs().from(6).to(1).drop();
+
+        // Read value for TEST_KEY with driver using Node1 as coordinator
+        String quorumVal = readValueForKey(cluster.get(1).coordinator(), 
TEST_KEY, ConsistencyLevel.QUORUM);
+        // Validate that the updated value is not read
+        assertThat(quorumVal).isEqualTo(OG_DATASET.get(TEST_KEY));
+
+        // Cleanup message filter
+        cluster.filters().reset();
+
+        // Bulk read with EACH_QUORUM consistency
+        rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+        // Validate that bulk reader was able to read the updated value
+        validateBulkReadRows(rowList, updatedDataSet);
+        // Read value using driver with EACH_QUORUM
+        String eachQuorumVal = readValueForKey(TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
+        // Validate that EACH_QUORUM read using driver and the bulk reader are 
the same
+        
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+        // Revert the value update for all nodes
+        setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+    }
+
+    /**
+     * Tests that EACH_QUORUM read succeeds with one node down in each DC.
+     * Tests that value read using driver is the same as the value read using 
bulk reader.
+     *
+     * @throws Exception
+     */
+    @Test
+    void eachQuorumSuccessWithOneNodeDownEachDC() throws Exception
+    {
+        // Stop Node1(DC1)
+        cluster.stopUnchecked(cluster.get(1));
+        // Stop Node4(DC2)
+        cluster.stopUnchecked(cluster.get(4));
+
+        // Bulk read with EACH_QUORUM consistency
+        List<Row> rowList = bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+        validateBulkReadRows(rowList, OG_DATASET);
+
+        // Read TEST_KEY using driver
+        String eachQuorumVal = readValueForKey(TEST_KEY, 
ConsistencyLevel.EACH_QUORUM);
+        // Validate that data from driver and bulk reader are the same
+        
assertThat(eachQuorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+        // Tear down and re-create the cluster
+        tearDown();
+        setup();
+    }
+
+    /**
+     * Tests that:
+     * QUORUM read succeeds with two nodes down in a single DC.
+     * QUORUM read value using bulk reader equals QUORUM read value using 
driver.
+     * EACH_QUORUM read with bulk reader fails with cause as 
NotEnoughReplicasException.
+     * EACH_QUORUM read with driver fails.
+     *
+     * @throws Exception
+     */
+    @Test
+    void eachQuorumFailureWithTwoNodesDownOneDC() throws Exception
+    {
+        // Stop Node4(DC2)
+        cluster.stopUnchecked(cluster.get(4));
+        // Stop Node5(DC2)
+        cluster.stopUnchecked(cluster.get(5));
+
+        // Bulk read with QUORUM
+        List<Row> rowList = bulkRead(ConsistencyLevel.QUORUM.name());
+        validateBulkReadRows(rowList, OG_DATASET);
+        // Driver read with QUORUM
+        String quorumVal = readValueForKey(TEST_KEY, ConsistencyLevel.QUORUM);
+        // Bulk read and driver read values are the same
+        assertThat(quorumVal).isEqualTo(rowList.get(TEST_KEY).getString(1));
+
+        // Try bulk reading with EACH_QUORUM consistency. Assert that it fails 
with the correct cause.
+        try
+        {
+            bulkRead(ConsistencyLevel.EACH_QUORUM.name());
+        }
+        catch (Exception ex)
+        {
+            assertThat(ex).isNotNull();
+            assertThat(ex).isInstanceOf(SparkException.class);
+            
assertThat(ex.getCause()).isInstanceOf(NotEnoughReplicasException.class);
+            assertThat(ex.getCause().getMessage()).isEqualTo("Required 2 
replicas but only 1 responded");
+        }
+
+        // Try driver reading with EACH_QUORUM consistency. Assert that it 
fails with the correct error.
+        try
+        {
+            readValueForKey(TEST_KEY, ConsistencyLevel.EACH_QUORUM);
+        }
+        catch (Exception ex)
+        {
+            assertThat(ex).isNotNull();
+            assertThat(ex.getMessage()).isEqualTo("Cannot achieve consistency 
level EACH_QUORUM in DC datacenter2");
+        }
+
+        // Tear down and re-create the cluster
+        tearDown();
+        setup();
+    }
+
+    /**
+     * Validates that read repair is disabled.
+     */
+    private void validateReadRepairIsDisabled()
+    {
+        // Update value for Node1 only
+        updateValueNodeInternal(1, TEST_KEY, TEST_VAL);
+        // Validate only Node1 has the updated value
+        validateNodeInternalValue(1, TEST_KEY, TEST_VAL);
+        validateNodeInternalValue(2, TEST_KEY, OG_DATASET.get(1));
+        validateNodeInternalValue(5, TEST_KEY, OG_DATASET.get(1));
+
+        // Read with ALL consistency using coordinator.
+        // If read repair is enabled this should update the value for all 
nodes.
+        readValueForKey(TEST_KEY, ConsistencyLevel.ALL);
+
+        // Validate only Node1 has the updated value
+        validateNodeInternalValue(1, TEST_KEY, TEST_VAL);
+        validateNodeInternalValue(2, TEST_KEY, OG_DATASET.get(1));
+        validateNodeInternalValue(5, TEST_KEY, OG_DATASET.get(1));
+
+        // Revert the value update for all nodes
+        setValueForALL(TEST_KEY, OG_DATASET.get(TEST_KEY));
+    }
+
+    @NotNull
+    private List<Row> bulkRead(String consistency)
+    {
+        List<Row> rowList;
+        Dataset<Row> dataForTable1;
+        dataForTable1 = bulkReaderDataFrame(table1)
+                        .option("consistencyLevel", consistency)
+                        .option("dc", null)
+                        .option("maxRetries", 1)
+                        .option("maxMillisToSleep", 50)
+                        .option("defaultMillisToSleep", 50)
+                        .load();
+
+        rowList = dataForTable1.collectAsList().stream()
+                               .sorted(Comparator.comparing(row -> 
row.getInt(0)))
+                               .collect(Collectors.toList());
+        return rowList;
+    }
+
+    private static void validateBulkReadRows(List<Row> rowList, List<String> 
dataSet)
+    {
+        for (int i = 0; i < dataSet.size(); i++)
+        {
+            assertThat(rowList.get(i).getInt(0)).isEqualTo(i);
+            assertThat(rowList.get(i).getString(1)).isEqualTo(dataSet.get(i));
+        }
+    }
+
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF3_DC2_RF3);
+        // Read repair disabled: 
https://cassandra.apache.org/doc/latest/cassandra/managing/operating/read_repair.html#none
+        createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY 
KEY, name text) with read_repair='NONE';");
+
+        IInstance firstRunningInstance = cluster.getFirstRunningInstance();
+        for (int i = 0; i < OG_DATASET.size(); i++)
+        {
+            String value = OG_DATASET.get(i);
+            String query1 = String.format("INSERT INTO %s (id, name) VALUES 
(%d, '%s');", table1, i, value);
+
+            firstRunningInstance.coordinator().execute(query1, 
ConsistencyLevel.ALL);
+        }
+        validateReadRepairIsDisabled();
+    }
+
+    private void updateValueNodeInternal(int node, int key, String value)
+    {
+        cluster.get(node).executeInternal(String.format("UPDATE %s SET 
name='%s' WHERE id=%d", table1, value, key));
+    }
+
+    private void validateNodeInternalValue(int node, int key, String val)
+    {
+        assertThat(getNodeInternalValue(node, key)).isEqualTo(val);
+    }
+
+    private String getNodeInternalValue(int node, int key)
+    {
+        Object[][] result = cluster.get(node)
+                                   .executeInternal(String.format("SELECT name 
FROM %s WHERE id=%d", table1, key));
+        return (String) result[0][0];
+    }
+
+    private String readValueForKey(int key, ConsistencyLevel consistencyLevel)
+    {
+        return 
readValueForKey(cluster.getFirstRunningInstance().coordinator(), key, 
consistencyLevel);
+    }
+
+    private String readValueForKey(ICoordinator coordinator, int key, 
ConsistencyLevel consistencyLevel)
+    {
+        Object[][] result = coordinator
+                            .execute(String.format("SELECT name FROM %s WHERE 
id=%d", table1, key), consistencyLevel);
+        return (String) result[0][0];
+    }
+
+    /**
+     * Sets value for a key with consistency level ALL.
+     *
+     * @param key
+     * @param value
+     */
+    private void setValueForALL(int key, String value)
+    {
+        cluster.getFirstRunningInstance()
+               .coordinator()
+               .execute(String.format("UPDATE %s SET name='%s' WHERE id=%d", 
table1, value, key), ConsistencyLevel.ALL);
+    }
+}
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 08f3d94b..a3f6384a 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -266,6 +266,7 @@ public class SparkTestUtils
     public static Stream<String> sidecarInstancesOptionStream(ICluster<? 
extends IInstance> cluster, DnsResolver dnsResolver)
     {
         return IntStream.rangeClosed(1, cluster.size())
+                        .filter(i -> !cluster.get(i).isShutdown())
                         .mapToObj(i -> {
                             String ipAddress = 
JMXUtil.getJmxHost(cluster.get(i).config());
                             try


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to