This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new 45de9e0 CASSANDRA-19634: Improve test coverage for downed instances (#59) 45de9e0 is described below commit 45de9e08e7dc177f1f3273456b62af7ee0f5dbdb Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Sun May 12 11:35:14 2024 -0700 CASSANDRA-19634: Improve test coverage for downed instances (#59) Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19634 --- .../distributed/impl/CassandraCluster.java | 6 - .../testing/SharedClusterIntegrationTestBase.java | 114 +++++++----- .../cassandra/testing/IClusterExtension.java | 6 - .../cassandra/analytics/BlockedInstancesTest.java | 6 +- .../analytics/BulkWriteDownInstanceTest.java | 145 +++++++++++++++ .../analytics/BulkWriteDownSidecarTest.java | 199 +++++++++++++++++++++ .../cassandra/analytics/ResiliencyTestBase.java | 26 +-- .../apache/cassandra/analytics/SparkTestUtils.java | 29 ++- .../cassandra/analytics/TestConsistencyLevel.java | 49 +++++ .../analytics/expansion/JoiningMultiDCTest.java | 11 +- .../analytics/expansion/JoiningSingleNodeTest.java | 7 +- .../analytics/expansion/JoiningTestBase.java | 1 + .../movement/NodeMovementMultiDCTest.java | 11 +- .../analytics/movement/NodeMovementTest.java | 7 +- .../analytics/movement/NodeMovementTestBase.java | 3 +- .../HostReplacementMultiDCFailureTest.java | 3 +- ...ReplacementMultiDCInsufficientReplicasTest.java | 28 +-- .../replacement/HostReplacementMultiDCTest.java | 11 +- .../analytics/replacement/HostReplacementTest.java | 1 + .../replacement/HostReplacementTestBase.java | 7 +- .../analytics/shrink/LeavingHalfTest.java | 1 + .../shrink/LeavingMultiDCFailureTest.java | 1 + .../LeavingMultiDCHalveClusterFailureTest.java | 1 + .../shrink/LeavingMultiDCHalveClusterTest.java | 4 +- .../analytics/shrink/LeavingMultiDCTest.java | 4 +- .../shrink/LeavingMultipleFailureTest.java | 1 + .../analytics/shrink/LeavingMultipleTest.java | 1 + .../analytics/shrink/LeavingSingleFailureTest.java | 1 + .../analytics/shrink/LeavingSingleTest.java | 1 + .../analytics/shrink/LeavingTestBase.java | 11 +- .../BulkWriteS3CompatModeSimpleTest.java | 27 ++- 31 files changed, 572 insertions(+), 151 deletions(-) diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java index 795c758..a339be2 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java @@ -152,12 +152,6 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension< return delegate.newInstanceConfig(); } - @Override - public IInstanceConfig createInstanceConfig(int nodeNum) - { - return delegate.createInstanceConfig(nodeNum); - } - @Override public ICluster<I> delegate() { diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index 67864a4..a67591d 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -27,15 +27,15 @@ import java.net.UnknownHostException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; +import java.util.stream.StreamSupport; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -142,19 +142,15 @@ import static org.assertj.core.api.Assertions.assertThat; @ExtendWith(VertxExtension.class) public abstract class SharedClusterIntegrationTestBase { - public static final String SIDECAR_S3_ENDPOINT_OVERRIDE_OPT = "S3_ENDPOINT_OVERRIDE"; - protected final Logger logger = LoggerFactory.getLogger(SharedClusterIntegrationTestBase.class); private static final int MAX_CLUSTER_PROVISION_RETRIES = 5; @TempDir static Path secretsPath; - protected Vertx vertx; - protected DnsResolver dnsResolver; + protected DnsResolver dnsResolver = new LocalhostResolver(); protected IClusterExtension<? extends IInstance> cluster; protected Server server; - protected Injector injector; protected TestVersion testVersion; protected MtlsTestHelper mtlsTestHelper; private IsolatedDTestClassLoaderWrapper classLoaderWrapper; @@ -321,9 +317,9 @@ public abstract class SharedClusterIntegrationTestBase /** * Override to provide additional options to configure sidecar */ - protected Map<String, String> sidecarAdditionalOptions() + protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides() { - return Collections.emptyMap(); + return null; } /** @@ -333,18 +329,30 @@ public abstract class SharedClusterIntegrationTestBase * @throws InterruptedException when the startup times out */ protected void startSidecar(ICluster<? extends IInstance> cluster) throws InterruptedException + { + server = startSidecarWithInstances(cluster); + } + + /** + * Starts Sidecar configured to run with the provided {@link IInstance}s from the cluster. + * + * @param instances the Cassandra instances Sidecar will manage + * @return the started server + * @throws InterruptedException when the server start operation is interrupted + */ + protected Server startSidecarWithInstances(Iterable<? extends IInstance> instances) throws InterruptedException { VertxTestContext context = new VertxTestContext(); - AbstractModule testModule = new IntegrationTestModule(cluster, classLoaderWrapper, mtlsTestHelper, sidecarAdditionalOptions()); - injector = Guice.createInjector(Modules.override(new MainModule()).with(testModule)); - dnsResolver = injector.getInstance(DnsResolver.class); - vertx = injector.getInstance(Vertx.class); - server = injector.getInstance(Server.class); - server.start() - .onSuccess(s -> context.completeNow()) - .onFailure(context::failNow); + AbstractModule testModule = new IntegrationTestModule(instances, classLoaderWrapper, mtlsTestHelper, + dnsResolver, configurationOverrides()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(testModule)); + Server sidecarServer = injector.getInstance(Server.class); + sidecarServer.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); context.awaitCompletion(5, TimeUnit.SECONDS); + return sidecarServer; } /** @@ -430,7 +438,9 @@ public abstract class SharedClusterIntegrationTestBase */ protected Object[][] queryAllData(QualifiedName table, ConsistencyLevel consistencyLevel) { - return cluster.coordinator(1).execute(String.format("SELECT * FROM %s;", table), consistencyLevel); + return cluster.getFirstRunningInstance() + .coordinator() + .execute(String.format("SELECT * FROM %s;", table), consistencyLevel); } /** @@ -441,7 +451,7 @@ public abstract class SharedClusterIntegrationTestBase */ protected ResultSet queryAllDataWithDriver(QualifiedName table) { - return queryAllDataWithDriver(table, com.datastax.driver.core.ConsistencyLevel.ALL); + return queryAllDataWithDriver(table, ConsistencyLevel.ALL); } /** @@ -451,13 +461,12 @@ public abstract class SharedClusterIntegrationTestBase * @param consistency the consistency level to use for querying the data * @return all the data queried from the table */ - protected ResultSet queryAllDataWithDriver(QualifiedName table, - com.datastax.driver.core.ConsistencyLevel consistency) + protected ResultSet queryAllDataWithDriver(QualifiedName table, ConsistencyLevel consistency) { Cluster driverCluster = createDriverCluster(cluster.delegate()); Session session = driverCluster.connect(); SimpleStatement statement = new SimpleStatement(String.format("SELECT * FROM %s;", table)); - statement.setConsistencyLevel(consistency); + statement.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(consistency.name())); return session.execute(statement); } @@ -486,20 +495,23 @@ public abstract class SharedClusterIntegrationTestBase static class IntegrationTestModule extends AbstractModule { private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTestModule.class); - private final ICluster<? extends IInstance> cluster; + private final Iterable<? extends IInstance> instances; private final IsolatedDTestClassLoaderWrapper wrapper; private final MtlsTestHelper mtlsTestHelper; - private final Map<String, String> additioanlOptions; + private final DnsResolver dnsResolver; + private final Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides; - IntegrationTestModule(ICluster<? extends IInstance> cluster, + IntegrationTestModule(Iterable<? extends IInstance> instances, IsolatedDTestClassLoaderWrapper wrapper, MtlsTestHelper mtlsTestHelper, - Map<String, String> additionalOptions) + DnsResolver dnsResolver, + Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides) { - this.cluster = cluster; + this.instances = instances; this.wrapper = wrapper; this.mtlsTestHelper = mtlsTestHelper; - this.additioanlOptions = additionalOptions; + this.configurationOverrides = configurationOverrides; + this.dnsResolver = dnsResolver; } @Provides @@ -522,16 +534,16 @@ public abstract class SharedClusterIntegrationTestBase { JmxConfiguration jmxConfiguration = configuration.serviceConfiguration().jmxConfiguration(); List<InstanceMetadata> instanceMetadataList = - IntStream.rangeClosed(1, cluster.size()) - .mapToObj(i -> buildInstanceMetadata(vertx, - cluster.get(i), - cassandraVersionProvider, - sidecarVersionProvider.sidecarVersion(), - jmxConfiguration, - cqlSessionProvider, - dnsResolver, - wrapper)) - .collect(Collectors.toList()); + StreamSupport.stream(instances.spliterator(), false) + .map(instance -> buildInstanceMetadata(vertx, + instance, + cassandraVersionProvider, + sidecarVersionProvider.sidecarVersion(), + jmxConfiguration, + cqlSessionProvider, + dnsResolver, + wrapper)) + .collect(Collectors.toList()); return new InstancesConfigImpl(instanceMetadataList, dnsResolver); } @@ -577,26 +589,30 @@ public abstract class SharedClusterIntegrationTestBase "like mTLS enabled.", CASSANDRA_INTEGRATION_TEST_ENABLE_MTLS); } S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig()); - return SidecarConfigurationImpl.builder() - .serviceConfiguration(conf) - .sslConfiguration(sslConfiguration) - .s3ClientConfiguration(s3ClientConfig) - .build(); + SidecarConfigurationImpl.Builder builder = SidecarConfigurationImpl.builder() + .serviceConfiguration(conf) + .sslConfiguration(sslConfiguration) + .s3ClientConfiguration(s3ClientConfig); + if (configurationOverrides != null) + { + builder = configurationOverrides.apply(builder); + } + return builder.build(); } @Provides @Singleton public DnsResolver dnsResolver() { - return new LocalhostResolver(); + return dnsResolver; } private List<InetSocketAddress> buildContactPoints() { - return cluster.stream() - .map(instance -> new InetSocketAddress(instance.config().broadcastAddress().getAddress(), - tryGetIntConfig(instance.config(), "native_transport_port", 9042))) - .collect(Collectors.toList()); + return StreamSupport.stream(instances.spliterator(), false) + .map(instance -> new InetSocketAddress(instance.config().broadcastAddress().getAddress(), + tryGetIntConfig(instance.config(), "native_transport_port", 9042))) + .collect(Collectors.toList()); } private S3ProxyConfiguration buildTestS3ProxyConfig() @@ -624,7 +640,7 @@ public abstract class SharedClusterIntegrationTestBase @Override public URI endpointOverride() { - return URI.create(additioanlOptions.getOrDefault(SIDECAR_S3_ENDPOINT_OVERRIDE_OPT, "http://localhost:9090")); + return URI.create("http://localhost:9090"); } }; } diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java index 915d289..cabad60 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/IClusterExtension.java @@ -61,12 +61,6 @@ public interface IClusterExtension<I extends IInstance> extends ICluster<I> */ IInstanceConfig newInstanceConfig(); - /** - * @param nodeNum the node number - * @return a newly created instances configuration for the node {@code nodeNum} - */ - IInstanceConfig createInstanceConfig(int nodeNum); - /** * @return a reference to the delegated {@link ICluster} instance */ diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java index 82b5d9a..fbd5479 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java @@ -39,9 +39,9 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.jetbrains.annotations.NotNull; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; import static org.apache.cassandra.testing.TestUtils.DC1_RF3; import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceTest.java new file mode 100644 index 0000000..6f6a079 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.bulkwriter.WriterOptions; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.analytics.ResiliencyTestBase.uniqueTestTableFullName; +import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; +import static org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; + +/** + * Tests bulk writes in different scenarios when Cassandra instances are down + */ +class BulkWriteDownInstanceTest extends SharedClusterSparkIntegrationTestBase +{ + Set<IInstance> downInstances = new HashSet<>(); + + @ParameterizedTest(name = "{index} => instanceDownCount={0}, {1}") + @MethodSource("testInputs") + void testHandlingOfDownedCassandraInstances(int instanceDownCount, TestConsistencyLevel cl) + { + // progressively stop instances as needed for the test + stopCassandraInstancesForTest(instanceDownCount); + + QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, cl.readCL, cl.writeCL); + SparkSession spark = getOrCreateSparkSession(); + Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, ROW_COUNT); + + DataFrameWriter<Row> dfWriter = bulkWriterDataFrameWriter(df, tableName).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name()); + if (isFailureExpected(instanceDownCount, cl)) + { + sparkTestUtils.assertExpectedBulkWriteFailure(cl.writeCL.name(), dfWriter); + } + else + { + dfWriter.save(); + // Validate using CQL + sparkTestUtils.validateWrites(df.collectAsList(), queryAllData(tableName, cl.readCL)); + } + } + + static boolean isFailureExpected(int instanceDownCount, TestConsistencyLevel cl) + { + if (instanceDownCount == 2) + { + // for a 3 instance cluster, if 2 instances are down, we can only write at consistency level ONE + return cl.writeCL != ONE; + } + if (instanceDownCount == 1) + { + // when one instance is down, we expect failure at CL = ALL + return cl.writeCL == ALL; + } + // No failure is expected when all instances are up + return false; + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF3); + testConsistencyLevels().forEach(consistencyLevel -> { + QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, consistencyLevel.readCL, consistencyLevel.writeCL); + createTestTable(tableName, CREATE_TEST_TABLE_STATEMENT); + }); + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration().nodesPerDc(3); + } + + void stopCassandraInstancesForTest(int instanceDownCount) + { + while (instanceDownCount > downInstances.size()) + { + for (IInstance instance : cluster) + { + if (downInstances.add(instance)) + { + cluster.stopUnchecked(instance); + break; + } + } + } + } + + /** + * @return cartesian product of the list of consistency levels and instance down + */ + static Stream<Arguments> testInputs() + { + return IntStream.rangeClosed(0, 2) + .mapToObj(instanceDownCount -> new AbstractMap.SimpleEntry<>(instanceDownCount, testConsistencyLevels())) + .flatMap(pair -> pair.getValue().stream().map(cl -> Arguments.of(pair.getKey(), cl))); + } + + static List<TestConsistencyLevel> testConsistencyLevels() + { + return Arrays.asList(TestConsistencyLevel.of(ONE, ALL), + TestConsistencyLevel.of(LOCAL_QUORUM, LOCAL_QUORUM), + TestConsistencyLevel.of(ONE, ONE)); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarTest.java new file mode 100644 index 0000000..deb9f9b --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; +import org.apache.cassandra.sidecar.server.Server; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.bulkwriter.WriterOptions; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import static org.apache.cassandra.analytics.BulkWriteDownInstanceTest.isFailureExpected; +import static org.apache.cassandra.analytics.BulkWriteDownInstanceTest.testConsistencyLevels; +import static org.apache.cassandra.analytics.ResiliencyTestBase.uniqueTestTableFullName; +import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; +import static org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests bulk writes in different scenarios when Sidecar instances are down. In this test + * we have one Sidecar managing a single Cassandra instance + */ +class BulkWriteDownSidecarTest extends SharedClusterSparkIntegrationTestBase +{ + private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteDownSidecarTest.class); + Set<Server> downSidecars = new HashSet<>(); + List<Server> sidecarServerList = new ArrayList<>(); + + @ParameterizedTest(name = "{index} => instanceDownCount={0} {1}") + @MethodSource("testInputs") + void testHandlingOfDownedSidecarInstances(int instanceDownCount, TestConsistencyLevel cl) throws Exception + { + // progressively stop Sidecar instances as needed for the test + stopSidecarInstancesForTest(instanceDownCount); + + QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, cl.readCL, cl.writeCL); + SparkSession spark = getOrCreateSparkSession(); + Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, ROW_COUNT); + + DataFrameWriter<Row> dfWriter = bulkWriterDataFrameWriter(df, tableName).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name()); + if (isFailureExpected(instanceDownCount, cl)) + { + sparkTestUtils.assertExpectedBulkWriteFailure(cl.writeCL.name(), dfWriter); + } + else + { + dfWriter.save(); + // Validate using CQL + sparkTestUtils.validateWrites(df.collectAsList(), queryAllData(tableName, cl.readCL)); + } + } + + @Override + protected void startSidecar(ICluster<? extends IInstance> cluster) throws InterruptedException + { + for (IInstance instance : cluster) + { + LOGGER.info("Starting Sidecar instance for Cassandra instance {}", + instance.config().num()); + Server server = startSidecarWithInstances(Collections.singleton(instance)); + sidecarServerList.add(server); + } + + assertThat(sidecarServerList.size()).as("Each Cassandra Instance will be managed by a single Sidecar instance") + .isEqualTo(cluster.size()); + // assign the server to the first instance + server = sidecarServerList.get(0); + } + + @Override + protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides() + { + return builder -> { + ServiceConfiguration conf; + if (sidecarServerList.isEmpty()) + { + // As opposed to the base class, this binds the host to a specific interface (localhost) + conf = ServiceConfigurationImpl.builder() + .host("localhost") + .port(0) // let the test find an available port + .build(); + } + else + { + // Use the same port number for all Sidecar instances that we bring up. We use the port + // bound for the first instance, but we bind it to a different interface (localhost2, localhost3) + conf = ServiceConfigurationImpl.builder() + .host("localhost" + (sidecarServerList.size() + 1)) + .port(sidecarServerList.get(0).actualPort()) + .build(); + } + builder.serviceConfiguration(conf); + + return builder; + }; + } + + @Override + protected void stopSidecar() throws InterruptedException + { + for (Server server : sidecarServerList) + { + if (downSidecars.add(server)) + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (!closeLatch.await(60, TimeUnit.SECONDS)) + { + logger.error("Close event timed out."); + } + } + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF3); + testConsistencyLevels().forEach(consistencyLevel -> { + QualifiedName tableName = uniqueTestTableFullName(TEST_KEYSPACE, consistencyLevel.readCL, consistencyLevel.writeCL); + createTestTable(tableName, CREATE_TEST_TABLE_STATEMENT); + }); + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration().nodesPerDc(3); + } + + void stopSidecarInstancesForTest(int instanceDownCount) throws Exception + { + assertThat(sidecarServerList).isNotEmpty(); + while (instanceDownCount > downSidecars.size()) + { + for (Server server : sidecarServerList) + { + if (downSidecars.add(server)) + { + server.close().toCompletionStage().toCompletableFuture().get(30, TimeUnit.SECONDS); + break; + } + } + } + } + + /** + * @return cartesian product of the list of consistency levels and instance down + */ + static Stream<Arguments> testInputs() + { + return IntStream.rangeClosed(0, 2) + .mapToObj(instanceDownCount -> new AbstractMap.SimpleEntry<>(instanceDownCount, testConsistencyLevels())) + .flatMap(pair -> pair.getValue().stream().map(cl -> Arguments.of(pair.getKey(), cl))); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java index 742c5f6..8b3c9bc 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -35,7 +34,7 @@ import java.util.stream.IntStream; import com.google.common.collect.Range; -import com.datastax.driver.core.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -241,29 +240,6 @@ public abstract class ResiliencyTestBase extends SharedClusterSparkIntegrationTe return new ClusterBuilderConfiguration(); } - public static class TestConsistencyLevel - { - public final ConsistencyLevel readCL; - public final ConsistencyLevel writeCL; - - private TestConsistencyLevel(ConsistencyLevel readCL, ConsistencyLevel writeCL) - { - this.readCL = Objects.requireNonNull(readCL, "readCL is required"); - this.writeCL = Objects.requireNonNull(writeCL, "writeCL is required"); - } - - public static TestConsistencyLevel of(ConsistencyLevel readCL, ConsistencyLevel writeCL) - { - return new TestConsistencyLevel(readCL, writeCL); - } - - @Override - public String toString() - { - return "readCL=" + readCL + ", writeCL=" + writeCL; - } - } - /** * An interface that pulls a method from the Cassandra Storage Service Proxy */ diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java index 246e15e..b5f62a4 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java @@ -29,6 +29,8 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.lang3.StringUtils; + import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.shared.JMXUtil; @@ -47,6 +49,7 @@ import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; /** * Helper methods for Spark tests @@ -154,9 +157,9 @@ public class SparkTestUtils // the quoted identifiers tests where we test mixed case .set("spark.sql.caseSensitive", "True") .set("spark.master", "local[8,4]") - .set("spark.cassandra_analytics.sidecar.request.retries", "5") - .set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", "500") - .set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds", "500"); + .set("spark.cassandra_analytics.sidecar.request.retries", "4") + .set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", "250") + .set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds", "250"); BulkSparkConf.setupSparkConf(sparkConf, true); KryoRegister.setup(sparkConf); return sparkConf; @@ -190,6 +193,26 @@ public class SparkTestUtils .isEmpty(); } + public void assertExpectedBulkWriteFailure(String writeCL, DataFrameWriter<Row> dfWriter) + { + Throwable thrown = catchThrowable(dfWriter::save); + + assertThat(thrown).isInstanceOf(RuntimeException.class) + .hasMessageContaining("java.lang.RuntimeException: Bulk Write to Cassandra has failed"); + + Throwable cause = thrown; + + // Find the cause + while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load")) + { + cause = cause.getCause(); + } + + assertThat(cause).isNotNull() + .hasMessageFindingMatch("Failed to load (\\d+) ranges with " + writeCL + + " for job ([a-zA-Z0-9-]+) in phase .*"); + } + /** * @return a comma-separated string with a list of all the hosts in the in-jvm dtest cluster */ diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TestConsistencyLevel.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TestConsistencyLevel.java new file mode 100644 index 0000000..f80ea16 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TestConsistencyLevel.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.Objects; + +import org.apache.cassandra.distributed.api.ConsistencyLevel; + +/** + * Encompasses reads and writes consistency levels for testing + */ +public class TestConsistencyLevel +{ + public final ConsistencyLevel readCL; + public final ConsistencyLevel writeCL; + + private TestConsistencyLevel(ConsistencyLevel readCL, ConsistencyLevel writeCL) + { + this.readCL = Objects.requireNonNull(readCL, "readCL is required"); + this.writeCL = Objects.requireNonNull(writeCL, "writeCL is required"); + } + + public static TestConsistencyLevel of(ConsistencyLevel readCL, ConsistencyLevel writeCL) + { + return new TestConsistencyLevel(readCL, writeCL); + } + + @Override + public String toString() + { + return "readCL=" + readCL + ", writeCL=" + writeCL; + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java index 875a163..8695203 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningMultiDCTest.java @@ -36,16 +36,17 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java index 403dbb7..a74d8d8 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningSingleNodeTest.java @@ -36,14 +36,15 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java index 3a534c6..7d34581 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningTestBase.java @@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.apache.cassandra.analytics.DataGenerationUtils; import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.testing.utils.ClusterUtils; import org.apache.cassandra.distributed.api.Feature; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java index 5b86a2a..5819f16 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementMultiDCTest.java @@ -36,16 +36,17 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java index 8fbd319..9d58b51 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTest.java @@ -36,14 +36,15 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; import static org.apache.cassandra.testing.TestUtils.DC1_RF3; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java index 186717d..b663d6e 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/movement/NodeMovementTestBase.java @@ -30,13 +30,14 @@ import org.junit.jupiter.params.provider.Arguments; import org.apache.cassandra.analytics.DataGenerationUtils; import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; -import org.apache.cassandra.testing.utils.ClusterUtils; import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.cassandra.testing.utils.ClusterUtils; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java index 554e648..a4f8771 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCFailureTest.java @@ -35,13 +35,14 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java index 8422493..4567035 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCInsufficientReplicasTest.java @@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Test; import net.bytebuddy.ByteBuddy; @@ -40,15 +39,15 @@ import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.testing.ClusterBuilderConfiguration; import org.apache.cassandra.testing.TestUtils; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Row; -import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3; import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.catchThrowable; /** * Validate failed write operation when host replacement fails resulting in insufficient nodes. This is simulated by @@ -62,25 +61,10 @@ class HostReplacementMultiDCInsufficientReplicasTest extends HostReplacementTest @Test void nodeReplacementFailureMultiDCInsufficientNodes() { - Throwable thrown = catchThrowable(() -> - bulkWriterDataFrameWriter(df, QUALIFIED_NAME) - .option(WriterOptions.BULK_WRITER_CL.name(), EACH_QUORUM.name()) - .save()); + DataFrameWriter<Row> dfWriter = bulkWriterDataFrameWriter(df, QUALIFIED_NAME) + .option(WriterOptions.BULK_WRITER_CL.name(), EACH_QUORUM.name()); - assertThat(thrown).isInstanceOf(RuntimeException.class) - .hasMessageContaining("java.lang.RuntimeException: Bulk Write to Cassandra has failed"); - - Throwable cause = thrown; - - // Find the cause - while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load")) - { - cause = cause.getCause(); - } - - assertThat(cause).isNotNull() - .hasMessageFindingMatch("Failed to load (\\d+) ranges with EACH_QUORUM for " + - "job ([a-zA-Z0-9-]+) in phase Environment Validation."); + sparkTestUtils.assertExpectedBulkWriteFailure(EACH_QUORUM.name(), dfWriter); } @Override diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java index 7d778d4..a69f6c8 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java @@ -36,17 +36,18 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.cassandra.testing.ClusterBuilderConfiguration; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java index 4d83121..cc5fa5e 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTest.java @@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java index bd12bc1..b11a816 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java @@ -36,6 +36,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.apache.cassandra.analytics.DataGenerationUtils; import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -47,9 +48,9 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; import static org.assertj.core.api.Assertions.assertThat; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java index 8cbe1b8..399b277 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingHalfTest.java @@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java index 63c43d6..b7caf35 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCFailureTest.java @@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.spark.bulkwriter.WriterOptions; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java index 1b39ee6..d4dbd28 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterFailureTest.java @@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java index 91a6290..8570695 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCHalveClusterTest.java @@ -35,7 +35,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; -import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstance; @@ -55,7 +55,7 @@ class LeavingMultiDCHalveClusterTest extends LeavingTestBase @ParameterizedTest(name = "{index} => {0}") @MethodSource("multiDCTestInputs") - void testLeavingScenario(ResiliencyTestBase.TestConsistencyLevel cl) + void testLeavingScenario(TestConsistencyLevel cl) { QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, cl.readCL, cl.writeCL); bulkWriterDataFrameWriter(df, table).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name()) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java index 6f039f3..4b09bd9 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultiDCTest.java @@ -33,7 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; -import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; @@ -50,7 +50,7 @@ class LeavingMultiDCTest extends LeavingTestBase { @ParameterizedTest(name = "{index} => {0}") @MethodSource("multiDCTestInputs") - void testLeavingScenario(ResiliencyTestBase.TestConsistencyLevel cl) + void testLeavingScenario(TestConsistencyLevel cl) { QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, cl.readCL, cl.writeCL); bulkWriterDataFrameWriter(df, table).option(WriterOptions.BULK_WRITER_CL.name(), cl.writeCL.name()) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java index 2921ea4..2b23cb9 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleFailureTest.java @@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java index 3d47d32..625ae0e 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingMultipleTest.java @@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java index 08f1cf2..79240e7 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleFailureTest.java @@ -34,6 +34,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java index 950fef4..64d83d5 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingSingleTest.java @@ -33,6 +33,7 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.sidecar.testing.QualifiedName; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java index 2130ef2..11a6f2b 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/shrink/LeavingTestBase.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.analytics.DataGenerationUtils; import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestConsistencyLevel; import org.apache.cassandra.analytics.TestUninterruptibles; import org.apache.cassandra.testing.utils.ClusterUtils; import org.apache.cassandra.distributed.api.ICluster; @@ -45,11 +46,11 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import static com.datastax.driver.core.ConsistencyLevel.ALL; -import static com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM; -import static com.datastax.driver.core.ConsistencyLevel.ONE; -import static com.datastax.driver.core.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; +import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM; import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java index da15e0e..125ddae 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.analytics.testcontainer; import java.util.Map; +import java.util.function.Function; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; @@ -27,6 +28,11 @@ import org.junit.jupiter.api.Test; import com.adobe.testing.s3mock.testcontainers.S3MockContainer; import org.apache.cassandra.analytics.DataGenerationUtils; import org.apache.cassandra.analytics.SharedClusterSparkIntegrationTestBase; +import org.apache.cassandra.sidecar.config.S3ClientConfiguration; +import org.apache.cassandra.sidecar.config.S3ProxyConfiguration; +import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.S3ProxyConfigurationImpl; +import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.testing.ClusterBuilderConfiguration; import org.apache.spark.sql.Dataset; @@ -77,9 +83,18 @@ class BulkWriteS3CompatModeSimpleTest extends SharedClusterSparkIntegrationTestB } @Override - protected Map<String, String> sidecarAdditionalOptions() + protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides() { - return ImmutableMap.of(SIDECAR_S3_ENDPOINT_OVERRIDE_OPT, s3Mock.getHttpEndpoint()); + return builder -> { + S3ClientConfiguration s3ClientConfig = new S3ClientConfigurationImpl("s3-client", 4, 60L, buildTestS3ProxyConfig()); + builder.s3ClientConfiguration(s3ClientConfig); + return builder; + }; + } + + private S3ProxyConfiguration buildTestS3ProxyConfig() + { + return new S3MockProxyConfigurationImpl(s3Mock.getHttpEndpoint()); } /** @@ -98,4 +113,12 @@ class BulkWriteS3CompatModeSimpleTest extends SharedClusterSparkIntegrationTestB bulkWriterDataFrameWriter(df, TABLE_NAME, s3CompatOptions).save(); sparkTestUtils.validateWrites(df.collectAsList(), queryAllData(TABLE_NAME)); } + + public static class S3MockProxyConfigurationImpl extends S3ProxyConfigurationImpl + { + S3MockProxyConfigurationImpl(String endpointOverride) + { + super(null, null, null, endpointOverride); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org