This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new f960685e CASSANALYTICS-89: Create dedicated data class for broadcast variable during bulk write (#146)
f960685e is described below
commit f960685e0ee9fafc0e4819ff40a77179f0f7b859
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Oct 28 16:59:56 2025 -0700
CASSANALYTICS-89: Create dedicated data class for broadcast variable during bulk write (#146)
Extract BulkWriterConfig as an immutable, serializable data class for
Spark broadcast variables, replacing direct broadcasting of
BulkWriterContext.
This addresses the issue where BulkWriterContext, containing mutable state
and non-serializable dependencies, was being broadcast to executors.
Follows Spark best practices for broadcast variables (immutable data only) and
provides a clear separation between configuration data and stateful context
objects.
Patch by Yifan Cai; reviewed by Francisco Guerrero for CASSANALYTICS-89
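
For illustration only, a minimal sketch of the before/after broadcasting pattern
(variable names such as writerContext, config, and sparkContext are assumed to be
in scope; see the CassandraBulkSourceRelation changes below for the actual wiring):

    // Before: the stateful, partially non-serializable context was broadcast directly
    Broadcast<BulkWriterContext> broadcastContext = sparkContext.broadcast(writerContext);

    // After: the driver extracts an immutable BulkWriterConfig and broadcasts only that
    BulkWriterConfig config = extractConfig(writerContext, sparkContext.defaultParallelism());
    Broadcast<BulkWriterConfig> broadcastConfig = sparkContext.broadcast(config);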
---
CHANGES.txt | 1 +
.../bulkwriter/AbstractBulkWriterContext.java | 102 +++++++++-
.../spark/bulkwriter/BroadcastableClusterInfo.java | 104 +++++++++++
.../bulkwriter/BroadcastableClusterInfoGroup.java | 150 +++++++++++++++
.../spark/bulkwriter/BroadcastableJobInfo.java | 110 +++++++++++
.../spark/bulkwriter/BroadcastableSchemaInfo.java | 83 ++++++++
.../spark/bulkwriter/BroadcastableTableSchema.java | 208 +++++++++++++++++++++
.../bulkwriter/BroadcastableTokenPartitioner.java | 93 +++++++++
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 37 ++--
.../spark/bulkwriter/BulkWriterConfig.java | 114 +++++++++++
.../spark/bulkwriter/BulkWriterContext.java | 37 +++-
.../spark/bulkwriter/BulkWriterContextFactory.java | 2 +-
.../bulkwriter/CassandraBulkSourceRelation.java | 92 +++++++--
.../bulkwriter/CassandraBulkWriterContext.java | 19 +-
.../spark/bulkwriter/CassandraClusterInfo.java | 51 ++++-
.../spark/bulkwriter/CassandraJobInfo.java | 16 +-
.../spark/bulkwriter/CassandraSchemaInfo.java | 13 +-
.../cassandra/spark/bulkwriter/ClusterInfo.java | 16 +-
.../bulkwriter/IBroadcastableClusterInfo.java | 83 ++++++++
.../apache/cassandra/spark/bulkwriter/JobInfo.java | 10 +-
.../cassandra/spark/bulkwriter/RecordWriter.java | 11 +-
.../cassandra/spark/bulkwriter/SchemaInfo.java | 10 +-
.../cassandra/spark/bulkwriter/TableSchema.java | 73 ++++----
.../spark/bulkwriter/TokenPartitioner.java | 172 +++++++++++++----
.../cassandra/spark/bulkwriter/Tokenizer.java | 14 +-
.../CloudStorageDataTransferApiFactory.java | 12 +-
.../cloudstorage/StorageClientConfig.java | 7 +-
.../coordinated/CassandraClusterInfoGroup.java | 74 +++++++-
.../CassandraCoordinatedBulkWriterContext.java | 25 ++-
.../coordinated/CoordinatedWriteConf.java | 1 -
.../MultiClusterReplicaAwareFailureHandler.java | 9 +-
.../spark/bulkwriter/util/SbwKryoRegistrator.java | 26 ++-
.../spark/bulkwriter/BulkSparkConfTest.java | 2 +-
.../spark/bulkwriter/CassandraClusterInfoTest.java | 2 +-
.../spark/bulkwriter/RecordWriterTest.java | 4 +-
.../spark/bulkwriter/TableSchemaNormalizeTest.java | 3 +-
.../spark/bulkwriter/TableSchemaTest.java | 34 ++--
.../coordinated/CassandraClusterInfoGroupTest.java | 61 ++++--
.../CoordinatedImportCoordinatorTest.java | 10 +-
.../apache/cassandra/spark/reader/ReaderUtils.java | 3 +-
.../apache/cassandra/spark/reader/ReaderUtils.java | 3 +-
41 files changed, 1690 insertions(+), 207 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7ce881e4..04956e51 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Refactor BulkWriterContext broadcasting to use immutable config class (CASSANALYTICS-89)
* Bump sidecar dependency to 0.2.0 (CASSANALYTICS-93)
* Support for Trie-Indexed SSTables (BTI format) (CASSANALYTICS-27)
* Add extractCdcTables method to CqlUtils (CASSANALYTICS-91)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index 87e5a979..6acbb2fc 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -32,6 +32,7 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
@@ -44,9 +45,29 @@ import org.apache.cassandra.spark.utils.CqlUtils;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
+/**
+ * Abstract base class for BulkWriterContext implementations.
+ *
+ * <p>Serialization Architecture:</p>
+ * <p>This class is NOT serialized directly. Instead:
+ * <ol>
+ * <li>Driver creates BulkWriterContext using constructor</li>
+ * <li>Driver extracts BulkWriterConfig in {@link CassandraBulkSourceRelation} constructor</li>
+ * <li>BulkWriterConfig gets broadcast to executors</li>
+ * <li>Executors reconstruct BulkWriterContext via {@link BulkWriterContext#from(BulkWriterConfig)}</li>
+ * </ol>
+ *
+ * <p>Broadcastable wrappers used in BulkWriterConfig:
+ * <ul>
+ * <li>{@link IBroadcastableClusterInfo} → reconstructs to {@link CassandraClusterInfo} or {@link CassandraClusterInfoGroup}</li>
+ * <li>{@link BroadcastableJobInfo} → reconstructs to {@link CassandraJobInfo}</li>
+ * <li>{@link BroadcastableSchemaInfo} → reconstructs to {@link CassandraSchemaInfo}</li>
+ * </ul>
+ *
+ * <p>Implements KryoSerializable with fail-fast approach to detect missing Kryo registration.
+ */
public abstract class AbstractBulkWriterContext implements BulkWriterContext,
KryoSerializable
{
- private static final long serialVersionUID = -6526396615116954510L;
// log as the concrete implementation; but use private to not expose the
logger to implementations
private final transient Logger logger =
LoggerFactory.getLogger(this.getClass());
@@ -62,13 +83,22 @@ public abstract class AbstractBulkWriterContext implements
BulkWriterContext, Kr
private transient volatile JobStatsPublisher jobStatsPublisher;
private transient volatile TransportContext transportContext;
+ /**
+ * Constructor for driver usage.
+ * Builds all components fresh on the driver.
+ *
+ * @param conf Bulk Spark configuration
+ * @param structType DataFrame schema
+ * @param sparkDefaultParallelism Spark default parallelism
+ */
protected AbstractBulkWriterContext(@NotNull BulkSparkConf conf,
@NotNull StructType structType,
@NotNull int sparkDefaultParallelism)
{
this.conf = conf;
this.sparkDefaultParallelism = sparkDefaultParallelism;
- // Note: build sequence matters
+
+ // Build everything fresh on driver
this.clusterInfo = buildClusterInfo();
this.clusterInfo.startupValidate();
this.lowestCassandraVersion = findLowestCassandraVersion();
@@ -76,10 +106,32 @@ public abstract class AbstractBulkWriterContext implements
BulkWriterContext, Kr
this.jobInfo = buildJobInfo();
this.schemaInfo = buildSchemaInfo(structType);
this.jobStatsPublisher = buildJobStatsPublisher();
- this.transportContext = buildTransportContext(true);
+ this.transportContext = buildTransportContext(true); // isOnDriver =
true
+ }
+
+ /**
+ * Constructor for executor usage.
+ * Reconstructs components from broadcast configuration on executors.
+ * This is used by the factory method {@link
BulkWriterContext#from(BulkWriterConfig)}.
+ *
+ * @param config immutable configuration for the bulk writer with
pre-computed values
+ */
+ protected AbstractBulkWriterContext(@NotNull BulkWriterConfig config)
+ {
+ this.conf = config.getConf();
+ this.sparkDefaultParallelism = config.getSparkDefaultParallelism();
+
+ // Reconstruct from broadcast data on executor
+ this.clusterInfo =
reconstructClusterInfoOnExecutor(config.getBroadcastableClusterInfo());
+ this.lowestCassandraVersion = config.getLowestCassandraVersion();
+ this.bridge = buildCassandraBridge();
+ this.jobInfo =
reconstructJobInfoOnExecutor(config.getBroadcastableJobInfo());
+ this.schemaInfo =
reconstructSchemaInfoOnExecutor(config.getBroadcastableSchemaInfo());
+ this.jobStatsPublisher = buildJobStatsPublisher();
+ this.transportContext = buildTransportContext(false); // isOnDriver =
false
}
- protected final BulkSparkConf bulkSparkConf()
+ public final BulkSparkConf bulkSparkConf()
{
return conf;
}
@@ -98,6 +150,48 @@ public abstract class AbstractBulkWriterContext implements
BulkWriterContext, Kr
protected abstract ClusterInfo buildClusterInfo();
+ /**
+ * Reconstructs ClusterInfo on executors from broadcastable versions.
+ * This method is only called on executors when reconstructing
BulkWriterContext from
+ * broadcast BulkWriterConfig. Each broadcastable type knows how to
reconstruct itself
+ * into the appropriate full ClusterInfo implementation.
+ *
+ * @param clusterInfo the BroadcastableClusterInfo from broadcast
+ * @return reconstructed ClusterInfo (CassandraClusterInfo or
CassandraClusterInfoGroup)
+ */
+ protected ClusterInfo
reconstructClusterInfoOnExecutor(IBroadcastableClusterInfo clusterInfo)
+ {
+ return clusterInfo.reconstruct();
+ }
+
+ /**
+ * Reconstructs JobInfo on executors from BroadcastableJobInfo.
+ * This method is only called on executors when reconstructing
BulkWriterContext from
+ * broadcast BulkWriterConfig. It rebuilds CassandraJobInfo with
TokenPartitioner reconstructed
+ * from the broadcastable partition mappings.
+ *
+ * @param jobInfo the BroadcastableJobInfo from broadcast
+ * @return reconstructed CassandraJobInfo
+ */
+ protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo
jobInfo)
+ {
+ return new CassandraJobInfo(jobInfo);
+ }
+
+ /**
+ * Reconstructs SchemaInfo on executors from BroadcastableSchemaInfo.
+ * This method is only called on executors when reconstructing
BulkWriterContext from
+ * broadcast BulkWriterConfig. It reconstructs CassandraSchemaInfo and
TableSchema from
+ * the broadcast data (no Sidecar calls needed).
+ *
+ * @param schemaInfo the BroadcastableSchemaInfo from broadcast
+ * @return reconstructed CassandraSchemaInfo
+ */
+ protected SchemaInfo
reconstructSchemaInfoOnExecutor(BroadcastableSchemaInfo schemaInfo)
+ {
+ return new CassandraSchemaInfo(schemaInfo);
+ }
+
protected abstract void validateKeyspaceReplication();
protected JobInfo buildJobInfo()
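
The Javadoc above mentions a fail-fast KryoSerializable implementation; the write/read
overrides are not shown in this hunk, but a guard along these lines (a sketch, not
necessarily the exact code in the patch) is what "fail fast on missing Kryo
registration" amounts to:

    @Override
    public void write(Kryo kryo, Output output)
    {
        // Fail fast: reaching here means the context is being Kryo-serialized,
        // which this patch is designed to avoid; broadcast BulkWriterConfig instead
        throw new UnsupportedOperationException("AbstractBulkWriterContext must not be serialized");
    }

    @Override
    public void read(Kryo kryo, Input input)
    {
        throw new UnsupportedOperationException("AbstractBulkWriterContext must not be deserialized");
    }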
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfo.java
new file mode 100644
index 00000000..de06151e
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Broadcastable wrapper for single cluster with ZERO transient fields to
optimize Spark broadcasting.
+ * <p>
+ * Only essential fields are broadcast; executors reconstruct
CassandraClusterInfo to fetch other data from Sidecar.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to
estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type
hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs
(appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ * <li>Minimize SizeEstimator reflection overhead during broadcast
preparation</li>
+ * <li>Reduce broadcast variable serialization size</li>
+ * <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableClusterInfo implements
IBroadcastableClusterInfo
+{
+ private static final long serialVersionUID = 4506917240637924802L;
+
+ // Essential fields broadcast to executors
+ private final Partitioner partitioner;
+ private final String cassandraVersion;
+ private final String clusterId;
+ private final BulkSparkConf conf;
+
+ /**
+ * Creates a BroadcastableCluster from a CassandraClusterInfo by
extracting essential fields.
+ * Executors will reconstruct CassandraClusterInfo to fetch other data
from Sidecar.
+ *
+ * @param source the source ClusterInfo (typically CassandraClusterInfo)
+ * @param conf the BulkSparkConf needed to connect to Sidecar on
executors
+ */
+ public static BroadcastableClusterInfo from(@NotNull ClusterInfo source,
@NotNull BulkSparkConf conf)
+ {
+ return new BroadcastableClusterInfo(source.getPartitioner(),
source.getLowestCassandraVersion(), source.clusterId(), conf);
+ }
+
+ private BroadcastableClusterInfo(@NotNull Partitioner partitioner,
+ @NotNull String cassandraVersion,
+ @Nullable String clusterId,
+ @NotNull BulkSparkConf conf)
+ {
+ this.partitioner = partitioner;
+ this.cassandraVersion = cassandraVersion;
+ this.clusterId = clusterId;
+ this.conf = conf;
+ }
+
+ public BulkSparkConf getConf()
+ {
+ return conf;
+ }
+
+ @Override
+ public String getLowestCassandraVersion()
+ {
+ return cassandraVersion;
+ }
+
+ @Override
+ public Partitioner getPartitioner()
+ {
+ return partitioner;
+ }
+
+ @Override
+ @Nullable
+ public String clusterId()
+ {
+ return clusterId;
+ }
+
+ @Override
+ public ClusterInfo reconstruct()
+ {
+ return new CassandraClusterInfo(this);
+ }
+}
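
A short usage sketch of the wrapper's round trip (the driver/executor split and the
clusterInfo and conf variables are assumed for illustration):

    // Driver: capture only the broadcast-safe essentials
    IBroadcastableClusterInfo broadcastable = BroadcastableClusterInfo.from(clusterInfo, conf);

    // Executor: rebuild a full ClusterInfo that fetches the rest from Sidecar on demand
    ClusterInfo rebuilt = broadcastable.reconstruct();
    Partitioner partitioner = rebuilt.getPartitioner();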
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfoGroup.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfoGroup.java
new file mode 100644
index 00000000..d5ad1e13
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfoGroup.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterSupport;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Broadcastable wrapper for coordinated writes with ZERO transient fields to
optimize Spark broadcasting.
+ * <p>
+ * This class wraps multiple BroadcastableCluster instances for multi-cluster
scenarios.
+ * Pre-computed values (partitioner, lowestCassandraVersion) are extracted
from CassandraClusterInfoGroup on the driver
+ * to avoid duplicating aggregation/validation logic on executors.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to
estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type
hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs
(appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ * <li>Minimize SizeEstimator reflection overhead during broadcast
preparation</li>
+ * <li>Reduce broadcast variable serialization size</li>
+ * <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableClusterInfoGroup implements
IBroadcastableClusterInfo, MultiClusterSupport<IBroadcastableClusterInfo>
+{
+ private static final long serialVersionUID = 8621255452042082506L;
+
+ private final List<BroadcastableClusterInfo> clusterInfos;
+ private final String clusterId;
+ private final BulkSparkConf conf;
+ private final Partitioner partitioner;
+ private final String lowestCassandraVersion;
+
+ /**
+ * Creates a BroadcastableClusterInfoGroup from a source ClusterInfo group.
+ * Extracts pre-computed values (partitioner, lowestCassandraVersion) from
the source
+ * to avoid duplicating aggregation/validation logic on executors.
+ *
+ * @param source the source CassandraClusterInfoGroup
+ * @param conf the BulkSparkConf needed to connect to Sidecar on
executors
+ */
+ public static BroadcastableClusterInfoGroup from(@NotNull
CassandraClusterInfoGroup source,
+ @NotNull BulkSparkConf
conf)
+ {
+ List<BroadcastableClusterInfo> broadcastableInfos = new ArrayList<>();
+ source.forEach((clusterId, clusterInfo) ->
broadcastableInfos.add(BroadcastableClusterInfo.from(clusterInfo, conf)));
+
+ // Extract pre-computed values from CassandraClusterInfoGroup
+ // These have already been validated/computed on the driver
+ Partitioner partitioner = source.getPartitioner();
+ String lowestVersion = source.getLowestCassandraVersion();
+
+ return new BroadcastableClusterInfoGroup(broadcastableInfos,
source.clusterId(), conf, partitioner, lowestVersion);
+ }
+
+ private BroadcastableClusterInfoGroup(List<BroadcastableClusterInfo>
clusterInfos,
+ String clusterId,
+ BulkSparkConf conf,
+ Partitioner partitioner,
+ String lowestCassandraVersion)
+ {
+ this.clusterInfos = Collections.unmodifiableList(clusterInfos);
+ this.conf = conf;
+ this.clusterId = clusterId;
+ this.partitioner = partitioner;
+ this.lowestCassandraVersion = lowestCassandraVersion;
+ }
+
+ @Override
+ @NotNull
+ public BulkSparkConf getConf()
+ {
+ return conf;
+ }
+
+ @Override
+ public String getLowestCassandraVersion()
+ {
+ // Return pre-computed value from CassandraClusterInfoGroup
+ // No need to duplicate aggregation/validation logic
+ return lowestCassandraVersion;
+ }
+
+ @Override
+ public Partitioner getPartitioner()
+ {
+ // Return pre-computed value from CassandraClusterInfoGroup
+ // No need to duplicate validation logic
+ return partitioner;
+ }
+
+ @Override
+ public String clusterId()
+ {
+ return clusterId;
+ }
+
+ // MultiClusterSupport methods
+ @Override
+ public int size()
+ {
+ return clusterInfos.size();
+ }
+
+ @Override
+ public void forEach(BiConsumer<String, IBroadcastableClusterInfo> action)
+ {
+ clusterInfos.forEach(info -> action.accept(info.clusterId(), info));
+ }
+
+ @Nullable
+ @Override
+ public IBroadcastableClusterInfo getValueOrNull(@NotNull String clusterId)
+ {
+        throw new UnsupportedOperationException("getValueOrNull should not be called from BroadcastableClusterInfoGroup");
+ }
+
+ @Override
+ public ClusterInfo reconstruct()
+ {
+ return CassandraClusterInfoGroup.from(this);
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableJobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableJobInfo.java
new file mode 100644
index 00000000..b879d1b9
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableJobInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.UUID;
+
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.Serializable;
+
+/**
+ * Broadcastable wrapper for job information with ZERO transient fields to
optimize Spark broadcasting.
+ * <p>
+ * Only essential fields are broadcast; executors reconstruct CassandraJobInfo
to rebuild TokenPartitioner.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to
estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type
hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs
(appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ * <li>Minimize SizeEstimator reflection overhead during broadcast
preparation</li>
+ * <li>Reduce broadcast variable serialization size</li>
+ * <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableJobInfo implements Serializable
+{
+ private static final long serialVersionUID = -8717074052066841748L;
+
+ // Essential fields broadcast to executors
+ private final BulkSparkConf conf;
+ private final MultiClusterContainer<UUID> restoreJobIds;
+ private final BroadcastableTokenPartitioner tokenPartitioner; //
Broadcastable version without Logger
+
+ /**
+ * Creates a BroadcastableJobInfo from a source JobInfo.
+ * Extracts partition mappings from TokenPartitioner to avoid broadcasting
Logger.
+ *
+ * @param source the source JobInfo (typically CassandraJobInfo)
+ * @param conf the BulkSparkConf needed for executors
+ */
+ public static BroadcastableJobInfo from(@NotNull JobInfo source, @NotNull
BulkSparkConf conf)
+ {
+ // Extract restoreJobIds from source
+ MultiClusterContainer<UUID> restoreJobIds;
+ if (source.isCoordinatedWriteEnabled())
+ {
+ // For coordinated write, need to extract all restoreJobIds
+ CoordinatedWriteConf coordinatedConf =
source.coordinatedWriteConf();
+ restoreJobIds = new MultiClusterContainer<>();
+ coordinatedConf.clusters().keySet().forEach(clusterId -> {
+ restoreJobIds.setValue(clusterId,
source.getRestoreJobId(clusterId));
+ });
+ }
+ else
+ {
+ // Single cluster - use null key
+ restoreJobIds =
MultiClusterContainer.ofSingle(source.getRestoreJobId());
+ }
+
+ // Extract partition mappings from TokenPartitioner
+ BroadcastableTokenPartitioner broadcastableTokenPartitioner =
BroadcastableTokenPartitioner.from(source.getTokenPartitioner());
+
+ return new BroadcastableJobInfo(conf, restoreJobIds,
broadcastableTokenPartitioner);
+ }
+
+ private BroadcastableJobInfo(BulkSparkConf conf,
+ MultiClusterContainer<UUID> restoreJobIds,
+ BroadcastableTokenPartitioner tokenPartitioner)
+ {
+ this.conf = conf;
+ this.restoreJobIds = restoreJobIds;
+ this.tokenPartitioner = tokenPartitioner;
+ }
+
+ public BulkSparkConf getConf()
+ {
+ return conf;
+ }
+
+ public MultiClusterContainer<UUID> getRestoreJobIds()
+ {
+ return restoreJobIds;
+ }
+
+ public BroadcastableTokenPartitioner getBroadcastableTokenPartitioner()
+ {
+ return tokenPartitioner;
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableSchemaInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableSchemaInfo.java
new file mode 100644
index 00000000..9c0a164b
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableSchemaInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Broadcastable wrapper for schema information with ZERO transient fields to
optimize Spark broadcasting.
+ * <p>
+ * Contains BroadcastableTableSchema (pre-computed schema data) and UDT
statements.
+ * Executors reconstruct CassandraSchemaInfo and TableSchema from these fields.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to
estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type
hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs
(appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ * <li>Minimize SizeEstimator reflection overhead during broadcast
preparation</li>
+ * <li>Reduce broadcast variable serialization size</li>
+ * <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableSchemaInfo implements Serializable
+{
+ private static final long serialVersionUID = -8727074052066841748L;
+
+ // Essential fields broadcast to executors
+ private final BroadcastableTableSchema broadcastableTableSchema;
+ private final Set<String> userDefinedTypeStatements;
+
+ /**
+ * Creates a BroadcastableSchemaInfo from a source SchemaInfo.
+ * Extracts BroadcastableTableSchema to avoid serializing Logger.
+ *
+ * @param source the source SchemaInfo (typically CassandraSchemaInfo)
+ */
+ public static BroadcastableSchemaInfo from(@NotNull SchemaInfo source)
+ {
+ return new BroadcastableSchemaInfo(
+ BroadcastableTableSchema.from(source.getTableSchema()),
+ source.getUserDefinedTypeStatements()
+ );
+ }
+
+ private BroadcastableSchemaInfo(BroadcastableTableSchema
broadcastableTableSchema,
+ Set<String> userDefinedTypeStatements)
+ {
+ this.broadcastableTableSchema = broadcastableTableSchema;
+ this.userDefinedTypeStatements = userDefinedTypeStatements;
+ }
+
+ public BroadcastableTableSchema getBroadcastableTableSchema()
+ {
+ return broadcastableTableSchema;
+ }
+
+ @NotNull
+ public Set<String> getUserDefinedTypeStatements()
+ {
+ return userDefinedTypeStatements;
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java
new file mode 100644
index 00000000..322e815d
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Broadcastable wrapper for TableSchema with ZERO transient fields to
optimize Spark broadcasting.
+ * <p>
+ * Contains all essential fields from TableSchema needed on executors, but
without the Logger reference.
+ * Executors will reconstruct TableSchema from these fields.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to
estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type
hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs
(appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ * <li>Minimize SizeEstimator reflection overhead during broadcast
preparation</li>
+ * <li>Reduce broadcast variable serialization size</li>
+ * <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableTableSchema implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+
+ // All fields from TableSchema needed for reconstruction on executors
+ private final String createStatement;
+ private final String modificationStatement;
+ private final List<String> partitionKeyColumns;
+ private final List<ColumnType<?>> partitionKeyColumnTypes;
+ private final List<SqlToCqlTypeConverter.Converter<?>> converters;
+ private final List<Integer> keyFieldPositions;
+ private final WriteMode writeMode;
+ private final TTLOption ttlOption;
+ private final TimestampOption timestampOption;
+ private final String lowestCassandraVersion;
+ private final boolean quoteIdentifiers;
+
+ /**
+ * Creates a BroadcastableTableSchema from a source TableSchema.
+ * Extracts all essential fields but excludes the Logger.
+ *
+ * @param source the source TableSchema (driver-only)
+ * @return broadcastable version without Logger
+ */
+ public static BroadcastableTableSchema from(@NotNull TableSchema source)
+ {
+ return new BroadcastableTableSchema(
+ source.createStatement,
+ source.modificationStatement,
+ source.partitionKeyColumns,
+ source.partitionKeyColumnTypes,
+ source.converters,
+ source.keyFieldPositions,
+ source.writeMode,
+ source.ttlOption,
+ source.timestampOption,
+ source.lowestCassandraVersion,
+ source.quoteIdentifiers
+ );
+ }
+
+ private BroadcastableTableSchema(String createStatement,
+ String modificationStatement,
+ List<String> partitionKeyColumns,
+ List<ColumnType<?>>
partitionKeyColumnTypes,
+ List<SqlToCqlTypeConverter.Converter<?>>
converters,
+ List<Integer> keyFieldPositions,
+ WriteMode writeMode,
+ TTLOption ttlOption,
+ TimestampOption timestampOption,
+ String lowestCassandraVersion,
+ boolean quoteIdentifiers)
+ {
+ this.createStatement = createStatement;
+ this.modificationStatement = modificationStatement;
+ this.partitionKeyColumns = partitionKeyColumns;
+ this.partitionKeyColumnTypes = partitionKeyColumnTypes;
+ this.converters = converters;
+ this.keyFieldPositions = keyFieldPositions;
+ this.writeMode = writeMode;
+ this.ttlOption = ttlOption;
+ this.timestampOption = timestampOption;
+ this.lowestCassandraVersion = lowestCassandraVersion;
+ this.quoteIdentifiers = quoteIdentifiers;
+ }
+
+ public String getCreateStatement()
+ {
+ return createStatement;
+ }
+
+ public String getModificationStatement()
+ {
+ return modificationStatement;
+ }
+
+ public List<String> getPartitionKeyColumns()
+ {
+ return partitionKeyColumns;
+ }
+
+ public List<ColumnType<?>> getPartitionKeyColumnTypes()
+ {
+ return partitionKeyColumnTypes;
+ }
+
+ public List<SqlToCqlTypeConverter.Converter<?>> getConverters()
+ {
+ return converters;
+ }
+
+ public List<Integer> getKeyFieldPositions()
+ {
+ return keyFieldPositions;
+ }
+
+ public WriteMode getWriteMode()
+ {
+ return writeMode;
+ }
+
+ public TTLOption getTtlOption()
+ {
+ return ttlOption;
+ }
+
+ public TimestampOption getTimestampOption()
+ {
+ return timestampOption;
+ }
+
+ public String getLowestCassandraVersion()
+ {
+ return lowestCassandraVersion;
+ }
+
+ public boolean isQuoteIdentifiers()
+ {
+ return quoteIdentifiers;
+ }
+
+ /**
+ * Normalizes a row by applying type converters to each field.
+ * This mirrors the normalize method in TableSchema but uses the
broadcast-safe converters list.
+ *
+ * @param row the row data to normalize
+ * @return the normalized row (same array instance, mutated in place)
+ */
+ public Object[] normalize(Object[] row)
+ {
+ for (int index = 0; index < row.length; index++)
+ {
+ row[index] = converters.get(index).convert(row[index]);
+ }
+ return row;
+ }
+
+ /**
+ * Extracts key columns from all columns based on key field positions.
+ * This mirrors the getKeyColumns method in TableSchema but uses the
broadcast-safe keyFieldPositions list.
+ *
+ * @param allColumns all columns in the row
+ * @return array containing only the key columns
+ */
+ public Object[] getKeyColumns(Object[] allColumns)
+ {
+ return getKeyColumns(allColumns, keyFieldPositions);
+ }
+
+ @NotNull
+    public static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)
+    {
+        Object[] result = new Object[keyFieldPositions.size()];
+        for (int keyFieldPosition = 0; keyFieldPosition < keyFieldPositions.size(); keyFieldPosition++)
+        {
+            Object colVal = allColumns[keyFieldPositions.get(keyFieldPosition)];
+            Preconditions.checkNotNull(colVal, "Found a null primary or composite key column in source data. All key columns must be non-null.");
+ result[keyFieldPosition] = colVal;
+ }
+ return result;
+ }
+}
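
The static getKeyColumns helper can be exercised on its own; a tiny example with an
invented column layout (purely illustrative):

    // Row layout: [id, bucket, payload]; the key columns sit at positions 0 and 1
    Object[] row = new Object[] { "user-42", 7, "payload bytes" };
    Object[] keyColumns = BroadcastableTableSchema.getKeyColumns(row, Arrays.asList(0, 1));
    // keyColumns now holds ["user-42", 7]; a null key column would fail the Preconditions check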
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTokenPartitioner.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTokenPartitioner.java
new file mode 100644
index 00000000..997d4270
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTokenPartitioner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Broadcastable wrapper for TokenPartitioner with ZERO transient fields to
optimize Spark broadcasting.
+ * <p>
+ * Only contains the partition mappings; executors will use this to
reconstruct TokenPartitioner.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to
estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type
hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs
(appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ * <li>Minimize SizeEstimator reflection overhead during broadcast
preparation</li>
+ * <li>Reduce broadcast variable serialization size</li>
+ * <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableTokenPartitioner implements Serializable
+{
+ private static final long serialVersionUID = -8787074052066841748L;
+
+ // Essential fields broadcast to executors - the partition mappings
+ private final Map<Range<BigInteger>, Integer> partitionEntries;
+ private final Integer numberSplits;
+
+ /**
+ * Creates a BroadcastableTokenPartitioner from a TokenPartitioner.
+ * Extracts only the partition mappings, avoiding the Logger.
+ *
+ * @param source the source TokenPartitioner
+ */
+ public static BroadcastableTokenPartitioner from(@NotNull TokenPartitioner
source)
+ {
+ // Extract partition mappings - these are already computed and won't
change
+ Map<Range<BigInteger>, Integer> partitionEntries = new HashMap<>();
+ for (int i = 0; i < source.numPartitions(); i++)
+ {
+ Range<BigInteger> range = source.getTokenRange(i);
+ if (range != null)
+ {
+ partitionEntries.put(range, i);
+ }
+ }
+
+ return new BroadcastableTokenPartitioner(partitionEntries,
source.numSplits());
+ }
+
+ private BroadcastableTokenPartitioner(Map<Range<BigInteger>, Integer>
partitionEntries,
+ Integer numberSplits)
+ {
+ this.partitionEntries = partitionEntries;
+ this.numberSplits = numberSplits;
+ }
+
+ public Map<Range<BigInteger>, Integer> getPartitionEntries()
+ {
+ return partitionEntries;
+ }
+
+ public Integer numSplits()
+ {
+ return numberSplits;
+ }
+}
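
The broadcast state is just a map of token ranges to partition ids. As an illustration
of how an executor could turn it back into a lookup structure (this uses Guava's
TreeRangeMap and is only a sketch of the idea, not the actual TokenPartitioner
reconstruction in this patch):

    // Rebuild a range -> partition lookup from the broadcast entries
    RangeMap<BigInteger, Integer> lookup = TreeRangeMap.create();
    broadcastableTokenPartitioner.getPartitionEntries().forEach(lookup::put);

    // Map a token to its Spark partition id (null if the token falls outside all ranges)
    Integer partitionId = lookup.get(BigInteger.valueOf(12345L));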
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index f389dcb5..3f52cd74 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -37,8 +37,8 @@ import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import o.a.c.sidecar.client.shaded.client.SidecarInstance;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.StorageClientConfig;
@@ -57,7 +57,6 @@ import org.jetbrains.annotations.Nullable;
public class BulkSparkConf implements Serializable
{
private static final long serialVersionUID = -5060973521517656241L;
- private static final Logger LOGGER =
LoggerFactory.getLogger(BulkSparkConf.class);
public static final String JDK11_OPTIONS = "
-Djdk.attach.allowAttachSelf=true"
+ " --add-exports
java.base/jdk.internal.misc=ALL-UNNAMED"
@@ -159,6 +158,12 @@ public class BulkSparkConf implements Serializable
private transient CoordinatedWriteConf coordinatedWriteConf; // it is
transient; deserialized from coordinatedWriteConfJson in executors
public BulkSparkConf(SparkConf conf, Map<String, String> options)
+ {
+ this(conf, options, null);
+ }
+
+ // NO LOGGER as member field - to avoid logger references in broadcast
variable.
+ public BulkSparkConf(SparkConf conf, Map<String, String> options,
@Nullable Logger logger)
{
this.conf = conf;
Optional<Integer> sidecarPortFromOptions =
MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar
port");
@@ -173,7 +178,10 @@ public class BulkSparkConf implements Serializable
String dc = MapUtils.getOrDefault(options,
WriterOptions.LOCAL_DC.name(), null);
if (!consistencyLevel.isLocal() && dc != null)
{
- LOGGER.warn("localDc is present for non-local consistency level {}
specified in writer options. Correcting localDc to null", consistencyLevel);
+ if (logger != null)
+ {
+ logger.warn("localDc is present for non-local consistency
level {} specified in writer options. Correcting localDc to null",
consistencyLevel);
+ }
dc = null;
}
this.localDC = dc;
@@ -231,7 +239,7 @@ public class BulkSparkConf implements Serializable
this.jobTimeoutSeconds = MapUtils.getLong(options,
WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
this.configuredJobId = MapUtils.getOrDefault(options,
WriterOptions.JOB_ID.name(), null);
this.coordinatedWriteConfJson = MapUtils.getOrDefault(options,
WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
- this.coordinatedWriteConf =
buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+ this.coordinatedWriteConf =
buildCoordinatedWriteConf(dataTransportInfo.getTransport(), logger);
this.digestAlgorithmSupplier =
digestAlgorithmSupplierFromOptions(dataTransport, options);
validateEnvironment();
}
@@ -304,14 +312,14 @@ public class BulkSparkConf implements Serializable
{
if (coordinatedWriteConf == null)
{
- coordinatedWriteConf =
buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+ coordinatedWriteConf =
buildCoordinatedWriteConf(dataTransportInfo.getTransport(), null);
}
return coordinatedWriteConf;
}
@Nullable
- protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport
dataTransport)
+ protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport
dataTransport, @Nullable Logger logger)
{
if (coordinatedWriteConfJson == null)
{
@@ -323,17 +331,26 @@ public class BulkSparkConf implements Serializable
if (sidecarContactPointsValue != null)
{
- LOGGER.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are
ignored on the presence of COORDINATED_WRITE_CONF");
+ if (logger != null)
+ {
+ logger.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are
ignored on the presence of COORDINATED_WRITE_CONF");
+ }
}
if (userProvidedSidecarPort != -1)
{
- LOGGER.warn("SIDECAR_PORT is ignored on the presence of
COORDINATED_WRITE_CONF");
+ if (logger != null)
+ {
+ logger.warn("SIDECAR_PORT is ignored on the presence of
COORDINATED_WRITE_CONF");
+ }
}
if (localDC != null)
{
- LOGGER.warn("LOCAL_DC is ignored on the presence of
COORDINATED_WRITE_CONF");
+ if (logger != null)
+ {
+ logger.warn("LOCAL_DC is ignored on the presence of
COORDINATED_WRITE_CONF");
+ }
}
return CoordinatedWriteConf.create(coordinatedWriteConfJson,
consistencyLevel, SimpleClusterConf.class);
@@ -620,8 +637,6 @@ public class BulkSparkConf implements Serializable
String deprecatedSetting = settingPrefix + settingSuffix;
if (conf.contains(deprecatedSetting))
{
- LOGGER.warn("Found deprecated setting '{}'. Please use {}
in the future.",
- deprecatedSetting, settingName);
return deprecatedSetting;
}
}
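
The BulkSparkConf change above removes the static LOGGER field and instead accepts an
optional, caller-supplied Logger, so the Serializable conf never drags a logger object
graph into the broadcast. A generic sketch of that pattern (class and field names are
made up for illustration):

    public final class ExampleSerializableConf implements Serializable
    {
        private final String localDc;

        // No Logger member field: warnings are emitted through an injected, nullable logger
        public ExampleSerializableConf(String localDc, @Nullable Logger logger)
        {
            if (localDc != null && logger != null)
            {
                logger.warn("localDc {} may be ignored depending on the consistency level", localDc);
            }
            this.localDc = localDc;
        }
    }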
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
new file mode 100644
index 00000000..e2a3283a
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Immutable configuration data class for BulkWriter jobs that is safe to
broadcast to Spark executors.
+ * This class contains pre-computed, serializable values that were computed on
the driver.
+ * <p>
+ * Serialization Architecture:
+ * This class is the ONLY object that gets broadcast to Spark executors (via
Spark's broadcast mechanism).
+ * It contains broadcastable wrapper implementations with ZERO transient
fields and NO Logger references:
+ * <ul>
+ * <li>{@link BroadcastableClusterInfo} or {@link
BroadcastableClusterInfoGroup} - cluster metadata</li>
+ * <li>{@link BroadcastableJobInfo} - job configuration with {@link
BroadcastableTokenPartitioner}</li>
+ * <li>{@link BroadcastableSchemaInfo} - schema metadata</li>
+ * </ul>
+ * <p>
+ * On the driver, {@link BulkWriterContext} instances use driver-only
implementations:
+ * {@link CassandraClusterInfo}, {@link CassandraJobInfo}, {@link
CassandraSchemaInfo}.
+ * Before broadcasting, these are converted to broadcastable wrappers to avoid
Logger references
+ * and minimize Spark SizeEstimator overhead.
+ * <p>
+ * On executors, {@link BulkWriterContext} instances are reconstructed from
this config using
+ * {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the
broadcastable
+ * wrappers and reconstructs the full implementations with fresh data from
Cassandra Sidecar.
+ */
+public final class BulkWriterConfig implements Serializable
+{
+ private static final long serialVersionUID = 1L;
+
+ private final BulkSparkConf conf;
+ private final int sparkDefaultParallelism;
+ private final BroadcastableJobInfo jobInfo;
+ // BroadcastableClusterInfo can be either BroadcastableCluster or
BroadcastableClusterInfoGroup
+ private final IBroadcastableClusterInfo clusterInfo;
+ private final BroadcastableSchemaInfo schemaInfo;
+ private final String lowestCassandraVersion;
+
+ /**
+ * Creates a new immutable BulkWriterConfig with pre-computed values
+ *
+ * @param conf Bulk writer Spark configuration
+ * @param sparkDefaultParallelism Spark default parallelism setting
+ * @param jobInfo Broadcastable job information
+ * @param clusterInfo Broadcastable cluster information
(BroadcastableCluster or BroadcastableClusterInfoGroup)
+ * @param schemaInfo Broadcastable schema information
+ * @param lowestCassandraVersion Lowest Cassandra version in the cluster
+ */
+ public BulkWriterConfig(@NotNull BulkSparkConf conf,
+ int sparkDefaultParallelism,
+ @NotNull BroadcastableJobInfo jobInfo,
+ @NotNull IBroadcastableClusterInfo clusterInfo,
+ @NotNull BroadcastableSchemaInfo schemaInfo,
+ @NotNull String lowestCassandraVersion)
+ {
+ this.conf = conf;
+ this.sparkDefaultParallelism = sparkDefaultParallelism;
+ this.jobInfo = jobInfo;
+ this.clusterInfo = clusterInfo;
+ this.schemaInfo = schemaInfo;
+ this.lowestCassandraVersion = lowestCassandraVersion;
+ }
+
+ public BulkSparkConf getConf()
+ {
+ return conf;
+ }
+
+ public int getSparkDefaultParallelism()
+ {
+ return sparkDefaultParallelism;
+ }
+
+ public BroadcastableJobInfo getBroadcastableJobInfo()
+ {
+ return jobInfo;
+ }
+
+ public IBroadcastableClusterInfo getBroadcastableClusterInfo()
+ {
+ return clusterInfo;
+ }
+
+ public BroadcastableSchemaInfo getBroadcastableSchemaInfo()
+ {
+ return schemaInfo;
+ }
+
+ public String getLowestCassandraVersion()
+ {
+ return lowestCassandraVersion;
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
index 40012e01..a76b7c60 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
@@ -19,12 +19,22 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.Serializable;
-
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
import org.apache.cassandra.bridge.CassandraBridge;
-public interface BulkWriterContext extends Serializable
+/**
+ * Context for bulk write operations, providing access to cluster, job,
schema, and transport information.
+ * <p>
+ * Serialization Architecture:
+ * This interface does NOT extend Serializable. BulkWriterContext instances
are never broadcast to executors.
+ * Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct
BulkWriterContext instances
+ * from the config using the factory method {@link #from(BulkWriterConfig)}.
+ * <p>
+ * The implementations ({@link CassandraBulkWriterContext}, {@link
CassandraCoordinatedBulkWriterContext})
+ * do NOT have serialVersionUID fields as they are never serialized.
+ */
+public interface BulkWriterContext
{
ClusterInfo cluster();
@@ -41,4 +51,25 @@ public interface BulkWriterContext extends Serializable
void shutdown();
TransportContext transportContext();
+
+ /**
+ * Factory method to create a BulkWriterContext from a BulkWriterConfig on
executors.
+ * This method reconstructs context instances on executors from the
broadcast configuration.
+ * The driver creates contexts directly using constructors, not this
method.
+ *
+ * @param config the immutable configuration object broadcast from driver
+ * @return a new BulkWriterContext instance
+ */
+ static BulkWriterContext from(BulkWriterConfig config)
+ {
+ BulkSparkConf conf = config.getConf();
+ if (conf.isCoordinatedWriteConfigured())
+ {
+ return new CassandraCoordinatedBulkWriterContext(config);
+ }
+ else
+ {
+ return new CassandraBulkWriterContext(config);
+ }
+ }
}
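
On an executor, a task typically rebuilds its context once from the broadcast value; a
rough sketch (the broadcastConfig parameter is assumed to be captured by the task
closure, as done in CassandraBulkSourceRelation below):

    BulkWriterContext context = BulkWriterContext.from(broadcastConfig.value());
    TokenPartitioner tokenPartitioner = context.job().getTokenPartitioner();
    ClusterInfo cluster = context.cluster();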
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
index 04907221..2f3a9222 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
@@ -68,7 +68,7 @@ public class BulkWriterContextFactory
@NotNull
protected BulkSparkConf createBulkSparkConf(@NotNull SparkContext
sparkContext, @NotNull Map<String, String> options)
{
- return new BulkSparkConf(sparkContext.getConf(), options);
+ return new BulkSparkConf(sparkContext.getConf(), options, LOGGER);
}
@NotNull
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 40d1b330..f6323af3 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -42,12 +42,14 @@ import
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageDataTransf
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import
org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.exception.SidecarApiCallException;
import
org.apache.cassandra.spark.exception.UnsupportedAnalyticsOperationException;
import
org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -75,7 +77,7 @@ public class CassandraBulkSourceRelation extends BaseRelation
implements Inserta
private final BulkWriterContext writerContext;
private final SQLContext sqlContext;
private final JavaSparkContext sparkContext;
- private final Broadcast<BulkWriterContext> broadcastContext;
+ private final Broadcast<BulkWriterConfig> broadcastConfig;
private final BulkWriteValidator writeValidator;
private final SimpleTaskScheduler simpleTaskScheduler;
private ImportCoordinator importCoordinator = null; // value is only set
when using S3_COMPAT
@@ -87,12 +89,70 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
this.writerContext = writerContext;
this.sqlContext = sqlContext;
this.sparkContext =
JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
- this.broadcastContext =
sparkContext.<BulkWriterContext>broadcast(writerContext);
+ // Extract immutable configuration from the context for broadcasting
+ BulkWriterConfig config = extractConfig(writerContext,
sparkContext.defaultParallelism());
+ this.broadcastConfig =
sparkContext.<BulkWriterConfig>broadcast(config);
ReplicaAwareFailureHandler<RingInstance> failureHandler = new
MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
this.writeValidator = new BulkWriteValidator(writerContext,
failureHandler);
this.simpleTaskScheduler = new SimpleTaskScheduler();
}
+ /**
+ * Extracts immutable configuration from a BulkWriterContext for
broadcasting.
+ * Creates BroadcastableCluster, BroadcastableJobInfo, and
BroadcastableSchemaInfo
+ * to ensure zero transient fields and avoid Logger references in the
broadcast object.
+ */
+ private static BulkWriterConfig extractConfig(BulkWriterContext context,
int sparkDefaultParallelism)
+ {
+ if (context instanceof AbstractBulkWriterContext)
+ {
+ AbstractBulkWriterContext abstractContext =
(AbstractBulkWriterContext) context;
+ ClusterInfo originalClusterInfo = abstractContext.cluster();
+
+ // Create BroadcastableCluster to avoid transient fields in
broadcast
+ IBroadcastableClusterInfo broadcastableClusterInfo;
+ if (originalClusterInfo instanceof CassandraClusterInfoGroup)
+ {
+ // Coordinated write scenario
+ @SuppressWarnings("unchecked")
+ CassandraClusterInfoGroup multiCluster =
(CassandraClusterInfoGroup) originalClusterInfo;
+ broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(
+ multiCluster,
+ abstractContext.bulkSparkConf()
+ );
+ }
+ else
+ {
+ // Single cluster scenario
+ broadcastableClusterInfo = BroadcastableClusterInfo.from(
+ originalClusterInfo,
+ abstractContext.bulkSparkConf()
+ );
+ }
+
+ // Create BroadcastableJobInfo to avoid Logger in TokenPartitioner
+ BroadcastableJobInfo broadcastableJobInfo =
BroadcastableJobInfo.from(
+ abstractContext.job(),
+ abstractContext.bulkSparkConf()
+ );
+
+ // Create BroadcastableSchemaInfo to avoid Logger in TableSchema
+ BroadcastableSchemaInfo broadcastableSchemaInfo =
BroadcastableSchemaInfo.from(
+ abstractContext.schema()
+ );
+
+ return new BulkWriterConfig(
+ abstractContext.bulkSparkConf(),
+ sparkDefaultParallelism,
+ broadcastableJobInfo,
+ broadcastableClusterInfo,
+ broadcastableSchemaInfo,
+ abstractContext.lowestCassandraVersion()
+ );
+ }
+ throw new IllegalArgumentException("Cannot extract config from context
type: " + context.getClass().getName());
+ }
+
@Override
@NotNull
public SQLContext sqlContext()
@@ -128,14 +188,15 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
this.startTimeNanos = System.nanoTime();
maybeScheduleTimeout();
maybeEnableTransportExtension();
- Tokenizer tokenizer = new Tokenizer(writerContext);
- TableSchema tableSchema = writerContext.schema().getTableSchema();
+ BroadcastableTableSchema broadcastableTableSchema =
broadcastConfig.value().getBroadcastableSchemaInfo().getBroadcastableTableSchema();
+ boolean isMurmur3Partitioner =
writerContext.cluster().getPartitioner() == Partitioner.Murmur3Partitioner;
+ Tokenizer tokenizer = new Tokenizer(broadcastableTableSchema,
isMurmur3Partitioner);
JavaPairRDD<DecoratedKey, Object[]> sortedRDD = data.toJavaRDD()
.map(Row::toSeq)
.map(seq ->
JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
-
.map(tableSchema::normalize)
+
.map(broadcastableTableSchema::normalize)
.keyBy(tokenizer::getDecoratedKey)
-
.repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
+
.repartitionAndSortWithinPartitions(writerContext.job().getTokenPartitioner());
persist(sortedRDD, data.columns());
}
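
The constructor and insert() hunks above broadcast an immutable BulkWriterConfig once and read it back via value(). For readers unfamiliar with the pattern, a minimal, self-contained sketch follows; the WriterSettings class is invented for illustration and is not part of this patch:

    import java.io.Serializable;
    import java.util.Arrays;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public final class BroadcastConfigSketch
    {
        // Hypothetical immutable config: every field is final and Serializable,
        // so the broadcast payload carries no mutable state or live connections.
        static final class WriterSettings implements Serializable
        {
            final String keyspace;
            final int splits;

            WriterSettings(String keyspace, int splits)
            {
                this.keyspace = keyspace;
                this.splits = splits;
            }
        }

        public static void main(String[] args)
        {
            JavaSparkContext sc = new JavaSparkContext("local[2]", "broadcast-sketch");
            // Broadcast once on the driver ...
            Broadcast<WriterSettings> settings = sc.broadcast(new WriterSettings("ks", 4));
            // ... and read the same immutable value inside executor tasks.
            long kept = sc.parallelize(Arrays.asList(1, 2, 3, 4))
                          .filter(x -> x % settings.value().splits != 0)
                          .count();
            System.out.println("kept=" + kept);
            // Release executor-side copies once the value is no longer needed.
            settings.unpersist(false);
            sc.stop();
        }
    }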
@@ -180,14 +241,14 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
try
{
- // Copy the broadcast context as a local variable (by passing as
the input) to avoid serialization error
+ // Copy the broadcast config as a local variable (by passing as
the input) to avoid serialization error
// W/o this, SerializedLambda captures the
CassandraBulkSourceRelation object, which is not serializable (required by
Spark),
// as a captured argument. It causes "Task not serializable" error.
List<WriteResult> writeResults = sortedRDD
-
.mapPartitions(writeRowsInPartition(broadcastContext, columnNames))
+
.mapPartitions(writeRowsInPartition(broadcastConfig, columnNames))
.collect();
- // Unpersist broadcast context to free up executors while driver
waits for the
+ // Unpersist broadcast config to free up executors while driver
waits for the
// import to complete
unpersist();
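
The closure-capture issue noted in the comment above is the classic "Task not serializable" failure. A stripped-down illustration of why routing the Broadcast through a static method keeps the enclosing, non-serializable object out of the serialized lambda; the class and method names here are hypothetical, not the project's:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.broadcast.Broadcast;

    // Deliberately NOT Serializable, like a relation object holding driver-only state.
    public class ClosureCaptureSketch
    {
        private final Broadcast<String> broadcastConfig;

        ClosureCaptureSketch(JavaSparkContext sc)
        {
            this.broadcastConfig = sc.broadcast("immutable-config");
        }

        // A lambda written inline in an instance method would capture 'this'
        // (non-serializable) and fail when Spark serializes the task. Building
        // the function in a static method means it only captures the Broadcast,
        // which is serializable by design.
        static FlatMapFunction<Iterator<Integer>, String> perPartition(Broadcast<String> config)
        {
            return iterator -> Collections.singleton("processed with " + config.value()).iterator();
        }

        void run(JavaSparkContext sc)
        {
            sc.parallelize(Arrays.asList(1, 2, 3), 3)
              .mapPartitions(perPartition(broadcastConfig))
              .collect()
              .forEach(System.out::println);
        }

        public static void main(String[] args)
        {
            JavaSparkContext sc = new JavaSparkContext("local[2]", "closure-sketch");
            new ClosureCaptureSketch(sc).run(sc);
            sc.stop();
        }
    }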
@@ -330,15 +391,16 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
}
/**
- * Get a ref copy of BulkWriterContext broadcast variable and compose a
function to transform a partition into StreamResult
+ * Get a ref copy of BulkWriterConfig broadcast variable and compose a
function to transform a partition into WriteResult
*
- * @param ctx BulkWriterContext broadcast variable
+ * @param config BulkWriterConfig broadcast variable
+ * @param columnNames column names array
* @return FlatMapFunction
*/
private static FlatMapFunction<Iterator<Tuple2<DecoratedKey, Object[]>>,
WriteResult>
- writeRowsInPartition(Broadcast<BulkWriterContext> ctx, String[]
columnNames)
+ writeRowsInPartition(Broadcast<BulkWriterConfig> config, String[]
columnNames)
{
- return iterator -> Collections.singleton(new
RecordWriter(ctx.getValue(), columnNames).write(iterator)).iterator();
+ return iterator -> Collections.singleton(new
RecordWriter(config.getValue(), columnNames).write(iterator)).iterator();
}
/**
@@ -348,8 +410,8 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
{
try
{
- LOGGER.info("Unpersisting broadcast context");
- broadcastContext.unpersist(false);
+ LOGGER.info("Unpersisting broadcast config");
+ broadcastConfig.unpersist(false);
}
catch (Throwable throwable)
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 2eea6276..5c516837 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -28,11 +28,15 @@ import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClust
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
+/**
+ * BulkWriterContext implementation for single cluster write operations.
+ * <p>
+ * This class does NOT have a serialVersionUID because it is never directly
serialized.
+ * See {@link AbstractBulkWriterContext} for details on the serialization
architecture.
+ */
// CHECKSTYLE IGNORE: This class cannot be declared as final, because
consumers should be able to extend it
public class CassandraBulkWriterContext extends AbstractBulkWriterContext
{
- private static final long serialVersionUID = 8241993502687688783L;
-
protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
@NotNull StructType structType,
int sparkDefaultParallelism)
@@ -40,6 +44,17 @@ public class CassandraBulkWriterContext extends
AbstractBulkWriterContext
super(conf, structType, sparkDefaultParallelism);
}
+ /**
+ * Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)}
factory method.
+ * This constructor is only used on executors to reconstruct context from
broadcast config.
+ *
+ * @param config immutable configuration for the bulk writer
+ */
+ protected CassandraBulkWriterContext(@NotNull BulkWriterConfig config)
+ {
+ super(config);
+ }
+
@Override
protected ClusterInfo buildClusterInfo()
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index f84a9337..ad143fb0 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -62,9 +62,20 @@ import org.jetbrains.annotations.Nullable;
import static
org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
+/**
+ * Driver-only implementation of {@link ClusterInfo} for single cluster
operations.
+ * <p>
+ * This class is NOT serialized and does NOT have a serialVersionUID.
+ * When broadcasting to executors, the driver extracts information from this
class
+ * and creates a {@link BroadcastableClusterInfo} instance, which is then
included
+ * in the {@link BulkWriterConfig} that gets broadcast.
+ * <p>
+ * This class implements Serializable only because the {@link ClusterInfo}
interface
+ * requires it (for use as a field type in broadcast classes), but instances
of this
+ * class are never directly serialized.
+ */
public class CassandraClusterInfo implements ClusterInfo, Closeable
{
- private static final long serialVersionUID = -6944818863462956767L;
private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraClusterInfo.class);
protected final BulkSparkConf conf;
@@ -72,12 +83,12 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
protected String cassandraVersion;
protected Partitioner partitioner;
- protected transient volatile TokenRangeMapping<RingInstance>
tokenRangeReplicas;
- protected transient volatile String keyspaceSchema;
- protected transient volatile ReplicationFactor replicationFactor;
- protected transient volatile CassandraContext cassandraContext;
- protected final transient AtomicReference<NodeSettings> nodeSettings;
- protected final transient List<CompletableFuture<NodeSettings>>
allNodeSettingFutures;
+ protected volatile TokenRangeMapping<RingInstance> tokenRangeReplicas;
+ protected volatile String keyspaceSchema;
+ protected volatile ReplicationFactor replicationFactor;
+ protected volatile CassandraContext cassandraContext;
+ protected final AtomicReference<NodeSettings> nodeSettings;
+ protected final List<CompletableFuture<NodeSettings>>
allNodeSettingFutures;
public CassandraClusterInfo(BulkSparkConf conf)
{
@@ -96,6 +107,26 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
cassandraContext.getCluster());
}
+ /**
+     * Reconstruct from BroadcastableClusterInfo on executor.
+ * Reuses cassandraVersion and partitioner from broadcast,
+ * fetches other data (tokenRangeMapping, replicationFactor,
keyspaceSchema, writeAvailability) fresh from Sidecar.
+ *
+ * @param broadcastable the broadcastable cluster info from broadcast
+ */
+ public CassandraClusterInfo(BroadcastableClusterInfo broadcastable)
+ {
+ this.conf = broadcastable.getConf();
+ this.clusterId = broadcastable.clusterId();
+ this.cassandraVersion = broadcastable.getLowestCassandraVersion();
+ this.partitioner = broadcastable.getPartitioner();
+ this.cassandraContext = buildCassandraContext();
+ LOGGER.info("Reconstructing CassandraClusterInfo on executor from
BroadcastableClusterInfo. clusterId={}", clusterId);
+ this.nodeSettings = new AtomicReference<>(null);
+ // Executors do not need to query all node settings since
cassandraVersion is already set from broadcast
+ this.allNodeSettingFutures = null;
+ }
+
@Override
public void checkBulkWriterIsEnabledOrThrow()
{
@@ -415,6 +446,12 @@ public class CassandraClusterInfo implements ClusterInfo,
Closeable
protected List<NodeSettings> getAllNodeSettings()
{
+ if (allNodeSettingFutures == null)
+ {
+ throw new IllegalStateException("getAllNodeSettings should not be
called on executor. "
+ + "Cassandra version is
pre-computed on driver and broadcast to executors.");
+ }
+
// Worst-case, the http client is configured for 1 worker pool.
// In that case, each future can take the full retry delay * number of
retries,
// and each instance will be processed serially.
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 27b8eb45..776124ec 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -32,7 +32,6 @@ import org.jetbrains.annotations.Nullable;
public class CassandraJobInfo implements JobInfo
{
- private static final long serialVersionUID = 6140098484732683759L;
protected final BulkSparkConf conf;
// restoreJobId per cluster; it is guaranteed to be non-empty
protected final MultiClusterContainer<UUID> restoreJobIds;
@@ -46,6 +45,21 @@ public class CassandraJobInfo implements JobInfo
this.tokenPartitioner = tokenPartitioner;
}
+ /**
+ * Reconstruct from BroadcastableJobInfo on executor.
+ * Reuses conf and restoreJobIds from broadcast,
+ * and reconstructs TokenPartitioner from broadcastable partition mappings.
+ *
+ * @param broadcastable the broadcastable job info from broadcast
+ */
+ public CassandraJobInfo(BroadcastableJobInfo broadcastable)
+ {
+ this.conf = broadcastable.getConf();
+ this.restoreJobIds = broadcastable.getRestoreJobIds();
+ // Reconstruct TokenPartitioner from broadcastable partition mappings
+ this.tokenPartitioner = new
TokenPartitioner(broadcastable.getBroadcastableTokenPartitioner());
+ }
+
@Override
public ConsistencyLevel getConsistencyLevel()
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
index 526d9a77..db98cf71 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
@@ -23,7 +23,6 @@ import java.util.Set;
public class CassandraSchemaInfo implements SchemaInfo
{
- private static final long serialVersionUID = -2327383232935001862L;
private final TableSchema tableSchema;
private final Set<String> userDefinedTypeStatements;
@@ -33,6 +32,18 @@ public class CassandraSchemaInfo implements SchemaInfo
this.userDefinedTypeStatements = userDefinedTypeStatements;
}
+ /**
+ * Reconstruct from BroadcastableSchemaInfo on executor.
+ * Reconstructs TableSchema from BroadcastableTableSchema (no Sidecar
calls needed).
+ *
+ * @param broadcastable the broadcastable schema info from broadcast
+ */
+ public CassandraSchemaInfo(BroadcastableSchemaInfo broadcastable)
+ {
+ this(new TableSchema(broadcastable.getBroadcastableTableSchema()),
+ broadcastable.getUserDefinedTypeStatements());
+ }
+
@Override
public TableSchema getTableSchema()
{
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
index 61cd6942..64b65444 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.Serializable;
import java.math.BigInteger;
import java.util.Map;
@@ -33,7 +32,20 @@ import
org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
import org.apache.cassandra.spark.validation.StartupValidatable;
import org.jetbrains.annotations.Nullable;
-public interface ClusterInfo extends StartupValidatable, Serializable
+/**
+ * Interface for cluster information used in bulk write operations.
+ * <p>
+ * Serialization Architecture:
+ * This interface does NOT extend Serializable. ClusterInfo instances are
never directly serialized.
+ * Driver-only implementations ({@link CassandraClusterInfo},
+ * {@link
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup})
+ * are converted to broadcastable wrappers ({@link BroadcastableClusterInfo},
{@link BroadcastableClusterInfoGroup})
+ * for broadcasting to executors via {@link BulkWriterConfig}.
+ * <p>
+ * On executors, ClusterInfo instances are reconstructed from the
broadcastable wrappers using
+ * {@link
AbstractBulkWriterContext#reconstructClusterInfoOnExecutor(IBroadcastableClusterInfo)}.
+ */
+public interface ClusterInfo extends StartupValidatable
{
void refreshClusterInfo();
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/IBroadcastableClusterInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/IBroadcastableClusterInfo.java
new file mode 100644
index 00000000..fb4eb2e2
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/IBroadcastableClusterInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Minimal interface for cluster information that can be safely broadcast to
Spark executors.
+ * This interface contains only the essential methods that broadcastable
cluster info implementations
+ * ({@link BroadcastableClusterInfo}, {@link BroadcastableClusterInfoGroup})
need to provide.
+ * <p>
+ * Unlike {@link ClusterInfo}, this interface doesn't include methods that
require fresh data
+ * from Cassandra Sidecar or runtime operations. These implementations are
designed to be broadcast
+ * and then reconstructed to full {@link ClusterInfo} instances on executors.
+ * <p>
+ * Methods in this interface:
+ * <ul>
+ * <li>{@link #getPartitioner()} - static cluster partitioner
configuration</li>
+ * <li>{@link #getLowestCassandraVersion()} - pre-computed version
string</li>
+ * <li>{@link #clusterId()} - cluster identifier (optional)</li>
+ * <li>{@link #getConf()} - BulkSparkConf needed for reconstruction on
executors</li>
+ * <li>{@link #reconstruct()} - reconstructs full ClusterInfo instance on
executors</li>
+ * </ul>
+ */
+public interface IBroadcastableClusterInfo extends Serializable
+{
+ /**
+ * @return the partitioner used by the cluster
+ */
+ Partitioner getPartitioner();
+
+ /**
+ * @return the lowest Cassandra version in the cluster
+ */
+ String getLowestCassandraVersion();
+
+ /**
+ * ID string that can uniquely identify a cluster.
+ * When writing to a single cluster, this may be null.
+ * When in coordinated write mode (writing to multiple clusters), this
must return a unique string.
+ *
+ * @return cluster id string, null if absent
+ */
+ @Nullable
+ String clusterId();
+
+ /**
+ * @return the BulkSparkConf configuration needed to reconstruct
ClusterInfo on executors
+ */
+ @NotNull
+ BulkSparkConf getConf();
+
+ /**
+ * Reconstructs a full ClusterInfo instance from this broadcastable data
on executors.
+ * Each implementation knows how to reconstruct itself into the
appropriate ClusterInfo type.
+ * This allows adding new broadcastable types without modifying the
reconstruction logic
+ * in {@link AbstractBulkWriterContext}.
+ *
+ * @return reconstructed ClusterInfo (CassandraClusterInfo or
CassandraClusterInfoGroup)
+ */
+ ClusterInfo reconstruct();
+}
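
To make the wrapper/reconstruct split concrete, here is a toy version of the idea with invented types; the real interface above additionally carries BulkSparkConf, partitioner, and version data:

    import java.io.Serializable;

    public final class ReconstructSketch
    {
        // Driver-side object: may hold live clients and caches, never serialized.
        interface LiveCluster
        {
            String partitionerName();
        }

        // Broadcast-safe wrapper: only immutable Serializable data plus a recipe
        // for rebuilding the live object on the executor.
        interface BroadcastableCluster extends Serializable
        {
            LiveCluster reconstruct();
        }

        static final class SimpleBroadcastableCluster implements BroadcastableCluster
        {
            private final String partitionerName;

            SimpleBroadcastableCluster(String partitionerName)
            {
                this.partitionerName = partitionerName;
            }

            @Override
            public LiveCluster reconstruct()
            {
                // On a real executor this is where live clients would be re-created.
                String name = partitionerName;
                return () -> name;
            }
        }

        public static void main(String[] args)
        {
            BroadcastableCluster wrapper = new SimpleBroadcastableCluster("Murmur3Partitioner");
            LiveCluster rebuilt = wrapper.reconstruct(); // executor-side step
            System.out.println(rebuilt.partitionerName());
        }
    }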
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 2c4a089e..87ad86c0 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.Serializable;
import java.util.NoSuchElementException;
import java.util.UUID;
@@ -29,7 +28,14 @@ import org.apache.cassandra.spark.data.QualifiedTableName;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-public interface JobInfo extends Serializable
+/**
+ * Provides job-specific configuration and information for bulk write
operations.
+ * <p>
+ * This interface does NOT extend Serializable. JobInfo instances are never
serialized.
+ * For broadcast to executors, {@link BroadcastableJobInfo} is used instead,
and executors
+ * reconstruct JobInfo instances from the broadcast data.
+ */
+public interface JobInfo
{
// ******************
// Job Information API - should this really just move back to Config? Here
to try to reduce the violations of the Law of Demeter more than anything else
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index a33ff642..92a866cb 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -84,9 +84,16 @@ public class RecordWriter
private final CqlTable cqlTable;
private StreamSession<?> streamSession = null;
- public RecordWriter(BulkWriterContext writerContext, String[] columnNames)
+ /**
+ * Constructor that accepts a BulkWriterConfig and constructs the context
on the executor.
+ * This is used when the config is broadcast to executors.
+ *
+ * @param config immutable configuration broadcast from driver
+ * @param columnNames column names array
+ */
+ public RecordWriter(BulkWriterConfig config, String[] columnNames)
{
- this(writerContext, columnNames, TaskContext::get,
SortedSSTableWriter::new);
+ this(BulkWriterContext.from(config), columnNames, TaskContext::get,
SortedSSTableWriter::new);
}
@VisibleForTesting
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
index 0257d29c..b21d5fd4 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
@@ -19,12 +19,18 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.Serializable;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
-public interface SchemaInfo extends Serializable
+/**
+ * Provides schema information for bulk write operations.
+ * <p>
+ * This interface does NOT extend Serializable. SchemaInfo instances are never
serialized.
+ * For broadcast to executors, {@link BroadcastableSchemaInfo} is used
instead, and executors
+ * reconstruct SchemaInfo instances (specifically {@link CassandraSchemaInfo})
from the broadcast data.
+ */
+public interface SchemaInfo
{
TableSchema getTableSchema();
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 47909bbf..93f02c14 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
@@ -28,7 +27,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +37,17 @@ import org.apache.cassandra.spark.common.schema.ColumnType;
import org.apache.cassandra.spark.data.CqlField;
import
org.apache.cassandra.spark.exception.UnsupportedAnalyticsOperationException;
import org.apache.spark.sql.types.StructType;
-import org.jetbrains.annotations.NotNull;
import static
org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
-public class TableSchema implements Serializable
+/**
+ * Schema information for bulk write operations.
+ * <p>
+ * This class does NOT implement Serializable (Logger is not serializable).
+ * For broadcast to executors, {@link BroadcastableTableSchema} is used
instead,
+ * and executors reconstruct TableSchema from the broadcastable data.
+ */
+public class TableSchema
{
private static final Logger LOGGER =
LoggerFactory.getLogger(TableSchema.class);
@@ -52,12 +56,12 @@ public class TableSchema implements Serializable
final List<String> partitionKeyColumns;
final List<ColumnType<?>> partitionKeyColumnTypes;
final List<SqlToCqlTypeConverter.Converter<?>> converters;
- private final List<Integer> keyFieldPositions;
- private final WriteMode writeMode;
- private final TTLOption ttlOption;
- private final TimestampOption timestampOption;
- private final String lowestCassandraVersion;
- private final boolean quoteIdentifiers;
+ final List<Integer> keyFieldPositions;
+ final WriteMode writeMode;
+ final TTLOption ttlOption;
+ final TimestampOption timestampOption;
+ final String lowestCassandraVersion;
+ final boolean quoteIdentifiers;
public TableSchema(StructType dfSchema,
TableInfoProvider tableInfo,
@@ -85,6 +89,27 @@ public class TableSchema implements Serializable
this.keyFieldPositions = getKeyFieldPositions(dfSchema,
tableInfo.getColumnNames(), getRequiredKeyColumns(tableInfo));
}
+ /**
+ * Reconstruct TableSchema from BroadcastableTableSchema on executor.
+ * This constructor is used only on executors when reconstructing from
broadcast data.
+ *
+ * @param broadcastable the broadcastable table schema from broadcast
+ */
+ public TableSchema(BroadcastableTableSchema broadcastable)
+ {
+ this.createStatement = broadcastable.getCreateStatement();
+ this.modificationStatement = broadcastable.getModificationStatement();
+ this.partitionKeyColumns = broadcastable.getPartitionKeyColumns();
+ this.partitionKeyColumnTypes =
broadcastable.getPartitionKeyColumnTypes();
+ this.converters = broadcastable.getConverters();
+ this.keyFieldPositions = broadcastable.getKeyFieldPositions();
+ this.writeMode = broadcastable.getWriteMode();
+ this.ttlOption = broadcastable.getTtlOption();
+ this.timestampOption = broadcastable.getTimestampOption();
+ this.lowestCassandraVersion =
broadcastable.getLowestCassandraVersion();
+ this.quoteIdentifiers = broadcastable.isQuoteIdentifiers();
+ }
+
private List<String> getRequiredKeyColumns(TableInfoProvider tableInfo)
{
switch (writeMode)
@@ -100,34 +125,6 @@ public class TableSchema implements Serializable
}
}
- public Object[] normalize(Object[] row)
- {
- for (int index = 0; index < row.length; index++)
- {
- row[index] = converters.get(index).convert(row[index]);
- }
- return row;
- }
-
- public Object[] getKeyColumns(Object[] allColumns)
- {
- return getKeyColumns(allColumns, keyFieldPositions);
- }
-
- @VisibleForTesting
- @NotNull
- public static Object[] getKeyColumns(Object[] allColumns, List<Integer>
keyFieldPositions)
- {
- Object[] result = new Object[keyFieldPositions.size()];
- for (int keyFieldPosition = 0; keyFieldPosition <
keyFieldPositions.size(); keyFieldPosition++)
- {
- Object colVal =
allColumns[keyFieldPositions.get(keyFieldPosition)];
- Preconditions.checkNotNull(colVal, "Found a null primary or
composite key column in source data. All key columns must be non-null.");
- result[keyFieldPosition] = colVal;
- }
- return result;
- }
-
private static List<SqlToCqlTypeConverter.Converter<?>>
getConverters(StructType dfSchema,
TableInfoProvider tableInfo,
TTLOption ttlOption,
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
index 1e2fc6e0..479ddcae 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
@@ -19,9 +19,6 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Comparator;
@@ -45,6 +42,32 @@ import org.apache.cassandra.spark.utils.RangeUtils;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.spark.Partitioner;
+/**
+ * Spark Partitioner for distributing data across Cassandra token ranges.
+ * <p>
+ * Serialization Architecture:
+ * This class supports TWO distinct serialization mechanisms, each serving a
different purpose:
+ * <p>
+ * 1. <b>Direct Java Serialization (via writeObject/readObject)</b>:
+ * Used when Spark serializes this Partitioner for shuffle operations like
+ * {@code repartitionAndSortWithinPartitions()}. During shuffle, Spark
sends the Partitioner
+ * to executors to determine which partition each record belongs to. The
custom serialization
+ * methods at the end of this class handle saving/restoring the partition
mappings.
+ * <p>
+ * 2. <b>Broadcast Variable Pattern (via BroadcastableTokenPartitioner)</b>:
+ * Used when broadcasting job configuration to executors. The driver
extracts partition mappings
+ * into {@link BroadcastableTokenPartitioner} (a pure data wrapper with no
transient fields),
+ * which is broadcast via {@link BulkWriterConfig}. Executors reconstruct
TokenPartitioner from
+ * the broadcast data using the constructor {@link
#TokenPartitioner(BroadcastableTokenPartitioner)}.
+ * <p>
+ * Both mechanisms are necessary because:
+ * - Shuffle operations (repartitionAndSortWithinPartitions) serialize the
Partitioner directly
+ * - Broadcast variables use the broadcastable wrapper pattern to avoid Logger
serialization issues
+ * <p>
+ * The transient fields (partitionMap, reversePartitionMap, nrPartitions) are
marked transient to
+ * avoid serializing large/complex objects when not needed, but are properly
handled by custom
+ * serialization when direct serialization is required.
+ */
public class TokenPartitioner extends Partitioner
{
private static final Logger LOGGER =
LoggerFactory.getLogger(TokenPartitioner.class);
@@ -75,10 +98,51 @@ public class TokenPartitioner extends Partitioner
this.tokenRangeMapping = tokenRangeMapping;
this.numberSplits = calculateSplits(tokenRangeMapping,
userSpecifiedNumberSplits, defaultParallelism, cores);
setupTokenRangeMap(randomize);
- validate(); // Intentionally not keeping this in readObject(), it is
enough to validate in constructor alone
- LOGGER.info("Partition map " + partitionMap);
- LOGGER.info("Reverse partition map " + reversePartitionMap);
- LOGGER.info("Number of partitions {}", nrPartitions);
+ validate(); // Intentionally keeping validation in the driver alone;
there is no need to re-validate when constructing in executors
+ logPartitionInfo();
+ }
+
+ private void logPartitionInfo()
+ {
+ LOGGER.info("Number of partitions: {}", nrPartitions);
+ LOGGER.info("Partition map: {}", partitionMap);
+ LOGGER.info("Reverse partition map: {}", reversePartitionMap);
+ }
+
+ /**
+ * Reconstruct TokenPartitioner from BroadcastableTokenPartitioner on
executor.
+ * <p>
+ * This constructor is part of the <b>broadcast variable</b> serialization
mechanism.
+ * When BulkWriterConfig is broadcast to executors, it contains
BroadcastableTokenPartitioner
+ * (a pure data wrapper). Executors use this constructor to rebuild the
TokenPartitioner
+ * with all necessary partition mappings.
+ * <p>
+ * This reconstruction path is separate from the direct Java serialization
(writeObject/readObject)
+ * used for Spark shuffle operations. The broadcast pattern is preferred
for configuration data
+ * because it avoids Logger serialization issues and minimizes broadcast
size.
+ *
+ * @param broadcastable the broadcastable token partitioner from broadcast
variable
+ * @see BroadcastableTokenPartitioner
+ * @see BulkWriterConfig
+ */
+ public TokenPartitioner(BroadcastableTokenPartitioner broadcastable)
+ {
+ this.tokenRangeMapping = null; // Not needed on executors
+ this.numberSplits = broadcastable.numSplits();
+ this.partitionMap = com.google.common.collect.TreeRangeMap.create();
+ this.reversePartitionMap = new HashMap<>();
+ this.nrPartitions = 0;
+
+ // Restore partition mappings from serialized form
+ broadcastable.getPartitionEntries().forEach((range, partitionId) -> {
+ this.partitionMap.put(range, partitionId);
+ this.reversePartitionMap.put(partitionId, range);
+ if (partitionId >= this.nrPartitions)
+ {
+ this.nrPartitions = partitionId + 1;
+ }
+ });
+ logPartitionInfo();
}
@Override
@@ -137,6 +201,7 @@ public class TokenPartitioner extends Partitioner
this.nrPartitions = nextPartitionId.get();
}
+ // only invoked in driver
private void validate()
{
validateMapSizes();
@@ -198,31 +263,6 @@ public class TokenPartitioner extends Partitioner
reversePartitionMap.keySet().size()));
}
- private void writeObject(ObjectOutputStream out) throws IOException
- {
- out.defaultWriteObject();
- HashMap<Range<BigInteger>, Integer> partitionEntires = new HashMap<>();
- partitionMap.asMapOfRanges().forEach(partitionEntires::put);
- out.writeObject(partitionEntires);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws
ClassNotFoundException, IOException
- {
- in.defaultReadObject();
- HashMap<Range<BigInteger>, Integer> partitionEntires =
(HashMap<Range<BigInteger>, Integer>) in.readObject();
- partitionMap = TreeRangeMap.create();
- reversePartitionMap = new HashMap<>();
- partitionEntires.forEach((r, i) -> {
- partitionMap.put(r, i);
- reversePartitionMap.put(i, r);
- nrPartitions++;
- });
- LOGGER.info("Partition map " + partitionMap);
- LOGGER.info("Reverse partition map " + reversePartitionMap);
- LOGGER.info("Number of partitions {}", nrPartitions);
- }
-
// In order to best utilize the number of Spark cores while minimizing the
number of commit calls,
// we calculate the number of splits that will just match or exceed the
total number of available Spark cores.
// Note that the actual number of partitions that result from this should
always be at least the number of token ranges * the number of splits,
@@ -250,4 +290,72 @@ public class TokenPartitioner extends Partitioner
{
return (a + b - 1) / b;
}
+
+ /**
+ * Custom serialization for Spark shuffle operations (e.g.,
repartitionAndSortWithinPartitions).
+ * <p>
+ * This method is invoked when Spark serializes the Partitioner to send it
to executors during
+ * shuffle operations. It saves the essential partition mappings so they
can be reconstructed
+ * on executors. This is separate from the broadcast variable
serialization mechanism.
+ * <p>
+ * Note: This serialization path is used when the TokenPartitioner is
passed directly to Spark
+ * operations (e.g., {@code
.repartitionAndSortWithinPartitions(tokenPartitioner)}), not when
+ * it's broadcast as part of BulkWriterConfig.
+ *
+ * @param out the ObjectOutputStream to write to
+ * @throws java.io.IOException if an I/O error occurs during serialization
+ * @see #readObject(java.io.ObjectInputStream)
+ */
+ private void writeObject(java.io.ObjectOutputStream out) throws
java.io.IOException
+ {
+ out.defaultWriteObject();
+ // Serialize the partition mappings
+ Map<Range<BigInteger>, Integer> partitionEntries =
partitionMap.asMapOfRanges();
+ out.writeInt(partitionEntries.size());
+ for (Map.Entry<Range<BigInteger>, Integer> entry :
partitionEntries.entrySet())
+ {
+ out.writeObject(entry.getKey());
+ out.writeInt(entry.getValue());
+ }
+ }
+
+ /**
+ * Custom deserialization for Spark shuffle operations.
+ * <p>
+ * This method is invoked when Spark deserializes the Partitioner on
executors during shuffle
+ * operations. It reconstructs the transient fields (partitionMap,
reversePartitionMap, nrPartitions)
+ * from the serialized data. This ensures the Partitioner can correctly
map tokens to partitions
+ * after deserialization.
+ * <p>
+ * Note: This deserialization path is used when the TokenPartitioner was
serialized by Spark
+ * for shuffle operations, not when it's reconstructed from a broadcast
BroadcastableTokenPartitioner.
+ *
+ * @param in the ObjectInputStream to read from
+ * @throws java.io.IOException if an I/O error occurs during
deserialization
+ * @throws ClassNotFoundException if the class of a serialized object
cannot be found
+ * @see #writeObject(java.io.ObjectOutputStream)
+ */
+ private void readObject(java.io.ObjectInputStream in) throws
java.io.IOException, ClassNotFoundException
+ {
+ in.defaultReadObject();
+ // Reconstruct partition maps
+ this.partitionMap = TreeRangeMap.create();
+ this.reversePartitionMap = new HashMap<>();
+ this.nrPartitions = 0;
+
+ int size = in.readInt();
+ for (int i = 0; i < size; i++)
+ {
+ @SuppressWarnings("unchecked")
+ Range<BigInteger> range = (Range<BigInteger>) in.readObject();
+ int partitionId = in.readInt();
+
+ this.partitionMap.put(range, partitionId);
+ this.reversePartitionMap.put(partitionId, range);
+ if (partitionId >= this.nrPartitions)
+ {
+ this.nrPartitions = partitionId + 1;
+ }
+ }
+ }
}
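
The shuffle-side mechanism above is standard java.io custom serialization of transient fields. A generic, self-contained sketch of the same pattern, unrelated to the project's classes:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    public class TransientMapHolder implements Serializable
    {
        private static final long serialVersionUID = 1L;

        // Skipped by default serialization; persisted and rebuilt explicitly below.
        private transient Map<Integer, String> partitionNames = new HashMap<>();

        void put(int id, String name)
        {
            partitionNames.put(id, name);
        }

        private void writeObject(ObjectOutputStream out) throws IOException
        {
            out.defaultWriteObject();
            out.writeInt(partitionNames.size());
            for (Map.Entry<Integer, String> entry : partitionNames.entrySet())
            {
                out.writeInt(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
        {
            in.defaultReadObject();
            partitionNames = new HashMap<>();
            int size = in.readInt();
            for (int i = 0; i < size; i++)
            {
                partitionNames.put(in.readInt(), in.readUTF());
            }
        }

        public static void main(String[] args) throws Exception
        {
            TransientMapHolder holder = new TransientMapHolder();
            holder.put(0, "range-a");
            holder.put(1, "range-b");

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(holder);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                TransientMapHolder copy = (TransientMapHolder) in.readObject();
                System.out.println(copy.partitionNames); // {0=range-a, 1=range-b}
            }
        }
    }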
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
index 8c0b5da1..a9373735 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.spark.bulkwriter.token.TokenUtils;
import org.apache.cassandra.spark.common.schema.ColumnType;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
public class Tokenizer implements Serializable
{
@@ -39,13 +38,12 @@ public class Tokenizer implements Serializable
private final TokenUtils tokenUtils;
private final SerializableFunction<Object[], Object[]> keyColumnProvider;
- public Tokenizer(BulkWriterContext writerContext)
+ public Tokenizer(BroadcastableTableSchema broadcastableTableSchema,
boolean isMurmur3Partitioner)
{
- TableSchema tableSchema = writerContext.schema().getTableSchema();
- this.tokenUtils = new TokenUtils(tableSchema.partitionKeyColumns,
- tableSchema.partitionKeyColumnTypes,
-
writerContext.cluster().getPartitioner() == Partitioner.Murmur3Partitioner);
- this.keyColumnProvider = tableSchema::getKeyColumns;
+ this.tokenUtils = new
TokenUtils(broadcastableTableSchema.getPartitionKeyColumns(),
+
broadcastableTableSchema.getPartitionKeyColumnTypes(),
+ isMurmur3Partitioner);
+ this.keyColumnProvider = broadcastableTableSchema::getKeyColumns;
}
@VisibleForTesting
@@ -54,7 +52,7 @@ public class Tokenizer implements Serializable
List<ColumnType<?>> partitionKeyColumnTypes,
boolean isMurmur3Partitioner)
{
- this.keyColumnProvider = (columns) ->
TableSchema.getKeyColumns(columns, keyColumnIndexes);
+ this.keyColumnProvider = (columns) ->
BroadcastableTableSchema.getKeyColumns(columns, keyColumnIndexes);
this.tokenUtils = new TokenUtils(partitionKeyColumns,
partitionKeyColumnTypes,
isMurmur3Partitioner);
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
index 192c045e..90f14721 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.JobInfo;
-import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterSupport;
import org.jetbrains.annotations.Nullable;
public class CloudStorageDataTransferApiFactory
@@ -34,16 +34,18 @@ public class CloudStorageDataTransferApiFactory
/**
* Create CloudStorageDataTransferApi based on the actual runtime type of
ClusterInfo
- * @return CoordinatedCloudStorageDataTransferApi if using coordinated
write, i.e. with CassandraClusterInfoGroup;
+ * @return CoordinatedCloudStorageDataTransferApi if using coordinated
write, i.e. with MultiClusterSupport;
* otherwise, CloudStorageDataTransferApiImpl
*/
public CloudStorageDataTransferApi createDataTransferApi(StorageClient
storageClient,
JobInfo jobInfo,
ClusterInfo
clusterInfo)
{
- if (clusterInfo instanceof CassandraClusterInfoGroup)
+ if (clusterInfo instanceof MultiClusterSupport)
{
- return createForCoordinated(storageClient, jobInfo,
(CassandraClusterInfoGroup) clusterInfo);
+ @SuppressWarnings("unchecked")
+ MultiClusterSupport<ClusterInfo> multiCluster =
(MultiClusterSupport<ClusterInfo>) clusterInfo;
+ return createForCoordinated(storageClient, jobInfo, multiCluster);
}
else
{
@@ -64,7 +66,7 @@ public class CloudStorageDataTransferApiFactory
private CoordinatedCloudStorageDataTransferApi
createForCoordinated(StorageClient storageClient,
JobInfo jobInfo,
-
CassandraClusterInfoGroup clusterInfoGroup)
+
MultiClusterSupport<ClusterInfo> clusterInfoGroup)
{
Map<String, CloudStorageDataTransferApiImpl> apiByClusterId = new
HashMap<>(clusterInfoGroup.size());
clusterInfoGroup.forEach((clusterId, clusterInfo) -> {
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
index 4433d0da..09b07b38 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
@@ -23,13 +23,9 @@ import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class StorageClientConfig implements Serializable
{
private static final long serialVersionUID = -1572678388713210328L;
- private static final Logger LOGGER =
LoggerFactory.getLogger(StorageClientConfig.class);
public final String threadNamePrefix;
// Controls the max concurrency/parallelism of the thread pool used by s3
client
@@ -74,8 +70,7 @@ public class StorageClientConfig implements Serializable
}
catch (URISyntaxException e)
{
- LOGGER.error("{} is specified, but the value is invalid.
input={}", hint, uriString);
- throw new RuntimeException("Unable to resolve " + uriString, e);
+ throw new RuntimeException(hint + " is specified, but the value is
invalid. input=" + uriString, e);
}
}
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
index e536d196..9fd870cd 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
@@ -46,6 +46,8 @@ import
org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo;
import org.apache.cassandra.spark.bulkwriter.CassandraContext;
import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -59,17 +61,29 @@ import org.jetbrains.annotations.Nullable;
/**
* A group of ClusterInfo. One per cluster.
* The class does the aggregation over all clusters for applicable operations.
+ * <p>
+ * This class is NOT serialized and does NOT have a serialVersionUID.
+ * When broadcasting to executors, the driver extracts information from this
class
+ * and creates a {@link
org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup} instance,
+ * which is then included in the {@link
org.apache.cassandra.spark.bulkwriter.BulkWriterConfig}
+ * that gets broadcast.
+ * <p>
+ * This class implements Serializable only because the {@link
org.apache.cassandra.spark.bulkwriter.ClusterInfo}
+ * interface requires it (for use as a field type in broadcast classes), but
instances of this
+ * class are never directly serialized.
*/
public class CassandraClusterInfoGroup implements ClusterInfo,
MultiClusterSupport<ClusterInfo>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CassandraClusterInfoGroup.class);
- private static final long serialVersionUID = 5337884321245616172L;
-
// immutable
private final List<ClusterInfo> clusterInfos;
- private transient volatile Map<String, ClusterInfo> clusterInfoById;
- private transient volatile TokenRangeMapping<RingInstance>
consolidatedTokenRangeMapping;
+ private final String clusterId;
+ private volatile Map<String, ClusterInfo> clusterInfoById;
+ private volatile TokenRangeMapping<RingInstance>
consolidatedTokenRangeMapping;
+ // Pre-computed values from BroadcastableClusterInfoGroup (only set when
reconstructed on executors)
+ private Partitioner cachedPartitioner;
+ private String cachedLowestCassandraVersion;
/**
* Creates {@link CassandraClusterInfoGroup} with the list of {@link
ClusterInfo} from {@link BulkSparkConf} and validation
@@ -82,6 +96,20 @@ public class CassandraClusterInfoGroup implements
ClusterInfo, MultiClusterSuppo
return fromBulkSparkConf(conf, clusterId -> new
CassandraClusterInfo(conf, clusterId));
}
+ /**
+ * Reconstruct from BroadcastableClusterInfoGroup on executor.
+ * Creates CassandraClusterInfo instances for each cluster that will fetch
data from Sidecar.
+ * Leverages pre-computed values (partitioner, lowestCassandraVersion)
from the broadcastable
+ * to avoid re-validation and re-computation on executors.
+ *
+ * @param broadcastable the broadcastable cluster info group from broadcast
+ * @return new {@link CassandraClusterInfoGroup} instance
+ */
+ public static CassandraClusterInfoGroup from(BroadcastableClusterInfoGroup
broadcastable)
+ {
+ return new CassandraClusterInfoGroup(broadcastable);
+ }
+
/**
* Similar to {@link #fromBulkSparkConf(BulkSparkConf)} but takes
additional function to create {@link ClusterInfo}
*/
@@ -118,6 +146,30 @@ public class CassandraClusterInfoGroup implements
ClusterInfo, MultiClusterSuppo
{
this.clusterInfos = Collections.unmodifiableList(clusterInfos);
clusterInfoById();
+ this.clusterId = "ClusterInfoGroup: [" + String.join(", ",
applyOnEach(ClusterInfo::clusterId).values()) + ']';
+ }
+
+ /**
+ * Private constructor for executor-only reconstruction from broadcast
data.
+ * Accepts BroadcastableClusterInfoGroup and extracts pre-computed values
to avoid
+ * re-validation and re-computation on executors.
+ *
+ * @param broadcastable the broadcastable cluster info group from broadcast
+ */
+ private CassandraClusterInfoGroup(BroadcastableClusterInfoGroup
broadcastable)
+ {
+ // Build list of ClusterInfo from broadcastable data
+ List<ClusterInfo> clusterInfosList = new ArrayList<>();
+ broadcastable.forEach((clusterId, broadcastableInfo) -> {
+ clusterInfosList.add(new
CassandraClusterInfo((BroadcastableClusterInfo) broadcastableInfo));
+ });
+ this.clusterInfos = Collections.unmodifiableList(clusterInfosList);
+
+ // Extract pre-computed values from driver to avoid re-validation on
executors
+ this.cachedPartitioner = broadcastable.getPartitioner();
+ this.cachedLowestCassandraVersion =
broadcastable.getLowestCassandraVersion();
+ this.clusterId = broadcastable.clusterId();
+ clusterInfoById();
}
@Override
@@ -157,6 +209,12 @@ public class CassandraClusterInfoGroup implements
ClusterInfo, MultiClusterSuppo
@Override
public String getLowestCassandraVersion()
{
+ // Return cached value if available (executor-side reconstruction)
+ if (cachedLowestCassandraVersion != null)
+ {
+ return cachedLowestCassandraVersion;
+ }
+
if (clusterInfos.size() == 1)
{
return clusterInfos.get(0).getLowestCassandraVersion();
@@ -194,6 +252,12 @@ public class CassandraClusterInfoGroup implements
ClusterInfo, MultiClusterSuppo
@Override
public Partitioner getPartitioner()
{
+ // Return cached value if available (executor-side reconstruction)
+ if (cachedPartitioner != null)
+ {
+ return cachedPartitioner;
+ }
+
Map<String, Partitioner> aggregated =
applyOnEach(ClusterInfo::getPartitioner);
Set<Partitioner> partitioners = EnumSet.copyOf(aggregated.values());
if (partitioners.size() != 1)
@@ -248,7 +312,7 @@ public class CassandraClusterInfoGroup implements
ClusterInfo, MultiClusterSuppo
@Override
public String clusterId()
{
- return "ClusterInfoGroup: [" + String.join(", ",
applyOnEach(ClusterInfo::clusterId).values()) + ']';
+ return clusterId;
}
@Override
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
index 70fa59c5..5329fc21 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
@@ -26,24 +26,43 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.DataTransport;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
/**
- * BulkWriterContext for coordinated write
+ * BulkWriterContext for coordinated write to multiple clusters.
* The context requires the coordinated-write configuration to be present.
+ * <p>
+ * This class does NOT have a serialVersionUID because it is never directly
serialized.
+ * See {@link AbstractBulkWriterContext} for details on the serialization
architecture.
*/
public class CassandraCoordinatedBulkWriterContext extends
AbstractBulkWriterContext
{
- private static final long serialVersionUID = -2296507634642008675L;
-
public CassandraCoordinatedBulkWriterContext(@NotNull BulkSparkConf conf,
@NotNull StructType
structType,
int sparkDefaultParallelism)
{
super(conf, structType, sparkDefaultParallelism);
+ validateConfiguration(conf);
+ }
+
+ /**
+ * Constructor used by {@link
org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)}
factory method.
+ * This constructor is only used on executors to reconstruct context from
broadcast config.
+ *
+ * @param config immutable configuration for the bulk writer
+ */
+ public CassandraCoordinatedBulkWriterContext(@NotNull BulkWriterConfig
config)
+ {
+ super(config);
+ validateConfiguration(config.getConf());
+ }
+
+ private void validateConfiguration(BulkSparkConf conf)
+ {
Preconditions.checkArgument(conf.isCoordinatedWriteConfigured(),
"Cannot create
CassandraCoordinatedBulkWriterContext without CoordinatedWrite configuration");
// Redundant check, since isCoordinatedWriteConfigured implies using
S3_COMPAT mode already.
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
index 802844bd..92edf66b 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
@@ -27,7 +27,6 @@ import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
index 0b8ad44d..d7a46bfd 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
@@ -31,10 +31,10 @@ import com.google.common.collect.Range;
import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
import org.apache.cassandra.spark.bulkwriter.JobInfo;
-import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf.ClusterConf;
import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterSupport;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
@@ -98,9 +98,10 @@ public class MultiClusterReplicaAwareFailureHandler<I
extends CassandraInstance>
CoordinatedWriteConf coordinatedWriteConf = job.coordinatedWriteConf();
Preconditions.checkState(coordinatedWriteConf != null,
"CoordinatedWriteConf is absent for
multi-cluster write");
- Preconditions.checkState(cluster instanceof CassandraClusterInfoGroup,
- "Not a CassandraClusterInfoGroup for
multi-cluster write");
- CassandraClusterInfoGroup group = (CassandraClusterInfoGroup) cluster;
+ Preconditions.checkState(cluster instanceof MultiClusterSupport,
+ "Not a MultiClusterSupport for multi-cluster
write");
+ @SuppressWarnings("unchecked")
+ MultiClusterSupport<ClusterInfo> group =
(MultiClusterSupport<ClusterInfo>) cluster;
failureHandlers.forEach((clusterId, handler) -> {
ClusterConf clusterConf = coordinatedWriteConf.cluster(clusterId);
ClusterInfo clusterInfo = group.getValueOrThrow(clusterId);
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
index 8ed6369d..b1c8f70a 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.spark.bulkwriter.util;
-import java.io.Serializable;
import java.util.Comparator;
import java.util.Set;
@@ -28,10 +27,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
-import org.apache.cassandra.spark.bulkwriter.CassandraBulkWriterContext;
+import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
import org.apache.cassandra.spark.bulkwriter.RingInstance;
import org.apache.cassandra.spark.bulkwriter.TokenPartitioner;
-import
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
import
org.apache.cassandra.spark.transports.storage.StorageAccessConfiguration;
import org.apache.cassandra.spark.transports.storage.StorageCredentials;
import
org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -45,11 +43,23 @@ public class SbwKryoRegistrator implements KryoRegistrator
protected static final String KRYO_KEY = "spark.kryo.registrator";
// CHECKSTYLE IGNORE: Despite being static and final, this is a mutable field not to be confused with a constant
- private static final Set<Class<? extends Serializable>> javaSerializableClasses =
- Sets.newHashSet(CassandraBulkWriterContext.class,
- CassandraCoordinatedBulkWriterContext.class,
+ // Classes registered with Kryo using SbwJavaSerializer for Java serialization.
+ // When Spark uses Kryo (spark.serializer=KryoSerializer), these classes are serialized via
+ // Java's ObjectOutputStream instead of Kryo's default, applying to shuffle, broadcast, RDD caching,
+ // and task closures.
+ //
+ // Only TOP-LEVEL classes need registration; nested Serializable fields are handled recursively
+ // by Java's ObjectOutputStream:
+ //
+ // - BulkWriterConfig: The ONLY object broadcast to executors. Contains all broadcastable wrappers
+ //   and configuration classes, which are automatically serialized recursively.
+ // - RingInstance: Serialized during shuffle and broadcast operations.
+ // - TokenPartitioner: Serialized during shuffle (e.g., repartitionAndSortWithinPartitions).
+ //   Has custom writeObject/readObject that must be invoked.
+ private static final Set<Class<?>> javaSerializableClasses =
+ Sets.newHashSet(RingInstance.class,
TokenPartitioner.class,
- RingInstance.class);
+ BulkWriterConfig.class);
@Override
public void registerClasses(@NotNull Kryo kryo)
@@ -65,7 +75,7 @@ public class SbwKryoRegistrator implements KryoRegistrator
kryo.register(StorageCredentials.class, new StorageCredentials.Serializer());
}
- public static void addJavaSerializableClass(@NotNull Class<? extends Serializable> javaSerializableClass)
+ public static void addJavaSerializableClass(@NotNull Class<?> javaSerializableClass)
{
javaSerializableClasses.add(javaSerializableClass);
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index b92c1b01..ab0e663b 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -140,7 +140,7 @@ class BulkSparkConfTest
// mTLS is now required, and the BulkSparkConf constructor fails if the options aren't present
Map<String, String> options = copyDefaultOptions();
SparkConf sparkConf = new SparkConf();
- assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf, options));
+ assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf, options, null));
}
@Test
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
index 8497f81d..5ed4c522 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
@@ -81,7 +81,7 @@ public class CassandraClusterInfoTest
MockClusterInfoForTimeSkew(int allowanceMinutes, Instant remoteNow)
{
- super(null);
+ super((BulkSparkConf) null);
mockCassandraContext(allowanceMinutes, remoteNow);
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 5578e95e..8a2c1728 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -114,7 +114,9 @@ class RecordWriterTest
writerContext.setReplicationFactor(rf);
tc = new TestTaskContext();
range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
- tokenizer = new Tokenizer(writerContext);
+ BroadcastableTableSchema broadcastableTableSchema = BroadcastableTableSchema.from(writerContext.schema().getTableSchema());
+ boolean isMurmur3Partitioner = writerContext.cluster().getPartitioner() == org.apache.cassandra.spark.data.partitioner.Partitioner.Murmur3Partitioner;
+ tokenizer = new Tokenizer(broadcastableTableSchema, isMurmur3Partitioner);
}
@Test
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
index 1cfed87f..1e9b7ef8 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
@@ -367,9 +367,10 @@ public class TableSchemaNormalizeTest
String[] fieldNames = {field};
ColumnType<?>[] cqlTypes = {columnType};
TableSchema schema = buildSchema(cassandraVersion, fieldNames, sparkTypes, new CqlField.CqlType[]{cqlType}, fieldNames, cqlTypes, fieldNames);
+ BroadcastableTableSchema broadcastable = BroadcastableTableSchema.from(schema);
Object[] source = new Object[]{sourceVal};
Object[] expected = new Object[]{expectedVal};
- Object[] normalized = schema.normalize(source);
+ Object[] normalized = broadcastable.normalize(source);
assertThat(normalized, is(equalTo(expected)));
}
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 88163df0..257034bf 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -183,11 +183,11 @@ public class TableSchemaTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
public void normalizeConvertsValidTable(String cassandraVersion)
{
- TableSchema schema = getValidSchemaBuilder(cassandraVersion)
- .build();
+ TableSchema schema = getValidSchemaBuilder(cassandraVersion).build();
+ BroadcastableTableSchema broadcastable = BroadcastableTableSchema.from(schema);
- assertThat(schema.normalize(new Object[]{1, 1L, "foo", 2}))
- .isEqualTo(new Object[]{1, -2147483648, "foo", 2});
+ assertThat(broadcastable.normalize(new Object[]{1, 1L, "foo", 2}))
+ .isEqualTo(new Object[]{1, -2147483648, "foo", 2});
}
@ParameterizedTest
@@ -213,16 +213,17 @@ public class TableSchemaTest
public void testGetKeyColumnsFindsCorrectValues(String cassandraVersion)
{
StructType outOfOrderDataFrameSchema = new StructType()
- .add("date", DataTypes.TimestampType)
- .add("id", DataTypes.IntegerType)
- .add("course", DataTypes.StringType)
- .add("marks", DataTypes.IntegerType);
+ .add("date",
DataTypes.TimestampType)
+ .add("id",
DataTypes.IntegerType)
+ .add("course",
DataTypes.StringType)
+ .add("marks",
DataTypes.IntegerType);
TableSchema schema = getValidSchemaBuilder(cassandraVersion)
- .withDataFrameSchema(outOfOrderDataFrameSchema)
- .build();
- assertThat(schema.getKeyColumns(new Object[]{"date_val", "id_val",
"course_val", "marks_val"}))
- .isEqualTo(new Object[]{"id_val", "date_val"});
+ .withDataFrameSchema(outOfOrderDataFrameSchema)
+ .build();
+ BroadcastableTableSchema broadcastable =
BroadcastableTableSchema.from(schema);
+ assertThat(broadcastable.getKeyColumns(new Object[]{"date_val",
"id_val", "course_val", "marks_val"}))
+ .isEqualTo(new Object[]{"id_val", "date_val"});
}
@ParameterizedTest
@@ -230,10 +231,11 @@ public class TableSchemaTest
public void testGetKeyColumnsFailsWhenNullKeyValues(String cassandraVersion)
{
TableSchema schema = getValidSchemaBuilder(cassandraVersion)
- .build();
- assertThatThrownBy(() -> schema.getKeyColumns(new Object[]{"foo", null, "baz", "boo"}))
- .isInstanceOf(NullPointerException.class)
- .hasMessage("Found a null primary or composite key column in source data. All key columns must be non-null.");
+ .build();
+ BroadcastableTableSchema broadcastable = BroadcastableTableSchema.from(schema);
+ assertThatThrownBy(() -> broadcastable.getKeyColumns(new Object[]{"foo", null, "baz", "boo"}))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Found a null primary or composite key column in source data. All key columns must be non-null.");
}
@ParameterizedTest
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
index 26b3ae17..4825c8de 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Range;
import org.junit.jupiter.api.Test;
import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo;
import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfoTest;
@@ -272,20 +273,49 @@ class CassandraClusterInfoGroupTest
@Test
void testSerDeser()
{
- CassandraClusterInfoGroup origin = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index));
- assertThat(origin.clusterInfoByIdUnsafe()).isNotNull();
- assertThat(origin.getValueOrNull("cluster0")).isNotNull();
- assertThat(origin.getValueOrNull("cluster1")).isNotNull();
- byte[] serialized = SerializationUtils.serialize(origin);
- CassandraClusterInfoGroup target = SerializationUtils.deserialize(serialized, CassandraClusterInfoGroup.class);
- assertThat(target.clusterInfoByIdUnsafe())
- .describedAs("clusterInfoById should be null in the deserialized object")
- .isNull();
- assertThat(target.getValueOrNull("cluster0")).isNotNull();
- assertThat(target.getValueOrNull("cluster1")).isNotNull();
- assertThat(target.clusterInfoByIdUnsafe())
- .describedAs("clusterInfoById should now be lazily initialized")
- .isNotNull();
+ // Create a CassandraClusterInfoGroup with some test data
+ CassandraClusterInfoGroup originalGroup = mockClusterGroup(2, index -> {
+ CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + index);
+ when(clusterInfo.getPartitioner()).thenReturn(Partitioner.Murmur3Partitioner);
+ when(clusterInfo.getLowestCassandraVersion()).thenReturn("4.0.0");
+ return clusterInfo;
+ });
+
+ // Mock the BulkSparkConf needed for serialization
+ BulkSparkConf conf = mock(BulkSparkConf.class, withSettings().serializable());
+
+ // Convert to BroadcastableClusterInfoGroup (what actually gets serialized/broadcast in Spark)
+ BroadcastableClusterInfoGroup broadcastable = BroadcastableClusterInfoGroup.from(originalGroup, conf);
+
+ // Serialize and deserialize using SerializationUtils
+ byte[] serializedBytes = SerializationUtils.serialize(broadcastable);
+ BroadcastableClusterInfoGroup deserializedBroadcastable = SerializationUtils.deserialize(serializedBytes, BroadcastableClusterInfoGroup.class);
+
+ // Verify the deserialized broadcastable has the correct pre-computed values
+ assertThat(deserializedBroadcastable.clusterId())
+ .describedAs("ClusterId should be preserved after serialization")
+ .isEqualTo(originalGroup.clusterId());
+
+ assertThat(deserializedBroadcastable.getPartitioner())
+ .describedAs("Partitioner should be preserved after serialization")
+ .isEqualTo(Partitioner.Murmur3Partitioner);
+
+ assertThat(deserializedBroadcastable.getLowestCassandraVersion())
+ .describedAs("Lowest Cassandra version should be preserved after serialization")
+ .isEqualTo("4.0.0");
+
+ assertThat(deserializedBroadcastable.size())
+ .describedAs("Number of clusters should be preserved after serialization")
+ .isEqualTo(2);
+
+ // Verify we can iterate over the deserialized clusters
+ int[] clusterCount = {0};
+ deserializedBroadcastable.forEach((clusterId, clusterInfo) -> {
+ assertThat(clusterId).isIn("cluster0", "cluster1");
+ assertThat(clusterInfo).isNotNull();
+ clusterCount[0]++;
+ });
+ assertThat(clusterCount[0]).isEqualTo(2);
}
private CassandraClusterInfoGroup mockClusterGroup(int size,
@@ -297,8 +327,7 @@ class CassandraClusterInfoGroupTest
private CassandraClusterInfo mockClusterInfo(String clusterId)
{
- CassandraClusterInfo clusterInfo = mock(CassandraClusterInfo.class,
- withSettings().serializable()); // serializable required by testSerDeser
+ CassandraClusterInfo clusterInfo = mock(CassandraClusterInfo.class);
when(clusterInfo.clusterId()).thenReturn(clusterId);
return clusterInfo;
}
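
Note on the test above: it exercises plain Java serialization of the broadcastable wrapper. On the Spark side, a config object like this is typically shipped to executors as a broadcast variable. The sketch below shows that usage; the broadcast call is standard Spark API, but treating BulkWriterConfig as the broadcast payload here illustrates the intent of this patch rather than copying the actual driver code.

// Sketch of broadcasting an immutable, Serializable config to executors (standard Spark pattern).
import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public final class BroadcastSketch
{
    public static Broadcast<BulkWriterConfig> broadcastConfig(SparkSession spark, BulkWriterConfig config)
    {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Executors later read the value with broadcast.getValue(); the broadcast object is never mutated.
        return jsc.broadcast(config);
    }
}
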
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
index 99e3b8ba..780390e3 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
@@ -143,13 +143,15 @@ class CoordinatedImportCoordinatorTest
coordinator.onStageReady(jobId);
assertThat(coordinator.isStageReady()).isTrue();
- loopAssert(() -> stagedClusters.getAllValues().size() == 2, "waiting
for all cluster to stage successfully");
+ loopAssert(() -> stagedClusters.getAllValues().size() == 2,
+ "waiting for all cluster to stage successfully. actual: " +
stagedClusters.getAllValues());
assertThat(stagedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
clusterId2);
// signal apply read
coordinator.onImportReady(jobId);
- loopAssert(() -> appliedClusters.getAllValues().size() == 2, "waiting
for all cluster to import successfully");
+ loopAssert(() -> appliedClusters.getAllValues().size() == 2,
+ "waiting for all cluster to import successfully. actual: "
+ appliedClusters.getAllValues());
assertThat(appliedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
clusterId2);
fut.get();
@@ -292,10 +294,10 @@ class CoordinatedImportCoordinatorTest
.build();
}
- // loop at most 10 times until the condition is evaluated to true
+ // loop at most 50 times until the condition is evaluated to true
private void loopAssert(Supplier<Boolean> condition, String desc)
{
- int attempts = 10;
+ int attempts = 50;
while (!condition.get() && attempts-- > 1)
{
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
diff --git
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 06706af5..cee37d32 100644
---
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -537,8 +537,7 @@ public final class ReaderUtils extends TokenUtils
* Read primary Index.db file
*
* @param primaryIndex input stream for Index.db file
- * @param tracker tracker that consumes each key byffer and returns true if can exit early, otherwise continues to read primary index
- * @return pair of first and last decorated keys
+ * @param tracker tracker that consumes each key buffer and returns true if can exit early, otherwise continues to read primary index
* @throws IOException
*/
public static void readPrimaryIndex(@NotNull InputStream primaryIndex,
diff --git
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 3c84591d..91fc5a7d 100644
---
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -492,8 +492,7 @@ public final class ReaderUtils extends TokenUtils
* Read primary Index.db file
*
* @param primaryIndex input stream for Index.db file
- * @param tracker tracker that consumes each key byffer and returns true if can exit early, otherwise continues to read primary index
- * @return pair of first and last decorated keys
+ * @param tracker tracker that consumes each key buffer and returns true if can exit early, otherwise continues to read primary index
* @throws IOException
*/
public static void readPrimaryIndex(@NotNull InputStream primaryIndex,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]