This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 090dd4fd CASSANALYTICS-20: CassandraDataLayer uses configuration list of IPs instead of the full ring/datacenter (#122)
090dd4fd is described below

commit 090dd4fdc86ea0ca9410140ce72e840e37497df7
Author: Yifan Cai <y...@apache.org>
AuthorDate: Sat Jun 21 20:45:26 2025 -0700

    CASSANALYTICS-20: CassandraDataLayer uses configuration list of IPs instead of the full ring/datacenter (#122)

    Patch by Serban Teodorescu, Yifan Cai; Reviewed by Francisco Guerrero, Yifan Cai for CASSANALYTICS-20

    ---------

    Co-authored-by: Serban Teodorescu <teodo...@adobe.com>
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/data/CassandraDataLayer.java   | 31 +++++++++++++----
 .../apache/cassandra/analytics/BulkReaderTest.java | 27 +++++++++++++++
 .../SharedClusterSparkIntegrationTestBase.java     | 21 ++++++++++--
 .../apache/cassandra/analytics/SparkTestUtils.java | 39 +++++++++++++---------
 5 files changed, 94 insertions(+), 25 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1f074a1f..8fd4e7af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Use full ring instead of only IPs from configuration (CASSANALYTICS-20)
  * Bulk Reader should dynamically size the Spark job based on estimated table size (CASSANALYTICS-36)
  * Allow getting cassandra role in Spark options for use in Sidecar requests for RBAC (CASSANALYTICS-61)
  * Fix NPE in the deserialized CassandraClusterInfoGroup (CASSANALYTICS-59)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 5e7910ae..bf877f61 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -146,7 +146,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
     private SslConfig sslConfig;

     @VisibleForTesting
-    transient Map<String, SidecarInstance> instanceMap;
+    transient Map<String, SidecarInstance> sidecarInstanceMap;

     public CassandraDataLayer(@NotNull ClientConfig options,
                               @NotNull Sidecar.ClientConfig sidecarClientConfig,
@@ -220,6 +220,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         this.rfMap = rfMap;
         this.timeProvider = timeProvider;
         this.maybeQuoteKeyspaceAndTable();
+        this.initSidecarClient();
         this.initInstanceMap();
         this.startupValidate();
     }
@@ -234,7 +235,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV

         // Load cluster config from options
         clusterConfig = initializeClusterConfig(options);
-        initInstanceMap();
+        initSidecarClient();

         // Get cluster info from Cassandra Sidecar
         int effectiveNumberOfCores;
@@ -251,6 +252,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         {
             throw new RuntimeException(ThrowableUtils.rootCause(exception));
         }
+        initInstanceMap();
         LOGGER.info("Initialized Cassandra Bulk Reader with effectiveNumberOfCores={}", effectiveNumberOfCores);
     }

@@ -422,7 +424,21 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV

     protected void initInstanceMap()
     {
-        instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
+        Preconditions.checkState(tokenPartitioner != null, "tokenPartitioner cannot be absent");
+        sidecarInstanceMap = tokenPartitioner
+                             .ring()
+                             .instances()
+                             .stream()
+                             .filter(instance -> datacenter == null || datacenter.equals(instance.dataCenter()))
+                             .map(CassandraInstance::nodeName)
+                             .distinct()
+                             .map(nodeName -> new SidecarInstanceImpl(nodeName, sidecarClientConfig.effectivePort()))
+                             .collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
+        LOGGER.info("Initialized CassandraDataLayer sidecarInstanceMap numInstances={}", sidecarInstanceMap.size());
+    }
+
+    protected void initSidecarClient()
+    {
         try
         {
             SslConfigSecretsProvider secretsProvider = sslConfig != null
@@ -436,7 +452,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         {
             throw new RuntimeException("Unable to build sidecar client", ioException);
         }
-        LOGGER.info("Initialized CassandraDataLayer instanceMap numInstances={}", instanceMap.size());
+        LOGGER.info("Initialized sidecar client");
     }

     @Override
@@ -516,7 +532,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                            @NotNull Range<BigInteger> range,
                                            @NotNull CassandraInstance instance)
     {
-        SidecarInstance sidecarInstance = instanceMap.get(instance.nodeName());
+        SidecarInstance sidecarInstance = sidecarInstanceMap.get(instance.nodeName());
         if (sidecarInstance == null)
         {
             throw new IllegalStateException("Could not find matching cassandra instance: " + instance.nodeName());
@@ -733,6 +749,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
         this.timeProvider = new ReaderTimeProvider(in.readInt());
         this.maybeQuoteKeyspaceAndTable();
+        this.initSidecarClient();
         this.initInstanceMap();
         this.startupValidate();
     }
@@ -944,10 +961,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
         LOGGER.info("Clearing snapshot at end of Spark job snapshotName={} keyspace={} table={} dc={}",
                     snapshotName, maybeQuotedKeyspace, maybeQuotedTable, datacenter);
-        CountDownLatch latch = new CountDownLatch(clusterConfig.size());
+        CountDownLatch latch = new CountDownLatch(sidecarInstanceMap.size());
         try
         {
-            for (SidecarInstance instance : clusterConfig)
+            for (SidecarInstance instance : sidecarInstanceMap.values())
             {
                 sidecar.clearSnapshot(instance, maybeQuotedKeyspace, maybeQuotedTable, snapshotName).whenComplete((resp, throwable) -> {
                     try
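Taken together, the CassandraDataLayer changes mean the sidecar instance map is now derived from the token partitioner's ring (optionally filtered to one datacenter) instead of from the configured contact points, and the sidecar client is built first so the ring can be fetched before the map exists. Below is a minimal, self-contained sketch of that ring-to-map pattern; Node and SidecarEndpoint are hypothetical stand-ins for the project's real CassandraInstance and SidecarInstanceImpl types, and records are used for brevity (Java 16+):

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class RingMappingSketch
{
    // Hypothetical stand-ins, not the project's real types.
    record Node(String nodeName, String dataCenter) { }
    record SidecarEndpoint(String hostname, int port) { }

    static Map<String, SidecarEndpoint> sidecarMap(List<Node> ring, String datacenter, int sidecarPort)
    {
        return ring.stream()
                   // keep only the requested datacenter when one is specified
                   .filter(node -> datacenter == null || datacenter.equals(node.dataCenter()))
                   .map(Node::nodeName)
                   .distinct() // a host can appear many times in the ring; one sidecar per host
                   .map(name -> new SidecarEndpoint(name, sidecarPort))
                   .collect(Collectors.toMap(SidecarEndpoint::hostname, Function.identity()));
    }
}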
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 21637e85..7c5ebaff 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.analytics;

 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test;

 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -48,6 +50,13 @@ class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
     QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
     QualifiedName tableForNullStaticColumn = uniqueTestTableFullName(TEST_KEYSPACE);

+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .nodesPerDc(2);
+    }
+
     @Test
     void testDynamicSizingOption()
     {
@@ -119,6 +128,24 @@ class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
         }
     }

+    @Test
+    void testUsingSingleSidecarContactPoint()
+    {
+        String singleSidecar = SparkTestUtils.sidecarInstancesOptionStream(cluster, dnsResolver)
+                                             .limit(1)
+                                             .collect(Collectors.joining());
+        assertThat(cluster.size()).isEqualTo(2);
+        assertThat(singleSidecar.contains(","))
+        .describedAs("should not contain the separator ',' as it should have one single contact point")
+        .isFalse();
+        Dataset<Row> data = bulkReaderDataFrame(table1, Collections.singletonMap("sidecar_contact_points", singleSidecar)).load();
+
+        List<Row> rows = data.collectAsList().stream()
+                             .sorted(Comparator.comparing(row -> row.getInt(0)))
+                             .collect(Collectors.toList());
+        assertThat(rows.size()).isEqualTo(DATASET.size());
+    }
+
     @Override
     protected void initializeSchemaForTest()
     {
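From an application's point of view, the behavior this test exercises looks roughly like the sketch below: a single sidecar contact point is enough to seed the reader, which then discovers the remaining instances from the ring. The format string and option names are the ones used in the patch; the host, keyspace, and table values are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class SingleContactPointRead
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder().appName("bulk-read-sketch").getOrCreate();
        // One seed contact point suffices; the reader expands to the full ring.
        Dataset<Row> df = spark.read()
                               .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                               .option("sidecar_contact_points", "sidecar-host-1")
                               .option("keyspace", "my_ks")
                               .option("table", "my_table")
                               .load();
        df.show();
    }
}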
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
index 9a7d5889..c6a98ca7 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
@@ -91,8 +91,25 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
      */
     protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName)
     {
-        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(), getOrCreateSparkSession(),
-                                                         tableName);
+        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                                         getOrCreateSparkSession(),
+                                                         tableName, Collections.emptyMap());
+    }
+
+    /**
+     * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden
+     * with additional options for every specific test.
+     *
+     * @param tableName         the qualified name for the Cassandra table
+     * @param additionalOptions additional options for the data frame
+     * @return a {@link DataFrameReader} for Cassandra bulk reads
+     */
+    protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName, Map<String, String> additionalOptions)
+    {
+        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                                         getOrCreateSparkSession(),
+                                                         tableName,
+                                                         additionalOptions);
     }

     /**
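The new overload keeps the existing single-argument behavior (which now delegates with an empty map) while letting an individual test layer its own options on top. For example, a test can restrict the reader to one sidecar contact point, mirroring the call added in BulkReaderTest; table1 and singleSidecar here come from that test's setup:

// Inside a test subclass of SharedClusterSparkIntegrationTestBase
Dataset<Row> data = bulkReaderDataFrame(table1,
                                        Collections.singletonMap("sidecar_contact_points", singleSidecar))
                    .load();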
+ * + * @param tableName the qualified name for the Cassandra table + * @param additionalOptions additional options for the data frame + * @return a {@link DataFrameReader} for Cassandra bulk reads + */ + protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName, Map<String, String> additionalOptions) + { + return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(), + getOrCreateSparkSession(), + tableName, + additionalOptions); } /** diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java index 0e3a1366..5eda84a9 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java @@ -21,6 +21,7 @@ package org.apache.cassandra.analytics; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -112,7 +113,8 @@ public class SparkTestUtils */ public DataFrameReader defaultBulkReaderDataFrame(SparkConf sparkConf, SparkSession spark, - QualifiedName tableName) + QualifiedName tableName, + Map<String, String> additionalOptions) { SQLContext sql = spark.sqlContext(); SparkContext sc = spark.sparkContext(); @@ -121,22 +123,27 @@ public class SparkTestUtils int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", sparkConf.getInt("spark.executor.instances", 1)); int numCores = coresPerExecutor * numExecutors; + Map<String, String> options = new HashMap<>(); + options.put("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver)); + options.put("keyspace", tableName.keyspace()); + options.put("table", tableName.table()); + options.put("DC", "datacenter1"); + options.put("snapshotName", UUID.randomUUID().toString()); + options.put("createSnapshot", "true"); + // Shutdown hooks are called after the job ends, and in the case of integration tests + // the sidecar is already shut down before this. Since the cluster will be torn + // down anyway, the integration job skips clearing snapshots. + options.put("clearSnapshotStrategy", "noop"); + options.put("defaultParallelism", String.valueOf(sc.defaultParallelism())); + options.put("numCores", String.valueOf(numCores)); + options.put("sizing", "default"); + options.put("sidecar_port", String.valueOf(sidecarPort)); + // merge in additionalOptions; note that for options with the same name, the entries in additionalOptions are kept + options.putAll(additionalOptions); + return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource") - .option("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver)) - .option("keyspace", tableName.keyspace()) // unquoted - .option("table", tableName.table()) // unquoted - .option("DC", "datacenter1") - .option("snapshotName", UUID.randomUUID().toString()) - .option("createSnapshot", "true") - // Shutdown hooks are called after the job ends, and in the case of integration tests - // the sidecar is already shut down before this. Since the cluster will be torn - // down anyway, the integration job skips clearing snapshots. 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org