This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45de9e0  CASSANDRA-19634: Improve test coverage for downed instances (#59)
45de9e0 is described below

commit 45de9e08e7dc177f1f3273456b62af7ee0f5dbdb
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Sun May 12 11:35:14 2024 -0700

    CASSANDRA-19634: Improve test coverage for downed instances (#59)
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19634
---
 .../distributed/impl/CassandraCluster.java         |   6 -
 .../testing/SharedClusterIntegrationTestBase.java  | 114 +++++++-----
 .../cassandra/testing/IClusterExtension.java       |   6 -
 .../cassandra/analytics/BlockedInstancesTest.java  |   6 +-
 .../analytics/BulkWriteDownInstanceTest.java       | 145 +++++++++++++++
 .../analytics/BulkWriteDownSidecarTest.java        | 199 +++++++++++++++++++++
 .../cassandra/analytics/ResiliencyTestBase.java    |  26 +--
 .../apache/cassandra/analytics/SparkTestUtils.java |  29 ++-
 .../cassandra/analytics/TestConsistencyLevel.java  |  49 +++++
 .../analytics/expansion/JoiningMultiDCTest.java    |  11 +-
 .../analytics/expansion/JoiningSingleNodeTest.java |   7 +-
 .../analytics/expansion/JoiningTestBase.java       |   1 +
 .../movement/NodeMovementMultiDCTest.java          |  11 +-
 .../analytics/movement/NodeMovementTest.java       |   7 +-
 .../analytics/movement/NodeMovementTestBase.java   |   3 +-
 .../HostReplacementMultiDCFailureTest.java         |   3 +-
 ...ReplacementMultiDCInsufficientReplicasTest.java |  28 +--
 .../replacement/HostReplacementMultiDCTest.java    |  11 +-
 .../analytics/replacement/HostReplacementTest.java |   1 +
 .../replacement/HostReplacementTestBase.java       |   7 +-
 .../analytics/shrink/LeavingHalfTest.java          |   1 +
 .../shrink/LeavingMultiDCFailureTest.java          |   1 +
 .../LeavingMultiDCHalveClusterFailureTest.java     |   1 +
 .../shrink/LeavingMultiDCHalveClusterTest.java     |   4 +-
 .../analytics/shrink/LeavingMultiDCTest.java       |   4 +-
 .../shrink/LeavingMultipleFailureTest.java         |   1 +
 .../analytics/shrink/LeavingMultipleTest.java      |   1 +
 .../analytics/shrink/LeavingSingleFailureTest.java |   1 +
 .../analytics/shrink/LeavingSingleTest.java        |   1 +
 .../analytics/shrink/LeavingTestBase.java          |  11 +-
 .../BulkWriteS3CompatModeSimpleTest.java           |  27 ++-
 31 files changed, 572 insertions(+), 151 deletions(-)

diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index 795c758..a339be2 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -152,12 +152,6 @@ public class CassandraCluster<I extends IInstance> 
implements IClusterExtension<
         return delegate.newInstanceConfig();
     }
 
-    @Override
-    public IInstanceConfig createInstanceConfig(int nodeNum)
-    {
-        return delegate.createInstanceConfig(nodeNum);
-    }
-
     @Override
     public ICluster<I> delegate()
     {
diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
index 67864a4..a67591d 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java
@@ -27,15 +27,15 @@ import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -142,19 +142,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(VertxExtension.class)
 public abstract class SharedClusterIntegrationTestBase
 {
-    public static final String SIDECAR_S3_ENDPOINT_OVERRIDE_OPT = 
"S3_ENDPOINT_OVERRIDE";
-
     protected final Logger logger = 
LoggerFactory.getLogger(SharedClusterIntegrationTestBase.class);
     private static final int MAX_CLUSTER_PROVISION_RETRIES = 5;
 
     @TempDir
     static Path secretsPath;
 
-    protected Vertx vertx;
-    protected DnsResolver dnsResolver;
+    protected DnsResolver dnsResolver = new LocalhostResolver();
     protected IClusterExtension<? extends IInstance> cluster;
     protected Server server;
-    protected Injector injector;
     protected TestVersion testVersion;
     protected MtlsTestHelper mtlsTestHelper;
     private IsolatedDTestClassLoaderWrapper classLoaderWrapper;
@@ -321,9 +317,9 @@ public abstract class SharedClusterIntegrationTestBase
     /**
      * Override to provide additional options to configure sidecar
      */
-    protected Map<String, String> sidecarAdditionalOptions()
+    protected Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides()
     {
-        return Collections.emptyMap();
+        return null;
     }
 
     /**
@@ -333,18 +329,30 @@ public abstract class SharedClusterIntegrationTestBase
      * @throws InterruptedException when the startup times out
      */
     protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
+    {
+        server = startSidecarWithInstances(cluster);
+    }
+
+    /**
+     * Starts Sidecar configured to run with the provided {@link IInstance}s 
from the cluster.
+     *
+     * @param instances the Cassandra instances Sidecar will manage
+     * @return the started server
+     * @throws InterruptedException when the server start operation is 
interrupted
+     */
+    protected Server startSidecarWithInstances(Iterable<? extends IInstance> 
instances) throws InterruptedException
     {
         VertxTestContext context = new VertxTestContext();
-        AbstractModule testModule = new IntegrationTestModule(cluster, 
classLoaderWrapper, mtlsTestHelper, sidecarAdditionalOptions());
-        injector = Guice.createInjector(Modules.override(new 
MainModule()).with(testModule));
-        dnsResolver = injector.getInstance(DnsResolver.class);
-        vertx = injector.getInstance(Vertx.class);
-        server = injector.getInstance(Server.class);
-        server.start()
-              .onSuccess(s -> context.completeNow())
-              .onFailure(context::failNow);
+        AbstractModule testModule = new IntegrationTestModule(instances, 
classLoaderWrapper, mtlsTestHelper,
+                                                              dnsResolver, 
configurationOverrides());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule()).with(testModule));
+        Server sidecarServer = injector.getInstance(Server.class);
+        sidecarServer.start()
+                     .onSuccess(s -> context.completeNow())
+                     .onFailure(context::failNow);
 
         context.awaitCompletion(5, TimeUnit.SECONDS);
+        return sidecarServer;
     }
 
     /**
@@ -430,7 +438,9 @@ public abstract class SharedClusterIntegrationTestBase
      */
     protected Object[][] queryAllData(QualifiedName table, ConsistencyLevel 
consistencyLevel)
     {
-        return cluster.coordinator(1).execute(String.format("SELECT * FROM 
%s;", table), consistencyLevel);
+        return cluster.getFirstRunningInstance()
+                      .coordinator()
+                      .execute(String.format("SELECT * FROM %s;", table), 
consistencyLevel);
     }
 
     /**
@@ -441,7 +451,7 @@ public abstract class SharedClusterIntegrationTestBase
      */
     protected ResultSet queryAllDataWithDriver(QualifiedName table)
     {
-        return queryAllDataWithDriver(table, 
com.datastax.driver.core.ConsistencyLevel.ALL);
+        return queryAllDataWithDriver(table, ConsistencyLevel.ALL);
     }
 
     /**
@@ -451,13 +461,12 @@ public abstract class SharedClusterIntegrationTestBase
      * @param consistency the consistency level to use for querying the data
      * @return all the data queried from the table
      */
-    protected ResultSet queryAllDataWithDriver(QualifiedName table,
-                                               
com.datastax.driver.core.ConsistencyLevel consistency)
+    protected ResultSet queryAllDataWithDriver(QualifiedName table, 
ConsistencyLevel consistency)
     {
         Cluster driverCluster = createDriverCluster(cluster.delegate());
         Session session = driverCluster.connect();
         SimpleStatement statement = new SimpleStatement(String.format("SELECT 
* FROM %s;", table));
-        statement.setConsistencyLevel(consistency);
+        
statement.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(consistency.name()));
         return session.execute(statement);
     }
 
@@ -486,20 +495,23 @@ public abstract class SharedClusterIntegrationTestBase
     static class IntegrationTestModule extends AbstractModule
     {
         private static final Logger LOGGER = 
LoggerFactory.getLogger(IntegrationTestModule.class);
-        private final ICluster<? extends IInstance> cluster;
+        private final Iterable<? extends IInstance> instances;
         private final IsolatedDTestClassLoaderWrapper wrapper;
         private final MtlsTestHelper mtlsTestHelper;
-        private final Map<String, String> additioanlOptions;
+        private final DnsResolver dnsResolver;
+        private final Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides;
 
-        IntegrationTestModule(ICluster<? extends IInstance> cluster,
+        IntegrationTestModule(Iterable<? extends IInstance> instances,
                               IsolatedDTestClassLoaderWrapper wrapper,
                               MtlsTestHelper mtlsTestHelper,
-                              Map<String, String> additionalOptions)
+                              DnsResolver dnsResolver,
+                              Function<SidecarConfigurationImpl.Builder, 
SidecarConfigurationImpl.Builder> configurationOverrides)
         {
-            this.cluster = cluster;
+            this.instances = instances;
             this.wrapper = wrapper;
             this.mtlsTestHelper = mtlsTestHelper;
-            this.additioanlOptions = additionalOptions;
+            this.configurationOverrides = configurationOverrides;
+            this.dnsResolver = dnsResolver;
         }
 
         @Provides
@@ -522,16 +534,16 @@ public abstract class SharedClusterIntegrationTestBase
         {
             JmxConfiguration jmxConfiguration = 
configuration.serviceConfiguration().jmxConfiguration();
             List<InstanceMetadata> instanceMetadataList =
-            IntStream.rangeClosed(1, cluster.size())
-                     .mapToObj(i -> buildInstanceMetadata(vertx,
-                                                          cluster.get(i),
-                                                          
cassandraVersionProvider,
-                                                          
sidecarVersionProvider.sidecarVersion(),
-                                                          jmxConfiguration,
-                                                          cqlSessionProvider,
-                                                          dnsResolver,
-                                                          wrapper))
-                     .collect(Collectors.toList());
+            StreamSupport.stream(instances.spliterator(), false)
+                         .map(instance -> buildInstanceMetadata(vertx,
+                                                                instance,
+                                                                
cassandraVersionProvider,
+                                                                
sidecarVersionProvider.sidecarVersion(),
+                                                                
jmxConfiguration,
+                                                                
cqlSessionProvider,
+                                                                dnsResolver,
+                                                                wrapper))
+                         .collect(Collectors.toList());
             return new InstancesConfigImpl(instanceMetadataList, dnsResolver);
         }
 
@@ -577,26 +589,30 @@ public abstract class SharedClusterIntegrationTestBase
                             "like mTLS enabled.", 
CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS);
             }
             S3ClientConfiguration s3ClientConfig = new 
S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig());
-            return SidecarConfigurationImpl.builder()
-                                           .serviceConfiguration(conf)
-                                           .sslConfiguration(sslConfiguration)
-                                           
.s3ClientConfiguration(s3ClientConfig)
-                                           .build();
+            SidecarConfigurationImpl.Builder builder = 
SidecarConfigurationImpl.builder()
+                                                                               
.serviceConfiguration(conf)
+                                                                               
.sslConfiguration(sslConfiguration)
+                                                                               
.s3ClientConfiguration(s3ClientConfig);
+            if (configurationOverrides != null)
+            {
+                builder = configurationOverrides.apply(builder);
+            }
+            return builder.build();
         }
 
         @Provides
         @Singleton
         public DnsResolver dnsResolver()
         {
-            return new LocalhostResolver();
+            return dnsResolver;
         }
 
         private List<InetSocketAddress> buildContactPoints()
         {
-            return cluster.stream()
-                          .map(instance -> new 
InetSocketAddress(instance.config().broadcastAddress().getAddress(),
-                                                                 
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
-                          .collect(Collectors.toList());
+            return StreamSupport.stream(instances.spliterator(), false)
+                                .map(instance -> new 
InetSocketAddress(instance.config().broadcastAddress().getAddress(),
+                                                                       
tryGetIntConfig(instance.config(), "native_transport_port", 9042)))
+                                .collect(Collectors.toList());
         }
 
         private S3ProxyConfiguration buildTestS3ProxyConfig()
@@ -624,7 +640,7 @@ public abstract class SharedClusterIntegrationTestBase
                 @Override
                 public URI endpointOverride()
                 {
-                    return 
URI.create(additioanlOptions.getOrDefault(SIDECAR_S3_ENDPOINT_OVERRIDE_OPT, 
"http://localhost:9090"));
+                    return URI.create("http://localhost:9090");
                 }
             };
         }
diff --git 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
index 915d289..cabad60 100644
--- 
a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
+++ 
b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java
@@ -61,12 +61,6 @@ public interface IClusterExtension<I extends IInstance> 
extends ICluster<I>
      */
     IInstanceConfig newInstanceConfig();
 
-    /**
-     * @param nodeNum the node number
-     * @return a newly created instances configuration for the node {@code 
nodeNum}
-     */
-    IInstanceConfig createInstanceConfig(int nodeNum);
-
     /**
      * @return a reference to the delegated {@link ICluster} instance
      */
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
index 82b5d9a..fbd5479 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
@@ -39,9 +39,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.jetbrains.annotations.NotNull;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
 import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceTest.java
new file mode 100644
index 0000000..6f6a079
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static 
org.apache.cassandra.analytics.ResiliencyTestBase.uniqueTestTableFullName;
+import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
+import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+
+/**
+ * Tests bulk writes in different scenarios when Cassandra instances are down
+ */
+class BulkWriteDownInstanceTest extends SharedClusterSparkIntegrationTestBase
+{
+    Set<IInstance> downInstances = new HashSet<>();
+
+    @ParameterizedTest(name = "{index} => instanceDownCount={0}, {1}")
+    @MethodSource("testInputs")
+    void testHandlingOfDownedCassandraInstances(int instanceDownCount, 
TestConsistencyLevel cl)
+    {
+        // progressively stop instances as needed for the test
+        stopCassandraInstancesForTest(instanceDownCount);
+
+        QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, 
cl.readCL, cl.writeCL);
+        SparkSession spark = getOrCreateSparkSession();
+        Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, 
ROW_COUNT);
+
+        DataFrameWriter<Row> dfWriter = bulkWriterDataFrameWriter(df, 
tableName).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name());
+        if (isFailureExpected(instanceDownCount, cl))
+        {
+            sparkTestUtils.assertExpectedBulkWriteFailure(cl.writeCL.name(), 
dfWriter);
+        }
+        else
+        {
+            dfWriter.save();
+            // Validate using CQL
+            sparkTestUtils.validateWrites(df.collectAsList(), 
queryAllData(tableName, cl.readCL));
+        }
+    }
+
+    static boolean isFailureExpected(int instanceDownCount, 
TestConsistencyLevel cl)
+    {
+        if (instanceDownCount == 2)
+        {
+            // for a 3 instance cluster, if 2 instances are down, we can only 
write at consistency level ONE
+            return cl.writeCL != ONE;
+        }
+        if (instanceDownCount == 1)
+        {
+            // when one instance is down, we expect failure at CL = ALL
+            return cl.writeCL == ALL;
+        }
+        // No failure is expected when all instances are up
+        return false;
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF3);
+        testConsistencyLevels().forEach(consistencyLevel -> {
+            QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, 
consistencyLevel.readCL, consistencyLevel.writeCL);
+            createTestTable(tableName, CREATE_TEST_TABLE_STATEMENT);
+        });
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration().nodesPerDc(3);
+    }
+
+    void stopCassandraInstancesForTest(int instanceDownCount)
+    {
+        while (instanceDownCount > downInstances.size())
+        {
+            for (IInstance instance : cluster)
+            {
+                if (downInstances.add(instance))
+                {
+                    cluster.stopUnchecked(instance);
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return cartesian product of the list of consistency levels and 
instance down
+     */
+    static Stream<Arguments> testInputs()
+    {
+        return IntStream.rangeClosed(0, 2)
+                        .mapToObj(instanceDownCount -> new 
AbstractMap.SimpleEntry<>(instanceDownCount, testConsistencyLevels()))
+                        .flatMap(pair -> pair.getValue().stream().map(cl -> 
Arguments.of(pair.getKey(), cl)));
+    }
+
+    static List<TestConsistencyLevel> testConsistencyLevels()
+    {
+        return Arrays.asList(TestConsistencyLevel.of(ONE, ALL),
+                             TestConsistencyLevel.of(LOCAL_QUORUM, 
LOCAL_QUORUM),
+                             TestConsistencyLevel.of(ONE, ONE));
+    }
+}
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarTest.java
new file mode 100644
index 0000000..deb9f9b
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.server.Server;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static 
org.apache.cassandra.analytics.BulkWriteDownInstanceTest.isFailureExpected;
+import static 
org.apache.cassandra.analytics.BulkWriteDownInstanceTest.testConsistencyLevels;
+import static 
org.apache.cassandra.analytics.ResiliencyTestBase.uniqueTestTableFullName;
+import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
+import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests bulk writes in different scenarios when Sidecar instances are down. 
In this test
+ * we have one Sidecar managing a single Cassandra instance
+ */
+class BulkWriteDownSidecarTest extends SharedClusterSparkIntegrationTestBase
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BulkWriteDownSidecarTest.class);
+    Set<Server> downSidecars = new HashSet<>();
+    List<Server> sidecarServerList = new ArrayList<>();
+
+    @ParameterizedTest(name = "{index} => instanceDownCount={0} {1}")
+    @MethodSource("testInputs")
+    void testHandlingOfDownedSidecarInstances(int instanceDownCount, 
TestConsistencyLevel cl) throws Exception
+    {
+        // progressively stop Sidecar instances as needed for the test
+        stopSidecarInstancesForTest(instanceDownCount);
+
+        QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, 
cl.readCL, cl.writeCL);
+        SparkSession spark = getOrCreateSparkSession();
+        Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, 
ROW_COUNT);
+
+        DataFrameWriter<Row> dfWriter = bulkWriterDataFrameWriter(df, 
tableName).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name());
+        if (isFailureExpected(instanceDownCount, cl))
+        {
+            sparkTestUtils.assertExpectedBulkWriteFailure(cl.writeCL.name(), 
dfWriter);
+        }
+        else
+        {
+            dfWriter.save();
+            // Validate using CQL
+            sparkTestUtils.validateWrites(df.collectAsList(), 
queryAllData(tableName, cl.readCL));
+        }
+    }
+
+    @Override
+    protected void startSidecar(ICluster<? extends IInstance> cluster) throws 
InterruptedException
+    {
+        for (IInstance instance : cluster)
+        {
+            LOGGER.info("Starting Sidecar instance for Cassandra instance {}",
+                        instance.config().num());
+            Server server = 
startSidecarWithInstances(Collections.singleton(instance));
+            sidecarServerList.add(server);
+        }
+
+        assertThat(sidecarServerList.size()).as("Each Cassandra Instance will 
be managed by a single Sidecar instance")
+                                            .isEqualTo(cluster.size());
+        // assign the server to the first instance
+        server = sidecarServerList.get(0);
+    }
+
+    /**
+     * Supplies the service configuration for the next Sidecar instance to be started.
+     * The first Sidecar binds to {@code localhost} on an ephemeral port; every subsequent
+     * Sidecar reuses the port bound by the first one on a different host name
+     * ({@code localhost2}, {@code localhost3}, ...).
+     *
+     * @return a function that sets the service configuration on the given builder and returns it
+     */
+    @Override
+    protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides()
+    {
+        return builder -> {
+            boolean isFirstSidecar = sidecarServerList.isEmpty();
+
+            // As opposed to the base class, the first Sidecar is bound to a specific interface (localhost).
+            // Later Sidecars are bound to a different interface each (localhost2, localhost3)
+            String host = isFirstSidecar
+                          ? "localhost"
+                          : "localhost" + (sidecarServerList.size() + 1);
+
+            // Port 0 lets the test find an available port for the first Sidecar; all other
+            // Sidecar instances use the same port number that was bound for the first one
+            int port = isFirstSidecar
+                       ? 0
+                       : sidecarServerList.get(0).actualPort();
+
+            ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+                                                                .host(host)
+                                                                .port(port)
+                                                                .build();
+            builder.serviceConfiguration(conf);
+
+            return builder;
+        };
+    }
+
+    /**
+     * Closes every Sidecar server that has not been stopped yet, waiting up to 60 seconds
+     * for each close operation to finish.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting for a server to close
+     */
+    @Override
+    protected void stopSidecar() throws InterruptedException
+    {
+        for (Server server : sidecarServerList)
+        {
+            // add(...) reports whether the server was newly recorded as down; servers that were
+            // already stopped (for example by a test) are skipped
+            if (!downSidecars.add(server))
+            {
+                continue;
+            }
+
+            CountDownLatch closeLatch = new CountDownLatch(1);
+            // Release the latch on failure as well as on success; otherwise a failed close
+            // would block for the full timeout and be misreported as a timeout
+            server.close().onComplete(result -> {
+                if (result.failed())
+                {
+                    logger.error("Unable to close Sidecar server", result.cause());
+                }
+                closeLatch.countDown();
+            });
+            if (!closeLatch.await(60, TimeUnit.SECONDS))
+            {
+                logger.error("Close event timed out.");
+            }
+        }
+    }
+
+    /**
+     * Creates the test keyspace and one test table per consistency level combination
+     * returned by {@code testConsistencyLevels()}.
+     */
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF3);
+        for (TestConsistencyLevel consistencyLevel : testConsistencyLevels())
+        {
+            // the table name is derived from the read and write consistency levels
+            QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE,
+                                                              consistencyLevel.readCL,
+                                                              consistencyLevel.writeCL);
+            createTestTable(tableName, CREATE_TEST_TABLE_STATEMENT);
+        }
+    }
+
+    /**
+     * @return the parent's cluster configuration with 3 nodes per datacenter
+     */
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        ClusterBuilderConfiguration parentConfiguration = super.testClusterConfiguration();
+        return parentConfiguration.nodesPerDc(3);
+    }
+
+    /**
+     * Stops Sidecar servers, in {@code sidecarServerList} order, until at least
+     * {@code instanceDownCount} servers are recorded as down. Servers that are already
+     * down are not closed again.
+     *
+     * @param instanceDownCount the total number of Sidecar servers that must be down on return
+     * @throws Exception if closing a server fails, is interrupted, or does not complete
+     *                   within 30 seconds
+     */
+    void stopSidecarInstancesForTest(int instanceDownCount) throws Exception
+    {
+        assertThat(sidecarServerList).isNotEmpty();
+
+        // A single bounded pass over the servers: unlike looping until the count is reached,
+        // this cannot spin forever when more instances are requested than can be stopped
+        for (Server server : sidecarServerList)
+        {
+            if (downSidecars.size() >= instanceDownCount)
+            {
+                break;
+            }
+
+            if (downSidecars.add(server))
+            {
+                server.close().toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS);
+            }
+        }
+
+        assertThat(downSidecars.size()).as("Unable to stop %s Sidecar instances, only %s instances are available",
+                                           instanceDownCount, sidecarServerList.size())
+                                       .isGreaterThanOrEqualTo(instanceDownCount);
+    }
+
+    /**
+     * @return the cartesian product of the down instance counts (0 through 2) and the
+     * consistency levels returned by {@code testConsistencyLevels()}
+     */
+    static Stream<Arguments> testInputs()
+    {
+        return IntStream.rangeClosed(0, 2)
+                        .boxed()
+                        .flatMap(instanceDownCount -> testConsistencyLevels().stream()
+                                                                             .map(cl -> Arguments.of(instanceDownCount, cl)));
+    }
+}
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
index 742c5f6..8b3c9bc 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -35,7 +34,7 @@ import java.util.stream.IntStream;
 
 import com.google.common.collect.Range;
 
-import com.datastax.driver.core.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -241,29 +240,6 @@ public abstract class ResiliencyTestBase extends 
SharedClusterSparkIntegrationTe
         return new ClusterBuilderConfiguration();
     }
 
-    public static class TestConsistencyLevel
-    {
-        public final ConsistencyLevel readCL;
-        public final ConsistencyLevel writeCL;
-
-        private TestConsistencyLevel(ConsistencyLevel readCL, ConsistencyLevel 
writeCL)
-        {
-            this.readCL = Objects.requireNonNull(readCL, "readCL is required");
-            this.writeCL = Objects.requireNonNull(writeCL, "writeCL is 
required");
-        }
-
-        public static TestConsistencyLevel of(ConsistencyLevel readCL, 
ConsistencyLevel writeCL)
-        {
-            return new TestConsistencyLevel(readCL, writeCL);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "readCL=" + readCL + ", writeCL=" + writeCL;
-        }
-    }
-
     /**
      * An interface that pulls a method from the Cassandra Storage Service 
Proxy
      */
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 246e15e..b5f62a4 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -29,6 +29,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.shared.JMXUtil;
@@ -47,6 +49,7 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
 
 /**
  * Helper methods for Spark tests
@@ -154,9 +157,9 @@ public class SparkTestUtils
                               // the quoted identifiers tests where we test 
mixed case
                               .set("spark.sql.caseSensitive", "True")
                               .set("spark.master", "local[8,4]")
-                              
.set("spark.cassandra_analytics.sidecar.request.retries", "5")
-                              
.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", 
"500")
-                              
.set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds",
 "500");
+                              
.set("spark.cassandra_analytics.sidecar.request.retries", "4")
+                              
.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", 
"250")
+                              
.set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds",
 "250");
         BulkSparkConf.setupSparkConf(sparkConf, true);
         KryoRegister.setup(sparkConf);
         return sparkConf;
@@ -190,6 +193,26 @@ public class SparkTestUtils
                                  .isEmpty();
     }
 
+    /**
+     * Runs {@code dfWriter.save()} and asserts that it fails with the expected bulk write
+     * error, i.e. a {@link RuntimeException} whose cause chain contains a
+     * "Failed to load ... ranges" message for the given write consistency level.
+     *
+     * @param writeCL  the name of the write consistency level expected in the failure message
+     * @param dfWriter the writer whose save operation is expected to fail
+     */
+    public void assertExpectedBulkWriteFailure(String writeCL, DataFrameWriter<Row> dfWriter)
+    {
+        Throwable thrown = catchThrowable(dfWriter::save);
+        assertThat(thrown).isInstanceOf(RuntimeException.class)
+                          .hasMessageContaining("java.lang.RuntimeException: Bulk Write to Cassandra has failed");
+
+        // Walk the cause chain until the throwable carrying the "Failed to load" message is
+        // found; cause ends up null when no throwable in the chain has that message
+        Throwable cause;
+        for (cause = thrown; cause != null; cause = cause.getCause())
+        {
+            if (StringUtils.contains(cause.getMessage(), "Failed to load"))
+            {
+                break;
+            }
+        }
+
+        String expectedMessagePattern = "Failed to load (\\d+) ranges with " + writeCL +
+                                        " for job ([a-zA-Z0-9-]+) in phase .*";
+        assertThat(cause).isNotNull()
+                         .hasMessageFindingMatch(expectedMessagePattern);
+    }
+
     /**
      * @return a comma-separated string with a list of all the hosts in the 
in-jvm dtest cluster
      */
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TestConsistencyLevel.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TestConsistencyLevel.java
new file mode 100644
index 0000000..f80ea16
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TestConsistencyLevel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Objects;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+/**
+ * Pairs the read consistency level and the write consistency level used by a test
+ */
+public class TestConsistencyLevel
+{
+    // never null, enforced by the constructor
+    public final ConsistencyLevel readCL;
+    // never null, enforced by the constructor
+    public final ConsistencyLevel writeCL;
+
+    /**
+     * @param readCL  the read consistency level, must not be null
+     * @param writeCL the write consistency level, must not be null
+     * @return a new instance holding the provided consistency levels
+     * @throws NullPointerException when either consistency level is null
+     */
+    public static TestConsistencyLevel of(ConsistencyLevel readCL, ConsistencyLevel writeCL)
+    {
+        return new TestConsistencyLevel(readCL, writeCL);
+    }
+
+    private TestConsistencyLevel(ConsistencyLevel readCL, ConsistencyLevel writeCL)
+    {
+        this.readCL = Objects.requireNonNull(readCL, "readCL is required");
+        this.writeCL = Objects.requireNonNull(writeCL, "writeCL is required");
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("readCL=%s, writeCL=%s", readCL, writeCL);
+    }
+}
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java
index 875a163..8695203 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java
@@ -36,16 +36,17 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java
index 403dbb7..a74d8d8 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java
@@ -36,14 +36,15 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java
index 3a534c6..7d34581 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.Arguments;
 
 import org.apache.cassandra.analytics.DataGenerationUtils;
 import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.testing.utils.ClusterUtils;
 import org.apache.cassandra.distributed.api.Feature;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java
index 5b86a2a..5819f16 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java
@@ -36,16 +36,17 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java
index 8fbd319..9d58b51 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java
@@ -36,14 +36,15 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java
index 186717d..b663d6e 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java
@@ -30,13 +30,14 @@ import org.junit.jupiter.params.provider.Arguments;
 
 import org.apache.cassandra.analytics.DataGenerationUtils;
 import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
-import org.apache.cassandra.testing.utils.ClusterUtils;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.cassandra.testing.utils.ClusterUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java
index 554e648..a4f8771 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java
@@ -35,13 +35,14 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java
index 8422493..4567035 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Test;
 
 import net.bytebuddy.ByteBuddy;
@@ -40,15 +39,15 @@ import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 import org.apache.cassandra.testing.TestUtils;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Row;
 
-import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.catchThrowable;
 
 /**
  * Validate failed write operation when host replacement fails resulting in 
insufficient nodes. This is simulated by
@@ -62,25 +61,10 @@ class HostReplacementMultiDCInsufficientReplicasTest 
extends HostReplacementTest
     @Test
     void nodeReplacementFailureMultiDCInsufficientNodes()
     {
-        Throwable thrown = catchThrowable(() ->
-                                          bulkWriterDataFrameWriter(df, 
QUALIFIED_NAME)
-                                          
.option(WriterOptions.BULK_WRITER_CL.name(), EACH_QUORUM.name())
-                                          .save());
+        DataFrameWriter<Row> dfWriter = bulkWriterDataFrameWriter(df, 
QUALIFIED_NAME)
+                                      
.option(WriterOptions.BULK_WRITER_CL.name(), EACH_QUORUM.name());
 
-        assertThat(thrown).isInstanceOf(RuntimeException.class)
-                          .hasMessageContaining("java.lang.RuntimeException: 
Bulk Write to Cassandra has failed");
-
-        Throwable cause = thrown;
-
-        // Find the cause
-        while (cause != null && !StringUtils.contains(cause.getMessage(), 
"Failed to load"))
-        {
-            cause = cause.getCause();
-        }
-
-        assertThat(cause).isNotNull()
-                         .hasMessageFindingMatch("Failed to load (\\d+) ranges 
with EACH_QUORUM for " +
-                                                 "job ([a-zA-Z0-9-]+) in phase 
Environment Validation.");
+        sparkTestUtils.assertExpectedBulkWriteFailure(EACH_QUORUM.name(), 
dfWriter);
     }
 
     @Override
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
index 7d778d4..a69f6c8 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
@@ -36,17 +36,18 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
+import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static 
org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java
index 4d83121..cc5fa5e 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java
@@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index bd12bc1..b11a816 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.params.provider.Arguments;
 
 import org.apache.cassandra.analytics.DataGenerationUtils;
 import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -47,9 +48,9 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
 import static org.assertj.core.api.Assertions.assertThat;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java
index 8cbe1b8..399b277 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java
@@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java
index 63c43d6..b7caf35 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java
@@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java
index 1b39ee6..d4dbd28 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java
@@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java
index 91a6290..8570695 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java
@@ -35,7 +35,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
-import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstance;
@@ -55,7 +55,7 @@ class LeavingMultiDCHalveClusterTest extends LeavingTestBase
 
     @ParameterizedTest(name = "{index} => {0}")
     @MethodSource("multiDCTestInputs")
-    void testLeavingScenario(ResiliencyTestBase.TestConsistencyLevel cl)
+    void testLeavingScenario(TestConsistencyLevel cl)
     {
         QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, cl.readCL, cl.writeCL);
         bulkWriterDataFrameWriter(df, table).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name())
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java
index 6f039f3..4b09bd9 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java
@@ -33,7 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
-import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
@@ -50,7 +50,7 @@ class LeavingMultiDCTest extends LeavingTestBase
 {
     @ParameterizedTest(name = "{index} => {0}")
     @MethodSource("multiDCTestInputs")
-    void testLeavingScenario(ResiliencyTestBase.TestConsistencyLevel cl)
+    void testLeavingScenario(TestConsistencyLevel cl)
     {
         QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, cl.readCL, cl.writeCL);
         bulkWriterDataFrameWriter(df, table).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name())
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java
index 2921ea4..2b23cb9 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java
@@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java
index 3d47d32..625ae0e 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java
@@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java
index 08f1cf2..79240e7 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java
@@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java
index 950fef4..64d83d5 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java
@@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java
index 2130ef2..11a6f2b 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.analytics.DataGenerationUtils;
 import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestConsistencyLevel;
 import org.apache.cassandra.analytics.TestUninterruptibles;
 import org.apache.cassandra.testing.utils.ClusterUtils;
 import org.apache.cassandra.distributed.api.ICluster;
@@ -45,11 +46,11 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
-import static com.datastax.driver.core.ConsistencyLevel.ALL;
-import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
-import static com.datastax.driver.core.ConsistencyLevel.ONE;
-import static com.datastax.driver.core.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
 
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java
index da15e0e..125ddae 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.analytics.testcontainer;
 
 import java.util.Map;
+import java.util.function.Function;
 
 import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
@@ -27,6 +28,11 @@ import org.junit.jupiter.api.Test;
 import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
 import org.apache.cassandra.analytics.DataGenerationUtils;
 import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase;
+import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
+import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.S3ProxyConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 import org.apache.spark.sql.Dataset;
@@ -77,9 +83,18 @@ class BulkWriteS3CompatModeSimpleTest extends SharedClusterSparkIntegrationTestB
     }
 
     @Override
-    protected Map<String, String> sidecarAdditionalOptions()
+    protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides()
     {
-        return ImmutableMap.of(SIDECAR_S3_ENDPOINT_OVERRIDE_OPT, s3Mock.getHttpEndpoint());
+        return builder -> {
+            S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig());
+            builder.s3ClientConfiguration(s3ClientConfig);
+            return builder;
+        };
+    }
+
+    private S3ProxyConfiguration buildTestS3ProxyConfig()
+    {
+        return new S3MockProxyConfigurationImpl(s3Mock.getHttpEndpoint());
     }
 
     /**
@@ -98,4 +113,12 @@ class BulkWriteS3CompatModeSimpleTest extends SharedClusterSparkIntegrationTestB
         bulkWriterDataFrameWriter(df, TABLE_NAME, s3CompatOptions).save();
         sparkTestUtils.validateWrites(df.collectAsList(), queryAllData(TABLE_NAME));
     }
+
+    public static class S3MockProxyConfigurationImpl extends S3ProxyConfigurationImpl
+    {
+        S3MockProxyConfigurationImpl(String endpointOverride)
+        {
+            super(null, null, null, endpointOverride);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to