This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f960685e CASSANALYTICS-89: Create dedicated data class for broadcast variable during bulk write (#146)
f960685e is described below

commit f960685e0ee9fafc0e4819ff40a77179f0f7b859
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Oct 28 16:59:56 2025 -0700

    CASSANALYTICS-89: Create dedicated data class for broadcast variable during bulk write (#146)
    
    Extract BulkWriterConfig as an immutable, serializable data class for
    Spark broadcast variables, replacing direct broadcasting of BulkWriterContext.
    This addresses the issue where BulkWriterContext, containing mutable state
    and non-serializable dependencies, was being broadcast to executors.
    Follows Spark best practices for broadcast variables (immutable data only)
    and provides a clear separation between configuration data and stateful
    context objects.
    
    Patch by Yifan Cai; reviewed by Francisco Guerrero for CASSANALYTICS-89
---
 CHANGES.txt                                        |   1 +
 .../bulkwriter/AbstractBulkWriterContext.java      | 102 +++++++++-
 .../spark/bulkwriter/BroadcastableClusterInfo.java | 104 +++++++++++
 .../bulkwriter/BroadcastableClusterInfoGroup.java  | 150 +++++++++++++++
 .../spark/bulkwriter/BroadcastableJobInfo.java     | 110 +++++++++++
 .../spark/bulkwriter/BroadcastableSchemaInfo.java  |  83 ++++++++
 .../spark/bulkwriter/BroadcastableTableSchema.java | 208 +++++++++++++++++++++
 .../bulkwriter/BroadcastableTokenPartitioner.java  |  93 +++++++++
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  37 ++--
 .../spark/bulkwriter/BulkWriterConfig.java         | 114 +++++++++++
 .../spark/bulkwriter/BulkWriterContext.java        |  37 +++-
 .../spark/bulkwriter/BulkWriterContextFactory.java |   2 +-
 .../bulkwriter/CassandraBulkSourceRelation.java    |  92 +++++++--
 .../bulkwriter/CassandraBulkWriterContext.java     |  19 +-
 .../spark/bulkwriter/CassandraClusterInfo.java     |  51 ++++-
 .../spark/bulkwriter/CassandraJobInfo.java         |  16 +-
 .../spark/bulkwriter/CassandraSchemaInfo.java      |  13 +-
 .../cassandra/spark/bulkwriter/ClusterInfo.java    |  16 +-
 .../bulkwriter/IBroadcastableClusterInfo.java      |  83 ++++++++
 .../apache/cassandra/spark/bulkwriter/JobInfo.java |  10 +-
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  11 +-
 .../cassandra/spark/bulkwriter/SchemaInfo.java     |  10 +-
 .../cassandra/spark/bulkwriter/TableSchema.java    |  73 ++++----
 .../spark/bulkwriter/TokenPartitioner.java         | 172 +++++++++++++----
 .../cassandra/spark/bulkwriter/Tokenizer.java      |  14 +-
 .../CloudStorageDataTransferApiFactory.java        |  12 +-
 .../cloudstorage/StorageClientConfig.java          |   7 +-
 .../coordinated/CassandraClusterInfoGroup.java     |  74 +++++++-
 .../CassandraCoordinatedBulkWriterContext.java     |  25 ++-
 .../coordinated/CoordinatedWriteConf.java          |   1 -
 .../MultiClusterReplicaAwareFailureHandler.java    |   9 +-
 .../spark/bulkwriter/util/SbwKryoRegistrator.java  |  26 ++-
 .../spark/bulkwriter/BulkSparkConfTest.java        |   2 +-
 .../spark/bulkwriter/CassandraClusterInfoTest.java |   2 +-
 .../spark/bulkwriter/RecordWriterTest.java         |   4 +-
 .../spark/bulkwriter/TableSchemaNormalizeTest.java |   3 +-
 .../spark/bulkwriter/TableSchemaTest.java          |  34 ++--
 .../coordinated/CassandraClusterInfoGroupTest.java |  61 ++++--
 .../CoordinatedImportCoordinatorTest.java          |  10 +-
 .../apache/cassandra/spark/reader/ReaderUtils.java |   3 +-
 .../apache/cassandra/spark/reader/ReaderUtils.java |   3 +-
 41 files changed, 1690 insertions(+), 207 deletions(-)
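
For illustration, a minimal sketch of the flow this patch introduces, using the factory methods added below; the variable names (bulkSparkConf, jobInfo, clusterInfo, schemaInfo, lowestCassandraVersion, javaSparkContext) are placeholders for objects the driver already holds, not identifiers from this commit:

    // Driver side: extract an immutable, broadcast-safe config from the driver-only objects
    BulkWriterConfig config = new BulkWriterConfig(bulkSparkConf,
                                                   sparkDefaultParallelism,
                                                   BroadcastableJobInfo.from(jobInfo, bulkSparkConf),
                                                   BroadcastableClusterInfo.from(clusterInfo, bulkSparkConf),
                                                   BroadcastableSchemaInfo.from(schemaInfo),
                                                   lowestCassandraVersion);
    Broadcast<BulkWriterConfig> broadcast = javaSparkContext.broadcast(config);

    // Executor side: rebuild a full, stateful context from the broadcast value
    BulkWriterContext context = BulkWriterContext.from(broadcast.value());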

diff --git a/CHANGES.txt b/CHANGES.txt
index 7ce881e4..04956e51 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Refactor BulkWriterContext broadcasting to use immutable config class (CASSANALYTICS-89)
  * Bump sidecar dependency to 0.2.0 (CASSANALYTICS-93)
  * Support for Trie-Indexed SSTables (BTI format) (CASSANALYTICS-27)
  * Add extractCdcTables method to CqlUtils (CASSANALYTICS-91)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index 87e5a979..6acbb2fc 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -32,6 +32,7 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
@@ -44,9 +45,29 @@ import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
+/**
+ * Abstract base class for BulkWriterContext implementations.
+ *
+ * <p>Serialization Architecture:</p>
+ * <p>This class is NOT serialized directly. Instead:
+ * <ol>
+ *   <li>Driver creates BulkWriterContext using constructor</li>
+ *   <li>Driver extracts BulkWriterConfig in {@link CassandraBulkSourceRelation} constructor</li>
+ *   <li>BulkWriterConfig gets broadcast to executors</li>
+ *   <li>Executors reconstruct BulkWriterContext via {@link BulkWriterContext#from(BulkWriterConfig)}</li>
+ * </ol>
+ *
+ * <p>Broadcastable wrappers used in BulkWriterConfig:
+ * <ul>
+ *   <li>{@link IBroadcastableClusterInfo} → reconstructs to {@link CassandraClusterInfo} or {@link CassandraClusterInfoGroup}</li>
+ *   <li>{@link BroadcastableJobInfo} → reconstructs to {@link CassandraJobInfo}</li>
+ *   <li>{@link BroadcastableSchemaInfo} → reconstructs to {@link CassandraSchemaInfo}</li>
+ * </ul>
+ *
+ * <p>Implements KryoSerializable with fail-fast approach to detect missing Kryo registration.
+ */
 public abstract class AbstractBulkWriterContext implements BulkWriterContext, KryoSerializable
 {
-    private static final long serialVersionUID = -6526396615116954510L;
     // log as the concrete implementation; but use private to not expose the logger to implementations
     private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -62,13 +83,22 @@ public abstract class AbstractBulkWriterContext implements BulkWriterContext, Kr
     private transient volatile JobStatsPublisher jobStatsPublisher;
     private transient volatile TransportContext transportContext;
 
+    /**
+     * Constructor for driver usage.
+     * Builds all components fresh on the driver.
+     *
+     * @param conf                    Bulk Spark configuration
+     * @param structType              DataFrame schema
+     * @param sparkDefaultParallelism Spark default parallelism
+     */
     protected AbstractBulkWriterContext(@NotNull BulkSparkConf conf,
                                         @NotNull StructType structType,
                                         @NotNull int sparkDefaultParallelism)
     {
         this.conf = conf;
         this.sparkDefaultParallelism = sparkDefaultParallelism;
-        // Note: build sequence matters
+
+        // Build everything fresh on driver
         this.clusterInfo = buildClusterInfo();
         this.clusterInfo.startupValidate();
         this.lowestCassandraVersion = findLowestCassandraVersion();
@@ -76,10 +106,32 @@ public abstract class AbstractBulkWriterContext implements BulkWriterContext, Kr
         this.jobInfo = buildJobInfo();
         this.schemaInfo = buildSchemaInfo(structType);
         this.jobStatsPublisher = buildJobStatsPublisher();
-        this.transportContext = buildTransportContext(true);
+        this.transportContext = buildTransportContext(true);  // isOnDriver = true
+    }
+
+    /**
+     * Constructor for executor usage.
+     * Reconstructs components from broadcast configuration on executors.
+     * This is used by the factory method {@link BulkWriterContext#from(BulkWriterConfig)}.
+     *
+     * @param config immutable configuration for the bulk writer with pre-computed values
+     */
+    protected AbstractBulkWriterContext(@NotNull BulkWriterConfig config)
+    {
+        this.conf = config.getConf();
+        this.sparkDefaultParallelism = config.getSparkDefaultParallelism();
+
+        // Reconstruct from broadcast data on executor
+        this.clusterInfo = reconstructClusterInfoOnExecutor(config.getBroadcastableClusterInfo());
+        this.lowestCassandraVersion = config.getLowestCassandraVersion();
+        this.bridge = buildCassandraBridge();
+        this.jobInfo = reconstructJobInfoOnExecutor(config.getBroadcastableJobInfo());
+        this.schemaInfo = reconstructSchemaInfoOnExecutor(config.getBroadcastableSchemaInfo());
+        this.jobStatsPublisher = buildJobStatsPublisher();
+        this.transportContext = buildTransportContext(false);  // isOnDriver = false
     }
 
-    protected final BulkSparkConf bulkSparkConf()
+    public final BulkSparkConf bulkSparkConf()
     {
         return conf;
     }
@@ -98,6 +150,48 @@ public abstract class AbstractBulkWriterContext implements BulkWriterContext, Kr
 
     protected abstract ClusterInfo buildClusterInfo();
 
+    /**
+     * Reconstructs ClusterInfo on executors from broadcastable versions.
+     * This method is only called on executors when reconstructing BulkWriterContext from
+     * broadcast BulkWriterConfig. Each broadcastable type knows how to reconstruct itself
+     * into the appropriate full ClusterInfo implementation.
+     *
+     * @param clusterInfo the BroadcastableClusterInfo from broadcast
+     * @return reconstructed ClusterInfo (CassandraClusterInfo or CassandraClusterInfoGroup)
+     */
+    protected ClusterInfo reconstructClusterInfoOnExecutor(IBroadcastableClusterInfo clusterInfo)
+    {
+        return clusterInfo.reconstruct();
+    }
+
+    /**
+     * Reconstructs JobInfo on executors from BroadcastableJobInfo.
+     * This method is only called on executors when reconstructing BulkWriterContext from
+     * broadcast BulkWriterConfig. It rebuilds CassandraJobInfo with TokenPartitioner reconstructed
+     * from the broadcastable partition mappings.
+     *
+     * @param jobInfo the BroadcastableJobInfo from broadcast
+     * @return reconstructed CassandraJobInfo
+     */
+    protected JobInfo reconstructJobInfoOnExecutor(BroadcastableJobInfo jobInfo)
+    {
+        return new CassandraJobInfo(jobInfo);
+    }
+
+    /**
+     * Reconstructs SchemaInfo on executors from BroadcastableSchemaInfo.
+     * This method is only called on executors when reconstructing BulkWriterContext from
+     * broadcast BulkWriterConfig. It reconstructs CassandraSchemaInfo and TableSchema from
+     * the broadcast data (no Sidecar calls needed).
+     *
+     * @param schemaInfo the BroadcastableSchemaInfo from broadcast
+     * @return reconstructed CassandraSchemaInfo
+     */
+    protected SchemaInfo reconstructSchemaInfoOnExecutor(BroadcastableSchemaInfo schemaInfo)
+    {
+        return new CassandraSchemaInfo(schemaInfo);
+    }
+
     protected abstract void validateKeyspaceReplication();
 
     protected JobInfo buildJobInfo()
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfo.java
new file mode 100644
index 00000000..de06151e
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Broadcastable wrapper for single cluster with ZERO transient fields to optimize Spark broadcasting.
+ * <p>
+ * Only essential fields are broadcast; executors reconstruct CassandraClusterInfo to fetch other data from Sidecar.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs (appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ *   <li>Minimize SizeEstimator reflection overhead during broadcast preparation</li>
+ *   <li>Reduce broadcast variable serialization size</li>
+ *   <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableClusterInfo implements IBroadcastableClusterInfo
+{
+    private static final long serialVersionUID = 4506917240637924802L;
+
+    // Essential fields broadcast to executors
+    private final Partitioner partitioner;
+    private final String cassandraVersion;
+    private final String clusterId;
+    private final BulkSparkConf conf;
+
+    /**
+     * Creates a BroadcastableClusterInfo from a CassandraClusterInfo by extracting essential fields.
+     * Executors will reconstruct CassandraClusterInfo to fetch other data from Sidecar.
+     *
+     * @param source the source ClusterInfo (typically CassandraClusterInfo)
+     * @param conf   the BulkSparkConf needed to connect to Sidecar on executors
+     */
+    public static BroadcastableClusterInfo from(@NotNull ClusterInfo source, @NotNull BulkSparkConf conf)
+    {
+        return new BroadcastableClusterInfo(source.getPartitioner(), source.getLowestCassandraVersion(), source.clusterId(), conf);
+    }
+
+    private BroadcastableClusterInfo(@NotNull Partitioner partitioner,
+                                     @NotNull String cassandraVersion,
+                                     @Nullable String clusterId,
+                                     @NotNull BulkSparkConf conf)
+    {
+        this.partitioner = partitioner;
+        this.cassandraVersion = cassandraVersion;
+        this.clusterId = clusterId;
+        this.conf = conf;
+    }
+
+    public BulkSparkConf getConf()
+    {
+        return conf;
+    }
+
+    @Override
+    public String getLowestCassandraVersion()
+    {
+        return cassandraVersion;
+    }
+
+    @Override
+    public Partitioner getPartitioner()
+    {
+        return partitioner;
+    }
+
+    @Override
+    @Nullable
+    public String clusterId()
+    {
+        return clusterId;
+    }
+
+    @Override
+    public ClusterInfo reconstruct()
+    {
+        return new CassandraClusterInfo(this);
+    }
+}
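
The SizeEstimator concern called out in the javadoc above can be spot-checked with a small, hypothetical snippet; org.apache.spark.util.SizeEstimator.estimate walks an object graph reflectively, so a wrapper without Logger or transient fields should both estimate and serialize cheaply (clusterInfo and bulkSparkConf below are assumed to be in scope on the driver):

    // Hypothetical spot-check, not part of this commit
    long wrapperSize = SizeEstimator.estimate(BroadcastableClusterInfo.from(clusterInfo, bulkSparkConf));
    long driverSize  = SizeEstimator.estimate(clusterInfo);  // driver-only object carrying Logger and other state
    System.out.printf("broadcastable=%d bytes, driver-side=%d bytes%n", wrapperSize, driverSize);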
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfoGroup.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfoGroup.java
new file mode 100644
index 00000000..d5ad1e13
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableClusterInfoGroup.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterSupport;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Broadcastable wrapper for coordinated writes with ZERO transient fields to optimize Spark broadcasting.
+ * <p>
+ * This class wraps multiple BroadcastableClusterInfo instances for multi-cluster scenarios.
+ * Pre-computed values (partitioner, lowestCassandraVersion) are extracted from CassandraClusterInfoGroup on the driver
+ * to avoid duplicating aggregation/validation logic on executors.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs (appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ *   <li>Minimize SizeEstimator reflection overhead during broadcast preparation</li>
+ *   <li>Reduce broadcast variable serialization size</li>
+ *   <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableClusterInfoGroup implements IBroadcastableClusterInfo, MultiClusterSupport<IBroadcastableClusterInfo>
+{
+    private static final long serialVersionUID = 8621255452042082506L;
+
+    private final List<BroadcastableClusterInfo> clusterInfos;
+    private final String clusterId;
+    private final BulkSparkConf conf;
+    private final Partitioner partitioner;
+    private final String lowestCassandraVersion;
+
+    /**
+     * Creates a BroadcastableClusterInfoGroup from a source ClusterInfo group.
+     * Extracts pre-computed values (partitioner, lowestCassandraVersion) from the source
+     * to avoid duplicating aggregation/validation logic on executors.
+     *
+     * @param source the source CassandraClusterInfoGroup
+     * @param conf   the BulkSparkConf needed to connect to Sidecar on executors
+     */
+    public static BroadcastableClusterInfoGroup from(@NotNull CassandraClusterInfoGroup source,
+                                                     @NotNull BulkSparkConf conf)
+    {
+        List<BroadcastableClusterInfo> broadcastableInfos = new ArrayList<>();
+        source.forEach((clusterId, clusterInfo) -> broadcastableInfos.add(BroadcastableClusterInfo.from(clusterInfo, conf)));
+
+        // Extract pre-computed values from CassandraClusterInfoGroup
+        // These have already been validated/computed on the driver
+        Partitioner partitioner = source.getPartitioner();
+        String lowestVersion = source.getLowestCassandraVersion();
+
+        return new BroadcastableClusterInfoGroup(broadcastableInfos, source.clusterId(), conf, partitioner, lowestVersion);
+    }
+
+    private BroadcastableClusterInfoGroup(List<BroadcastableClusterInfo> clusterInfos,
+                                          String clusterId,
+                                          BulkSparkConf conf,
+                                          Partitioner partitioner,
+                                          String lowestCassandraVersion)
+    {
+        this.clusterInfos = Collections.unmodifiableList(clusterInfos);
+        this.conf = conf;
+        this.clusterId = clusterId;
+        this.partitioner = partitioner;
+        this.lowestCassandraVersion = lowestCassandraVersion;
+    }
+
+    @Override
+    @NotNull
+    public BulkSparkConf getConf()
+    {
+        return conf;
+    }
+
+    @Override
+    public String getLowestCassandraVersion()
+    {
+        // Return pre-computed value from CassandraClusterInfoGroup
+        // No need to duplicate aggregation/validation logic
+        return lowestCassandraVersion;
+    }
+
+    @Override
+    public Partitioner getPartitioner()
+    {
+        // Return pre-computed value from CassandraClusterInfoGroup
+        // No need to duplicate validation logic
+        return partitioner;
+    }
+
+    @Override
+    public String clusterId()
+    {
+        return clusterId;
+    }
+
+    // MultiClusterSupport methods
+    @Override
+    public int size()
+    {
+        return clusterInfos.size();
+    }
+
+    @Override
+    public void forEach(BiConsumer<String, IBroadcastableClusterInfo> action)
+    {
+        clusterInfos.forEach(info -> action.accept(info.clusterId(), info));
+    }
+
+    @Nullable
+    @Override
+    public IBroadcastableClusterInfo getValueOrNull(@NotNull String clusterId)
+    {
+        throw new UnsupportedOperationException("getValueOrNull should not be called from BroadcastableClusterInfoGroup");
+    }
+
+    @Override
+    public ClusterInfo reconstruct()
+    {
+        return CassandraClusterInfoGroup.from(this);
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableJobInfo.java
new file mode 100644
index 00000000..b879d1b9
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableJobInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.UUID;
+
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.Serializable;
+
+/**
+ * Broadcastable wrapper for job information with ZERO transient fields to optimize Spark broadcasting.
+ * <p>
+ * Only essential fields are broadcast; executors reconstruct CassandraJobInfo to rebuild TokenPartitioner.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs (appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ *   <li>Minimize SizeEstimator reflection overhead during broadcast preparation</li>
+ *   <li>Reduce broadcast variable serialization size</li>
+ *   <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableJobInfo implements Serializable
+{
+    private static final long serialVersionUID = -8717074052066841748L;
+
+    // Essential fields broadcast to executors
+    private final BulkSparkConf conf;
+    private final MultiClusterContainer<UUID> restoreJobIds;
+    private final BroadcastableTokenPartitioner tokenPartitioner;  // Broadcastable version without Logger
+
+    /**
+     * Creates a BroadcastableJobInfo from a source JobInfo.
+     * Extracts partition mappings from TokenPartitioner to avoid broadcasting Logger.
+     *
+     * @param source the source JobInfo (typically CassandraJobInfo)
+     * @param conf   the BulkSparkConf needed for executors
+     */
+    public static BroadcastableJobInfo from(@NotNull JobInfo source, @NotNull BulkSparkConf conf)
+    {
+        // Extract restoreJobIds from source
+        MultiClusterContainer<UUID> restoreJobIds;
+        if (source.isCoordinatedWriteEnabled())
+        {
+            // For coordinated write, need to extract all restoreJobIds
+            CoordinatedWriteConf coordinatedConf = source.coordinatedWriteConf();
+            restoreJobIds = new MultiClusterContainer<>();
+            coordinatedConf.clusters().keySet().forEach(clusterId -> {
+                restoreJobIds.setValue(clusterId, source.getRestoreJobId(clusterId));
+            });
+        }
+        else
+        {
+            // Single cluster - use null key
+            restoreJobIds = MultiClusterContainer.ofSingle(source.getRestoreJobId());
+        }
+
+        // Extract partition mappings from TokenPartitioner
+        BroadcastableTokenPartitioner broadcastableTokenPartitioner = BroadcastableTokenPartitioner.from(source.getTokenPartitioner());
+
+        return new BroadcastableJobInfo(conf, restoreJobIds, broadcastableTokenPartitioner);
+    }
+
+    private BroadcastableJobInfo(BulkSparkConf conf,
+                                MultiClusterContainer<UUID> restoreJobIds,
+                                BroadcastableTokenPartitioner tokenPartitioner)
+    {
+        this.conf = conf;
+        this.restoreJobIds = restoreJobIds;
+        this.tokenPartitioner = tokenPartitioner;
+    }
+
+    public BulkSparkConf getConf()
+    {
+        return conf;
+    }
+
+    public MultiClusterContainer<UUID> getRestoreJobIds()
+    {
+        return restoreJobIds;
+    }
+
+    public BroadcastableTokenPartitioner getBroadcastableTokenPartitioner()
+    {
+        return tokenPartitioner;
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableSchemaInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableSchemaInfo.java
new file mode 100644
index 00000000..9c0a164b
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableSchemaInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Broadcastable wrapper for schema information with ZERO transient fields to optimize Spark broadcasting.
+ * <p>
+ * Contains BroadcastableTableSchema (pre-computed schema data) and UDT statements.
+ * Executors reconstruct CassandraSchemaInfo and TableSchema from these fields.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs (appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ *   <li>Minimize SizeEstimator reflection overhead during broadcast preparation</li>
+ *   <li>Reduce broadcast variable serialization size</li>
+ *   <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableSchemaInfo implements Serializable
+{
+    private static final long serialVersionUID = -8727074052066841748L;
+
+    // Essential fields broadcast to executors
+    private final BroadcastableTableSchema broadcastableTableSchema;
+    private final Set<String> userDefinedTypeStatements;
+
+    /**
+     * Creates a BroadcastableSchemaInfo from a source SchemaInfo.
+     * Extracts BroadcastableTableSchema to avoid serializing Logger.
+     *
+     * @param source the source SchemaInfo (typically CassandraSchemaInfo)
+     */
+    public static BroadcastableSchemaInfo from(@NotNull SchemaInfo source)
+    {
+        return new BroadcastableSchemaInfo(
+            BroadcastableTableSchema.from(source.getTableSchema()),
+            source.getUserDefinedTypeStatements()
+        );
+    }
+
+    private BroadcastableSchemaInfo(BroadcastableTableSchema broadcastableTableSchema,
+                                   Set<String> userDefinedTypeStatements)
+    {
+        this.broadcastableTableSchema = broadcastableTableSchema;
+        this.userDefinedTypeStatements = userDefinedTypeStatements;
+    }
+
+    public BroadcastableTableSchema getBroadcastableTableSchema()
+    {
+        return broadcastableTableSchema;
+    }
+
+    @NotNull
+    public Set<String> getUserDefinedTypeStatements()
+    {
+        return userDefinedTypeStatements;
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java
new file mode 100644
index 00000000..322e815d
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTableSchema.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Broadcastable wrapper for TableSchema with ZERO transient fields to optimize Spark broadcasting.
+ * <p>
+ * Contains all essential fields from TableSchema needed on executors, but without the Logger reference.
+ * Executors will reconstruct TableSchema from these fields.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs (appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ *   <li>Minimize SizeEstimator reflection overhead during broadcast preparation</li>
+ *   <li>Reduce broadcast variable serialization size</li>
+ *   <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableTableSchema implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    // All fields from TableSchema needed for reconstruction on executors
+    private final String createStatement;
+    private final String modificationStatement;
+    private final List<String> partitionKeyColumns;
+    private final List<ColumnType<?>> partitionKeyColumnTypes;
+    private final List<SqlToCqlTypeConverter.Converter<?>> converters;
+    private final List<Integer> keyFieldPositions;
+    private final WriteMode writeMode;
+    private final TTLOption ttlOption;
+    private final TimestampOption timestampOption;
+    private final String lowestCassandraVersion;
+    private final boolean quoteIdentifiers;
+
+    /**
+     * Creates a BroadcastableTableSchema from a source TableSchema.
+     * Extracts all essential fields but excludes the Logger.
+     *
+     * @param source the source TableSchema (driver-only)
+     * @return broadcastable version without Logger
+     */
+    public static BroadcastableTableSchema from(@NotNull TableSchema source)
+    {
+        return new BroadcastableTableSchema(
+            source.createStatement,
+            source.modificationStatement,
+            source.partitionKeyColumns,
+            source.partitionKeyColumnTypes,
+            source.converters,
+            source.keyFieldPositions,
+            source.writeMode,
+            source.ttlOption,
+            source.timestampOption,
+            source.lowestCassandraVersion,
+            source.quoteIdentifiers
+        );
+    }
+
+    private BroadcastableTableSchema(String createStatement,
+                                     String modificationStatement,
+                                     List<String> partitionKeyColumns,
+                                     List<ColumnType<?>> partitionKeyColumnTypes,
+                                     List<SqlToCqlTypeConverter.Converter<?>> converters,
+                                     List<Integer> keyFieldPositions,
+                                     WriteMode writeMode,
+                                     TTLOption ttlOption,
+                                     TimestampOption timestampOption,
+                                     String lowestCassandraVersion,
+                                     boolean quoteIdentifiers)
+    {
+        this.createStatement = createStatement;
+        this.modificationStatement = modificationStatement;
+        this.partitionKeyColumns = partitionKeyColumns;
+        this.partitionKeyColumnTypes = partitionKeyColumnTypes;
+        this.converters = converters;
+        this.keyFieldPositions = keyFieldPositions;
+        this.writeMode = writeMode;
+        this.ttlOption = ttlOption;
+        this.timestampOption = timestampOption;
+        this.lowestCassandraVersion = lowestCassandraVersion;
+        this.quoteIdentifiers = quoteIdentifiers;
+    }
+
+    public String getCreateStatement()
+    {
+        return createStatement;
+    }
+
+    public String getModificationStatement()
+    {
+        return modificationStatement;
+    }
+
+    public List<String> getPartitionKeyColumns()
+    {
+        return partitionKeyColumns;
+    }
+
+    public List<ColumnType<?>> getPartitionKeyColumnTypes()
+    {
+        return partitionKeyColumnTypes;
+    }
+
+    public List<SqlToCqlTypeConverter.Converter<?>> getConverters()
+    {
+        return converters;
+    }
+
+    public List<Integer> getKeyFieldPositions()
+    {
+        return keyFieldPositions;
+    }
+
+    public WriteMode getWriteMode()
+    {
+        return writeMode;
+    }
+
+    public TTLOption getTtlOption()
+    {
+        return ttlOption;
+    }
+
+    public TimestampOption getTimestampOption()
+    {
+        return timestampOption;
+    }
+
+    public String getLowestCassandraVersion()
+    {
+        return lowestCassandraVersion;
+    }
+
+    public boolean isQuoteIdentifiers()
+    {
+        return quoteIdentifiers;
+    }
+
+    /**
+     * Normalizes a row by applying type converters to each field.
+     * This mirrors the normalize method in TableSchema but uses the broadcast-safe converters list.
+     *
+     * @param row the row data to normalize
+     * @return the normalized row (same array instance, mutated in place)
+     */
+    public Object[] normalize(Object[] row)
+    {
+        for (int index = 0; index < row.length; index++)
+        {
+            row[index] = converters.get(index).convert(row[index]);
+        }
+        return row;
+    }
+
+    /**
+     * Extracts key columns from all columns based on key field positions.
+     * This mirrors the getKeyColumns method in TableSchema but uses the broadcast-safe keyFieldPositions list.
+     *
+     * @param allColumns all columns in the row
+     * @return array containing only the key columns
+     */
+    public Object[] getKeyColumns(Object[] allColumns)
+    {
+        return getKeyColumns(allColumns, keyFieldPositions);
+    }
+
+    @NotNull
+    public static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)
+    {
+        Object[] result = new Object[keyFieldPositions.size()];
+        for (int keyFieldPosition = 0; keyFieldPosition < keyFieldPositions.size(); keyFieldPosition++)
+        {
+            Object colVal = allColumns[keyFieldPositions.get(keyFieldPosition)];
+            Preconditions.checkNotNull(colVal, "Found a null primary or composite key column in source data. All key columns must be non-null.");
+            result[keyFieldPosition] = colVal;
+        }
+        return result;
+    }
+}
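
As a hedged usage sketch of the two helpers above (the row values are made up, and tableSchema is assumed to be a BroadcastableTableSchema reconstructed on an executor):

    // Hypothetical executor-side usage, not part of this commit
    Object[] row = new Object[] {1, "2024-01-01", "payload"};      // values in DataFrame column order
    Object[] normalized = tableSchema.normalize(row);              // converts each value to its CQL type in place
    Object[] keyColumns = tableSchema.getKeyColumns(normalized);   // projects out the primary-key columns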
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTokenPartitioner.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTokenPartitioner.java
new file mode 100644
index 00000000..997d4270
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BroadcastableTokenPartitioner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Range;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Broadcastable wrapper for TokenPartitioner with ZERO transient fields to optimize Spark broadcasting.
+ * <p>
+ * Only contains the partition mappings; executors will use this to reconstruct TokenPartitioner.
+ * <p>
+ * <b>Why ZERO transient fields matters:</b><br>
+ * Spark's {@link org.apache.spark.util.SizeEstimator} uses reflection to estimate object sizes before broadcasting.
+ * Each transient field forces SizeEstimator to inspect the field's type hierarchy, which is expensive.
+ * Logger references are particularly costly due to their deep object graphs (appenders, layouts, contexts).
+ * By eliminating ALL transient fields and Logger references, we:
+ * <ul>
+ *   <li>Minimize SizeEstimator reflection overhead during broadcast preparation</li>
+ *   <li>Reduce broadcast variable serialization size</li>
+ *   <li>Avoid accidental serialization of non-serializable objects</li>
+ * </ul>
+ */
+public final class BroadcastableTokenPartitioner implements Serializable
+{
+    private static final long serialVersionUID = -8787074052066841748L;
+
+    // Essential fields broadcast to executors - the partition mappings
+    private final Map<Range<BigInteger>, Integer> partitionEntries;
+    private final Integer numberSplits;
+
+    /**
+     * Creates a BroadcastableTokenPartitioner from a TokenPartitioner.
+     * Extracts only the partition mappings, avoiding the Logger.
+     *
+     * @param source the source TokenPartitioner
+     */
+    public static BroadcastableTokenPartitioner from(@NotNull TokenPartitioner source)
+    {
+        // Extract partition mappings - these are already computed and won't change
+        Map<Range<BigInteger>, Integer> partitionEntries = new HashMap<>();
+        for (int i = 0; i < source.numPartitions(); i++)
+        {
+            Range<BigInteger> range = source.getTokenRange(i);
+            if (range != null)
+            {
+                partitionEntries.put(range, i);
+            }
+        }
+
+        return new BroadcastableTokenPartitioner(partitionEntries, source.numSplits());
+    }
+
+    private BroadcastableTokenPartitioner(Map<Range<BigInteger>, Integer> partitionEntries,
+                                         Integer numberSplits)
+    {
+        this.partitionEntries = partitionEntries;
+        this.numberSplits = numberSplits;
+    }
+
+    public Map<Range<BigInteger>, Integer> getPartitionEntries()
+    {
+        return partitionEntries;
+    }
+
+    public Integer numSplits()
+    {
+        return numberSplits;
+    }
+}
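
A hedged sketch of how the broadcast mappings could be consumed; the actual reconstruction lives in TokenPartitioner (not shown in full in this diff), and the token value below is made up:

    // Hypothetical lookup, not part of this commit: map a token to its Spark partition id
    BigInteger token = BigInteger.valueOf(42);
    int partitionId = broadcastableTokenPartitioner.getPartitionEntries().entrySet().stream()
                                                   .filter(entry -> entry.getKey().contains(token))
                                                   .map(Map.Entry::getValue)
                                                   .findFirst()
                                                   .orElse(-1);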
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index f389dcb5..3f52cd74 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -37,8 +37,8 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import o.a.c.sidecar.client.shaded.client.SidecarInstance;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.StorageClientConfig;
@@ -57,7 +57,6 @@ import org.jetbrains.annotations.Nullable;
 public class BulkSparkConf implements Serializable
 {
     private static final long serialVersionUID = -5060973521517656241L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(BulkSparkConf.class);
 
     public static final String JDK11_OPTIONS = " -Djdk.attach.allowAttachSelf=true"
                                              + " --add-exports java.base/jdk.internal.misc=ALL-UNNAMED"
@@ -159,6 +158,12 @@ public class BulkSparkConf implements Serializable
     private transient CoordinatedWriteConf coordinatedWriteConf; // it is transient; deserialized from coordinatedWriteConfJson in executors
 
     public BulkSparkConf(SparkConf conf, Map<String, String> options)
+    {
+        this(conf, options, null);
+    }
+
+    // NO LOGGER as member field - to avoid logger references in broadcast variable.
+    public BulkSparkConf(SparkConf conf, Map<String, String> options, @Nullable Logger logger)
     {
         this.conf = conf;
         Optional<Integer> sidecarPortFromOptions = MapUtils.getOptionalInt(options, WriterOptions.SIDECAR_PORT.name(), "sidecar port");
@@ -173,7 +178,10 @@ public class BulkSparkConf implements Serializable
         String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
         if (!consistencyLevel.isLocal() && dc != null)
         {
-            LOGGER.warn("localDc is present for non-local consistency level {} specified in writer options. Correcting localDc to null", consistencyLevel);
+            if (logger != null)
+            {
+                logger.warn("localDc is present for non-local consistency level {} specified in writer options. Correcting localDc to null", consistencyLevel);
+            }
             dc = null;
         }
         this.localDC = dc;
@@ -231,7 +239,7 @@ public class BulkSparkConf implements Serializable
         this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
         this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);
         this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
-        this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+        this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport(), logger);
         this.digestAlgorithmSupplier = digestAlgorithmSupplierFromOptions(dataTransport, options);
         validateEnvironment();
     }
@@ -304,14 +312,14 @@ public class BulkSparkConf implements Serializable
     {
         if (coordinatedWriteConf == null)
         {
-            coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+            coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport(), null);
         }
 
         return coordinatedWriteConf;
     }
 
     @Nullable
-    protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport dataTransport)
+    protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport dataTransport, @Nullable Logger logger)
     {
         if (coordinatedWriteConfJson == null)
         {
@@ -323,17 +331,26 @@ public class BulkSparkConf implements Serializable
 
         if (sidecarContactPointsValue != null)
         {
-            LOGGER.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are ignored on the presence of COORDINATED_WRITE_CONF");
+            if (logger != null)
+            {
+                logger.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are ignored on the presence of COORDINATED_WRITE_CONF");
+            }
         }
 
         if (userProvidedSidecarPort != -1)
         {
-            LOGGER.warn("SIDECAR_PORT is ignored on the presence of COORDINATED_WRITE_CONF");
+            if (logger != null)
+            {
+                logger.warn("SIDECAR_PORT is ignored on the presence of COORDINATED_WRITE_CONF");
+            }
         }
 
         if (localDC != null)
         {
-            LOGGER.warn("LOCAL_DC is ignored on the presence of COORDINATED_WRITE_CONF");
+            if (logger != null)
+            {
+                logger.warn("LOCAL_DC is ignored on the presence of COORDINATED_WRITE_CONF");
+            }
         }
 
         return CoordinatedWriteConf.create(coordinatedWriteConfJson, consistencyLevel, SimpleClusterConf.class);
@@ -620,8 +637,6 @@ public class BulkSparkConf implements Serializable
                 String deprecatedSetting = settingPrefix + settingSuffix;
                 if (conf.contains(deprecatedSetting))
                 {
-                    LOGGER.warn("Found deprecated setting '{}'. Please use {} in the future.",
-                                deprecatedSetting, settingName);
                     return deprecatedSetting;
                 }
             }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
new file mode 100644
index 00000000..e2a3283a
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Immutable configuration data class for BulkWriter jobs that is safe to broadcast to Spark executors.
+ * This class contains pre-computed, serializable values that were computed on the driver.
+ * <p>
+ * Serialization Architecture:
+ * This class is the ONLY object that gets broadcast to Spark executors (via Spark's broadcast mechanism).
+ * It contains broadcastable wrapper implementations with ZERO transient fields and NO Logger references:
+ * <ul>
+ *   <li>{@link BroadcastableClusterInfo} or {@link BroadcastableClusterInfoGroup} - cluster metadata</li>
+ *   <li>{@link BroadcastableJobInfo} - job configuration with {@link BroadcastableTokenPartitioner}</li>
+ *   <li>{@link BroadcastableSchemaInfo} - schema metadata</li>
+ * </ul>
+ * <p>
+ * On the driver, {@link BulkWriterContext} instances use driver-only implementations:
+ * {@link CassandraClusterInfo}, {@link CassandraJobInfo}, {@link CassandraSchemaInfo}.
+ * Before broadcasting, these are converted to broadcastable wrappers to avoid Logger references
+ * and minimize Spark SizeEstimator overhead.
+ * <p>
+ * On executors, {@link BulkWriterContext} instances are reconstructed from this config using
+ * {@link BulkWriterContext#from(BulkWriterConfig)}, which detects the broadcastable
+ * wrappers and reconstructs the full implementations with fresh data from Cassandra Sidecar.
+ */
+public final class BulkWriterConfig implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+    private final BulkSparkConf conf;
+    private final int sparkDefaultParallelism;
+    private final BroadcastableJobInfo jobInfo;
+    // IBroadcastableClusterInfo can be either BroadcastableClusterInfo or BroadcastableClusterInfoGroup
+    private final IBroadcastableClusterInfo clusterInfo;
+    private final BroadcastableSchemaInfo schemaInfo;
+    private final String lowestCassandraVersion;
+
+    /**
+     * Creates a new immutable BulkWriterConfig with pre-computed values
+     *
+     * @param conf                    Bulk writer Spark configuration
+     * @param sparkDefaultParallelism Spark default parallelism setting
+     * @param jobInfo                 Broadcastable job information
+     * @param clusterInfo             Broadcastable cluster information (BroadcastableClusterInfo or BroadcastableClusterInfoGroup)
+     * @param schemaInfo              Broadcastable schema information
+     * @param lowestCassandraVersion  Lowest Cassandra version in the cluster
+     */
+    public BulkWriterConfig(@NotNull BulkSparkConf conf,
+                            int sparkDefaultParallelism,
+                            @NotNull BroadcastableJobInfo jobInfo,
+                            @NotNull IBroadcastableClusterInfo clusterInfo,
+                            @NotNull BroadcastableSchemaInfo schemaInfo,
+                            @NotNull String lowestCassandraVersion)
+    {
+        this.conf = conf;
+        this.sparkDefaultParallelism = sparkDefaultParallelism;
+        this.jobInfo = jobInfo;
+        this.clusterInfo = clusterInfo;
+        this.schemaInfo = schemaInfo;
+        this.lowestCassandraVersion = lowestCassandraVersion;
+    }
+
+    public BulkSparkConf getConf()
+    {
+        return conf;
+    }
+
+    public int getSparkDefaultParallelism()
+    {
+        return sparkDefaultParallelism;
+    }
+
+    public BroadcastableJobInfo getBroadcastableJobInfo()
+    {
+        return jobInfo;
+    }
+
+    public IBroadcastableClusterInfo getBroadcastableClusterInfo()
+    {
+        return clusterInfo;
+    }
+
+    public BroadcastableSchemaInfo getBroadcastableSchemaInfo()
+    {
+        return schemaInfo;
+    }
+
+    public String getLowestCassandraVersion()
+    {
+        return lowestCassandraVersion;
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
index 40012e01..a76b7c60 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java
@@ -19,12 +19,22 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
-
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
 import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
 import org.apache.cassandra.bridge.CassandraBridge;
 
-public interface BulkWriterContext extends Serializable
+/**
+ * Context for bulk write operations, providing access to cluster, job, schema, and transport information.
+ * <p>
+ * Serialization Architecture:
+ * This interface does NOT extend Serializable. BulkWriterContext instances are never broadcast to executors.
+ * Instead, {@link BulkWriterConfig} is broadcast, and executors reconstruct BulkWriterContext instances
+ * from the config using the factory method {@link #from(BulkWriterConfig)}.
+ * <p>
+ * The implementations ({@link CassandraBulkWriterContext}, {@link CassandraCoordinatedBulkWriterContext})
+ * do NOT have serialVersionUID fields as they are never serialized.
+ */
+public interface BulkWriterContext
 {
     ClusterInfo cluster();
 
@@ -41,4 +51,25 @@ public interface BulkWriterContext extends Serializable
     void shutdown();
 
     TransportContext transportContext();
+
+    /**
+     * Factory method to create a BulkWriterContext from a BulkWriterConfig on executors.
+     * This method reconstructs context instances on executors from the broadcast configuration.
+     * The driver creates contexts directly using constructors, not this method.
+     *
+     * @param config the immutable configuration object broadcast from driver
+     * @return a new BulkWriterContext instance
+     */
+    static BulkWriterContext from(BulkWriterConfig config)
+    {
+        BulkSparkConf conf = config.getConf();
+        if (conf.isCoordinatedWriteConfigured())
+        {
+            return new CassandraCoordinatedBulkWriterContext(config);
+        }
+        else
+        {
+            return new CassandraBulkWriterContext(config);
+        }
+    }
 }
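For illustration, a minimal sketch of the executor-side reconstruction this factory method enables; the name broadcastConfig is illustrative and not part of this patch:

    // Executor side (e.g. inside a mapPartitions closure): rebuild a context from the broadcast config.
    // Assumes `broadcastConfig` is the Broadcast<BulkWriterConfig> created on the driver.
    BulkWriterConfig config = broadcastConfig.getValue();
    BulkWriterContext context = BulkWriterContext.from(config);   // picks coordinated vs. single-cluster impl
    TableSchema tableSchema = context.schema().getTableSchema();  // rebuilt locally from broadcast data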
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
index 04907221..2f3a9222 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
@@ -68,7 +68,7 @@ public class BulkWriterContextFactory
     @NotNull
     protected BulkSparkConf createBulkSparkConf(@NotNull SparkContext sparkContext, @NotNull Map<String, String> options)
     {
-        return new BulkSparkConf(sparkContext.getConf(), options);
+        return new BulkSparkConf(sparkContext.getConf(), options, LOGGER);
     }
 
     @NotNull
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 40d1b330..f6323af3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -42,12 +42,14 @@ import org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageDataTransf
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.CloudStorageStreamResult;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCompletionCoordinator;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.ImportCoordinator;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedImportCoordinator;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.exception.SidecarApiCallException;
 import org.apache.cassandra.spark.exception.UnsupportedAnalyticsOperationException;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -75,7 +77,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
     private final BulkWriterContext writerContext;
     private final SQLContext sqlContext;
     private final JavaSparkContext sparkContext;
-    private final Broadcast<BulkWriterContext> broadcastContext;
+    private final Broadcast<BulkWriterConfig> broadcastConfig;
     private final BulkWriteValidator writeValidator;
     private final SimpleTaskScheduler simpleTaskScheduler;
     private ImportCoordinator importCoordinator = null; // value is only set when using S3_COMPAT
@@ -87,12 +89,70 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
         this.writerContext = writerContext;
         this.sqlContext = sqlContext;
         this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
-        this.broadcastContext = sparkContext.<BulkWriterContext>broadcast(writerContext);
+        // Extract immutable configuration from the context for broadcasting
+        BulkWriterConfig config = extractConfig(writerContext, sparkContext.defaultParallelism());
+        this.broadcastConfig = sparkContext.<BulkWriterConfig>broadcast(config);
         ReplicaAwareFailureHandler<RingInstance> failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
         this.writeValidator = new BulkWriteValidator(writerContext, failureHandler);
         this.simpleTaskScheduler = new SimpleTaskScheduler();
     }
 
+    /**
+     * Extracts immutable configuration from a BulkWriterContext for broadcasting.
+     * Creates BroadcastableCluster, BroadcastableJobInfo, and BroadcastableSchemaInfo
+     * to ensure zero transient fields and avoid Logger references in the broadcast object.
+     */
+    private static BulkWriterConfig extractConfig(BulkWriterContext context, int sparkDefaultParallelism)
+    {
+        if (context instanceof AbstractBulkWriterContext)
+        {
+            AbstractBulkWriterContext abstractContext = (AbstractBulkWriterContext) context;
+            ClusterInfo originalClusterInfo = abstractContext.cluster();
+
+            // Create BroadcastableCluster to avoid transient fields in broadcast
+            IBroadcastableClusterInfo broadcastableClusterInfo;
+            if (originalClusterInfo instanceof CassandraClusterInfoGroup)
+            {
+                // Coordinated write scenario
+                @SuppressWarnings("unchecked")
+                CassandraClusterInfoGroup multiCluster = (CassandraClusterInfoGroup) originalClusterInfo;
+                broadcastableClusterInfo = BroadcastableClusterInfoGroup.from(
+                    multiCluster,
+                    abstractContext.bulkSparkConf()
+                );
+            }
+            else
+            {
+                // Single cluster scenario
+                broadcastableClusterInfo = BroadcastableClusterInfo.from(
+                    originalClusterInfo,
+                    abstractContext.bulkSparkConf()
+                );
+            }
+
+            // Create BroadcastableJobInfo to avoid Logger in TokenPartitioner
+            BroadcastableJobInfo broadcastableJobInfo = BroadcastableJobInfo.from(
+                abstractContext.job(),
+                abstractContext.bulkSparkConf()
+            );
+
+            // Create BroadcastableSchemaInfo to avoid Logger in TableSchema
+            BroadcastableSchemaInfo broadcastableSchemaInfo = BroadcastableSchemaInfo.from(
+                abstractContext.schema()
+            );
+
+            return new BulkWriterConfig(
+                abstractContext.bulkSparkConf(),
+                sparkDefaultParallelism,
+                broadcastableJobInfo,
+                broadcastableClusterInfo,
+                broadcastableSchemaInfo,
+                abstractContext.lowestCassandraVersion()
+            );
+        }
+        throw new IllegalArgumentException("Cannot extract config from context type: " + context.getClass().getName());
+    }
+
     @Override
     @NotNull
     public SQLContext sqlContext()
@@ -128,14 +188,15 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
         this.startTimeNanos = System.nanoTime();
         maybeScheduleTimeout();
         maybeEnableTransportExtension();
-        Tokenizer tokenizer = new Tokenizer(writerContext);
-        TableSchema tableSchema = writerContext.schema().getTableSchema();
+        BroadcastableTableSchema broadcastableTableSchema = broadcastConfig.value().getBroadcastableSchemaInfo().getBroadcastableTableSchema();
+        boolean isMurmur3Partitioner = writerContext.cluster().getPartitioner() == Partitioner.Murmur3Partitioner;
+        Tokenizer tokenizer = new Tokenizer(broadcastableTableSchema, isMurmur3Partitioner);
         JavaPairRDD<DecoratedKey, Object[]> sortedRDD = data.toJavaRDD()
                                                             .map(Row::toSeq)
                                                             .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
-                                                            .map(tableSchema::normalize)
+                                                            .map(broadcastableTableSchema::normalize)
                                                             .keyBy(tokenizer::getDecoratedKey)
-                                                            .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
+                                                            .repartitionAndSortWithinPartitions(writerContext.job().getTokenPartitioner());
         persist(sortedRDD, data.columns());
     }
 
@@ -180,14 +241,14 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
 
         try
         {
-            // Copy the broadcast context as a local variable (by passing as the input) to avoid serialization error
+            // Copy the broadcast config as a local variable (by passing as the input) to avoid serialization error
             // W/o this, SerializedLambda captures the CassandraBulkSourceRelation object, which is not serializable (required by Spark),
             // as a captured argument. It causes "Task not serializable" error.
             List<WriteResult> writeResults = sortedRDD
-                                             .mapPartitions(writeRowsInPartition(broadcastContext, columnNames))
+                                             .mapPartitions(writeRowsInPartition(broadcastConfig, columnNames))
                                              .collect();
 
-            // Unpersist broadcast context to free up executors while driver waits for the
+            // Unpersist broadcast config to free up executors while driver waits for the
             // import to complete
             unpersist();
 
@@ -330,15 +391,16 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
     }
 
     /**
-     * Get a ref copy of BulkWriterContext broadcast variable and compose a function to transform a partition into StreamResult
+     * Get a ref copy of BulkWriterConfig broadcast variable and compose a function to transform a partition into StreamResult
      *
-     * @param ctx BulkWriterContext broadcast variable
+     * @param config      BulkWriterConfig broadcast variable
+     * @param columnNames column names array
      * @return FlatMapFunction
      */
     private static FlatMapFunction<Iterator<Tuple2<DecoratedKey, Object[]>>, WriteResult>
-    writeRowsInPartition(Broadcast<BulkWriterContext> ctx, String[] columnNames)
+    writeRowsInPartition(Broadcast<BulkWriterConfig> config, String[] columnNames)
     {
-        return iterator -> Collections.singleton(new RecordWriter(ctx.getValue(), columnNames).write(iterator)).iterator();
+        return iterator -> Collections.singleton(new RecordWriter(config.getValue(), columnNames).write(iterator)).iterator();
     }
 
     /**
@@ -348,8 +410,8 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
     {
         try
         {
-            LOGGER.info("Unpersisting broadcast context");
-            broadcastContext.unpersist(false);
+            LOGGER.info("Unpersisting broadcast config");
+            broadcastConfig.unpersist(false);
         }
         catch (Throwable throwable)
         {
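The overall broadcast flow in this relation, condensed for illustration (it reuses the fields and helpers introduced above; this is a sketch, not an excerpt from the patch):

    // Driver side: extract immutable config from the stateful context and broadcast it.
    BulkWriterConfig config = extractConfig(writerContext, sparkContext.defaultParallelism());
    Broadcast<BulkWriterConfig> broadcastConfig = sparkContext.broadcast(config);

    // Executor side: each partition rebuilds its own RecordWriter from the broadcast config.
    List<WriteResult> writeResults = sortedRDD
            .mapPartitions(writeRowsInPartition(broadcastConfig, columnNames))
            .collect();

    // Free executor memory once the write phase is done; the driver keeps its own writerContext.
    broadcastConfig.unpersist(false);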
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 2eea6276..5c516837 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -28,11 +28,15 @@ import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClust
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
+/**
+ * BulkWriterContext implementation for single cluster write operations.
+ * <p>
+ * This class does NOT have a serialVersionUID because it is never directly serialized.
+ * See {@link AbstractBulkWriterContext} for details on the serialization architecture.
+ */
 // CHECKSTYLE IGNORE: This class cannot be declared as final, because consumers should be able to extend it
 public class CassandraBulkWriterContext extends AbstractBulkWriterContext
 {
-    private static final long serialVersionUID = 8241993502687688783L;
-
     protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
                                          @NotNull StructType structType,
                                          int sparkDefaultParallelism)
@@ -40,6 +44,17 @@ public class CassandraBulkWriterContext extends AbstractBulkWriterContext
         super(conf, structType, sparkDefaultParallelism);
     }
 
+    /**
+     * Constructor used by {@link BulkWriterContext#from(BulkWriterConfig)} factory method.
+     * This constructor is only used on executors to reconstruct context from broadcast config.
+     *
+     * @param config immutable configuration for the bulk writer
+     */
+    protected CassandraBulkWriterContext(@NotNull BulkWriterConfig config)
+    {
+        super(config);
+    }
+
     @Override
     protected ClusterInfo buildClusterInfo()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index f84a9337..ad143fb0 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -62,9 +62,20 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
 
+/**
+ * Driver-only implementation of {@link ClusterInfo} for single cluster operations.
+ * <p>
+ * This class is NOT serialized and does NOT have a serialVersionUID.
+ * When broadcasting to executors, the driver extracts information from this class
+ * and creates a {@link BroadcastableClusterInfo} instance, which is then included
+ * in the {@link BulkWriterConfig} that gets broadcast.
+ * <p>
+ * This class implements Serializable only because the {@link ClusterInfo} interface
+ * requires it (for use as a field type in broadcast classes), but instances of this
+ * class are never directly serialized.
+ */
 public class CassandraClusterInfo implements ClusterInfo, Closeable
 {
-    private static final long serialVersionUID = -6944818863462956767L;
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfo.class);
 
     protected final BulkSparkConf conf;
@@ -72,12 +83,12 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
     protected String cassandraVersion;
     protected Partitioner partitioner;
 
-    protected transient volatile TokenRangeMapping<RingInstance> tokenRangeReplicas;
-    protected transient volatile String keyspaceSchema;
-    protected transient volatile ReplicationFactor replicationFactor;
-    protected transient volatile CassandraContext cassandraContext;
-    protected final transient AtomicReference<NodeSettings> nodeSettings;
-    protected final transient List<CompletableFuture<NodeSettings>> allNodeSettingFutures;
+    protected volatile TokenRangeMapping<RingInstance> tokenRangeReplicas;
+    protected volatile String keyspaceSchema;
+    protected volatile ReplicationFactor replicationFactor;
+    protected volatile CassandraContext cassandraContext;
+    protected final AtomicReference<NodeSettings> nodeSettings;
+    protected final List<CompletableFuture<NodeSettings>> allNodeSettingFutures;
 
     public CassandraClusterInfo(BulkSparkConf conf)
     {
@@ -96,6 +107,26 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
                                                              cassandraContext.getCluster());
     }
 
+    /**
+     * Reconstruct from BroadcastableCluster on executor.
+     * Reuses cassandraVersion and partitioner from broadcast,
+     * fetches other data (tokenRangeMapping, replicationFactor, keyspaceSchema, writeAvailability) fresh from Sidecar.
+     *
+     * @param broadcastable the broadcastable cluster info from broadcast
+     */
+    public CassandraClusterInfo(BroadcastableClusterInfo broadcastable)
+    {
+        this.conf = broadcastable.getConf();
+        this.clusterId = broadcastable.clusterId();
+        this.cassandraVersion = broadcastable.getLowestCassandraVersion();
+        this.partitioner = broadcastable.getPartitioner();
+        this.cassandraContext = buildCassandraContext();
+        LOGGER.info("Reconstructing CassandraClusterInfo on executor from BroadcastableCluster. clusterId={}", clusterId);
+        this.nodeSettings = new AtomicReference<>(null);
+        // Executors do not need to query all node settings since cassandraVersion is already set from broadcast
+        this.allNodeSettingFutures = null;
+    }
+
     @Override
     public void checkBulkWriterIsEnabledOrThrow()
     {
@@ -415,6 +446,12 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable
 
     protected List<NodeSettings> getAllNodeSettings()
     {
+        if (allNodeSettingFutures == null)
+        {
+            throw new IllegalStateException("getAllNodeSettings should not be called on executor. "
+                                            + "Cassandra version is pre-computed on driver and broadcast to executors.");
+        }
+
         // Worst-case, the http client is configured for 1 worker pool.
         // In that case, each future can take the full retry delay * number of retries,
         // and each instance will be processed serially.
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 27b8eb45..776124ec 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -32,7 +32,6 @@ import org.jetbrains.annotations.Nullable;
 
 public class CassandraJobInfo implements JobInfo
 {
-    private static final long serialVersionUID = 6140098484732683759L;
     protected final BulkSparkConf conf;
     // restoreJobId per cluster; it is guaranteed to be non-empty
     protected final MultiClusterContainer<UUID> restoreJobIds;
@@ -46,6 +45,21 @@ public class CassandraJobInfo implements JobInfo
         this.tokenPartitioner = tokenPartitioner;
     }
 
+    /**
+     * Reconstruct from BroadcastableJobInfo on executor.
+     * Reuses conf and restoreJobIds from broadcast,
+     * and reconstructs TokenPartitioner from broadcastable partition mappings.
+     *
+     * @param broadcastable the broadcastable job info from broadcast
+     */
+    public CassandraJobInfo(BroadcastableJobInfo broadcastable)
+    {
+        this.conf = broadcastable.getConf();
+        this.restoreJobIds = broadcastable.getRestoreJobIds();
+        // Reconstruct TokenPartitioner from broadcastable partition mappings
+        this.tokenPartitioner = new TokenPartitioner(broadcastable.getBroadcastableTokenPartitioner());
+    }
+
     @Override
     public ConsistencyLevel getConsistencyLevel()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
index 526d9a77..db98cf71 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java
@@ -23,7 +23,6 @@ import java.util.Set;
 
 public class CassandraSchemaInfo implements SchemaInfo
 {
-    private static final long serialVersionUID = -2327383232935001862L;
     private final TableSchema tableSchema;
     private final Set<String> userDefinedTypeStatements;
 
@@ -33,6 +32,18 @@ public class CassandraSchemaInfo implements SchemaInfo
         this.userDefinedTypeStatements = userDefinedTypeStatements;
     }
 
+    /**
+     * Reconstruct from BroadcastableSchemaInfo on executor.
+     * Reconstructs TableSchema from BroadcastableTableSchema (no Sidecar calls needed).
+     *
+     * @param broadcastable the broadcastable schema info from broadcast
+     */
+    public CassandraSchemaInfo(BroadcastableSchemaInfo broadcastable)
+    {
+        this(new TableSchema(broadcastable.getBroadcastableTableSchema()),
+             broadcastable.getUserDefinedTypeStatements());
+    }
+
     @Override
     public TableSchema getTableSchema()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
index 61cd6942..64b65444 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Map;
 
@@ -33,7 +32,20 @@ import org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
 import org.apache.cassandra.spark.validation.StartupValidatable;
 import org.jetbrains.annotations.Nullable;
 
-public interface ClusterInfo extends StartupValidatable, Serializable
+/**
+ * Interface for cluster information used in bulk write operations.
+ * <p>
+ * Serialization Architecture:
+ * This interface does NOT extend Serializable. ClusterInfo instances are never directly serialized.
+ * Driver-only implementations ({@link CassandraClusterInfo},
+ * {@link org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup})
+ * are converted to broadcastable wrappers ({@link BroadcastableClusterInfo}, {@link BroadcastableClusterInfoGroup})
+ * for broadcasting to executors via {@link BulkWriterConfig}.
+ * <p>
+ * On executors, ClusterInfo instances are reconstructed from the broadcastable wrappers using
+ * {@link AbstractBulkWriterContext#reconstructClusterInfoOnExecutor(IBroadcastableClusterInfo)}.
+ */
+public interface ClusterInfo extends StartupValidatable
 {
     void refreshClusterInfo();
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/IBroadcastableClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/IBroadcastableClusterInfo.java
new file mode 100644
index 00000000..fb4eb2e2
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/IBroadcastableClusterInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Minimal interface for cluster information that can be safely broadcast to Spark executors.
+ * This interface contains only the essential methods that broadcastable cluster info implementations
+ * ({@link BroadcastableClusterInfo}, {@link BroadcastableClusterInfoGroup}) need to provide.
+ * <p>
+ * Unlike {@link ClusterInfo}, this interface doesn't include methods that require fresh data
+ * from Cassandra Sidecar or runtime operations. These implementations are designed to be broadcast
+ * and then reconstructed to full {@link ClusterInfo} instances on executors.
+ * <p>
+ * Methods in this interface:
+ * <ul>
+ *   <li>{@link #getPartitioner()} - static cluster partitioner configuration</li>
+ *   <li>{@link #getLowestCassandraVersion()} - pre-computed version string</li>
+ *   <li>{@link #clusterId()} - cluster identifier (optional)</li>
+ *   <li>{@link #getConf()} - BulkSparkConf needed for reconstruction on executors</li>
+ *   <li>{@link #reconstruct()} - reconstructs full ClusterInfo instance on executors</li>
+ * </ul>
+ */
+public interface IBroadcastableClusterInfo extends Serializable
+{
+    /**
+     * @return the partitioner used by the cluster
+     */
+    Partitioner getPartitioner();
+
+    /**
+     * @return the lowest Cassandra version in the cluster
+     */
+    String getLowestCassandraVersion();
+
+    /**
+     * ID string that can uniquely identify a cluster.
+     * When writing to a single cluster, this may be null.
+     * When in coordinated write mode (writing to multiple clusters), this must return a unique string.
+     *
+     * @return cluster id string, null if absent
+     */
+    @Nullable
+    String clusterId();
+
+    /**
+     * @return the BulkSparkConf configuration needed to reconstruct ClusterInfo on executors
+     */
+    @NotNull
+    BulkSparkConf getConf();
+
+    /**
+     * Reconstructs a full ClusterInfo instance from this broadcastable data on executors.
+     * Each implementation knows how to reconstruct itself into the appropriate ClusterInfo type.
+     * This allows adding new broadcastable types without modifying the reconstruction logic
+     * in {@link AbstractBulkWriterContext}.
+     *
+     * @return reconstructed ClusterInfo (CassandraClusterInfo or CassandraClusterInfoGroup)
+     */
+    ClusterInfo reconstruct();
+}
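A hedged sketch of how the reconstruct() hook is intended to be used on executors; the surrounding method lives in AbstractBulkWriterContext and is shown here in simplified, illustrative form only:

    // Simplified: each broadcastable wrapper knows how to turn itself back into a full ClusterInfo.
    protected ClusterInfo reconstructClusterInfoOnExecutor(IBroadcastableClusterInfo broadcastable)
    {
        return broadcastable.reconstruct(); // yields CassandraClusterInfo or CassandraClusterInfoGroup
    }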
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 2c4a089e..87ad86c0 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 
@@ -29,7 +28,14 @@ import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-public interface JobInfo extends Serializable
+/**
+ * Provides job-specific configuration and information for bulk write operations.
+ * <p>
+ * This interface does NOT extend Serializable. JobInfo instances are never serialized.
+ * For broadcast to executors, {@link BroadcastableJobInfo} is used instead, and executors
+ * reconstruct JobInfo instances from the broadcast data.
+ */
+public interface JobInfo
 {
     // ******************
     // Job Information API - should this really just move back to Config? Here to try to reduce the violations of the Law of Demeter more than anything else
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index a33ff642..92a866cb 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -84,9 +84,16 @@ public class RecordWriter
     private final CqlTable cqlTable;
     private StreamSession<?> streamSession = null;
 
-    public RecordWriter(BulkWriterContext writerContext, String[] columnNames)
+    /**
+     * Constructor that accepts a BulkWriterConfig and constructs the context on the executor.
+     * This is used when the config is broadcast to executors.
+     *
+     * @param config      immutable configuration broadcast from driver
+     * @param columnNames column names array
+     */
+    public RecordWriter(BulkWriterConfig config, String[] columnNames)
     {
-        this(writerContext, columnNames, TaskContext::get, SortedSSTableWriter::new);
+        this(BulkWriterContext.from(config), columnNames, TaskContext::get, SortedSSTableWriter::new);
     }
 
     @VisibleForTesting
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
index 0257d29c..b21d5fd4 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java
@@ -19,12 +19,18 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
 import java.util.Set;
 
 import org.jetbrains.annotations.NotNull;
 
-public interface SchemaInfo extends Serializable
+/**
+ * Provides schema information for bulk write operations.
+ * <p>
+ * This interface does NOT extend Serializable. SchemaInfo instances are never serialized.
+ * For broadcast to executors, {@link BroadcastableSchemaInfo} is used instead, and executors
+ * reconstruct SchemaInfo instances (specifically {@link CassandraSchemaInfo}) from the broadcast data.
+ */
+public interface SchemaInfo
 {
     TableSchema getTableSchema();
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 47909bbf..93f02c14 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashSet;
@@ -28,7 +27,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,11 +37,17 @@ import org.apache.cassandra.spark.common.schema.ColumnType;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.exception.UnsupportedAnalyticsOperationException;
 import org.apache.spark.sql.types.StructType;
-import org.jetbrains.annotations.NotNull;
 
 import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
 
-public class TableSchema implements Serializable
+/**
+ * Schema information for bulk write operations.
+ * <p>
+ * This class does NOT implement Serializable (Logger is not serializable).
+ * For broadcast to executors, {@link BroadcastableTableSchema} is used 
instead,
+ * and executors reconstruct TableSchema from the broadcastable data.
+ */
+public class TableSchema
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TableSchema.class);
 
@@ -52,12 +56,12 @@ public class TableSchema implements Serializable
     final List<String> partitionKeyColumns;
     final List<ColumnType<?>> partitionKeyColumnTypes;
     final List<SqlToCqlTypeConverter.Converter<?>> converters;
-    private final List<Integer> keyFieldPositions;
-    private final WriteMode writeMode;
-    private final TTLOption ttlOption;
-    private final TimestampOption timestampOption;
-    private final String lowestCassandraVersion;
-    private final boolean quoteIdentifiers;
+    final List<Integer> keyFieldPositions;
+    final WriteMode writeMode;
+    final TTLOption ttlOption;
+    final TimestampOption timestampOption;
+    final String lowestCassandraVersion;
+    final boolean quoteIdentifiers;
 
     public TableSchema(StructType dfSchema,
                        TableInfoProvider tableInfo,
@@ -85,6 +89,27 @@ public class TableSchema implements Serializable
         this.keyFieldPositions = getKeyFieldPositions(dfSchema, tableInfo.getColumnNames(), getRequiredKeyColumns(tableInfo));
     }
 
+    /**
+     * Reconstruct TableSchema from BroadcastableTableSchema on executor.
+     * This constructor is used only on executors when reconstructing from broadcast data.
+     *
+     * @param broadcastable the broadcastable table schema from broadcast
+     */
+    public TableSchema(BroadcastableTableSchema broadcastable)
+    {
+        this.createStatement = broadcastable.getCreateStatement();
+        this.modificationStatement = broadcastable.getModificationStatement();
+        this.partitionKeyColumns = broadcastable.getPartitionKeyColumns();
+        this.partitionKeyColumnTypes = broadcastable.getPartitionKeyColumnTypes();
+        this.converters = broadcastable.getConverters();
+        this.keyFieldPositions = broadcastable.getKeyFieldPositions();
+        this.writeMode = broadcastable.getWriteMode();
+        this.ttlOption = broadcastable.getTtlOption();
+        this.timestampOption = broadcastable.getTimestampOption();
+        this.lowestCassandraVersion = broadcastable.getLowestCassandraVersion();
+        this.quoteIdentifiers = broadcastable.isQuoteIdentifiers();
+    }
+
     private List<String> getRequiredKeyColumns(TableInfoProvider tableInfo)
     {
         switch (writeMode)
@@ -100,34 +125,6 @@ public class TableSchema implements Serializable
         }
     }
 
-    public Object[] normalize(Object[] row)
-    {
-        for (int index = 0; index < row.length; index++)
-        {
-            row[index] = converters.get(index).convert(row[index]);
-        }
-        return row;
-    }
-
-    public Object[] getKeyColumns(Object[] allColumns)
-    {
-        return getKeyColumns(allColumns, keyFieldPositions);
-    }
-
-    @VisibleForTesting
-    @NotNull
-    public static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)
-    {
-        Object[] result = new Object[keyFieldPositions.size()];
-        for (int keyFieldPosition = 0; keyFieldPosition < keyFieldPositions.size(); keyFieldPosition++)
-        {
-            Object colVal = allColumns[keyFieldPositions.get(keyFieldPosition)];
-            Preconditions.checkNotNull(colVal, "Found a null primary or composite key column in source data. All key columns must be non-null.");
-            result[keyFieldPosition] = colVal;
-        }
-        return result;
-    }
-
     private static List<SqlToCqlTypeConverter.Converter<?>> getConverters(StructType dfSchema,
                                                                           TableInfoProvider tableInfo,
                                                                           TTLOption ttlOption,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
index 1e2fc6e0..479ddcae 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java
@@ -19,9 +19,6 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Comparator;
@@ -45,6 +42,32 @@ import org.apache.cassandra.spark.utils.RangeUtils;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.spark.Partitioner;
 
+/**
+ * Spark Partitioner for distributing data across Cassandra token ranges.
+ * <p>
+ * Serialization Architecture:
+ * This class supports TWO distinct serialization mechanisms, each serving a different purpose:
+ * <p>
+ * 1. <b>Direct Java Serialization (via writeObject/readObject)</b>:
+ *    Used when Spark serializes this Partitioner for shuffle operations like
+ *    {@code repartitionAndSortWithinPartitions()}. During shuffle, Spark sends the Partitioner
+ *    to executors to determine which partition each record belongs to. The custom serialization
+ *    methods at the end of this class handle saving/restoring the partition mappings.
+ * <p>
+ * 2. <b>Broadcast Variable Pattern (via BroadcastableTokenPartitioner)</b>:
+ *    Used when broadcasting job configuration to executors. The driver extracts partition mappings
+ *    into {@link BroadcastableTokenPartitioner} (a pure data wrapper with no transient fields),
+ *    which is broadcast via {@link BulkWriterConfig}. Executors reconstruct TokenPartitioner from
+ *    the broadcast data using the constructor {@link #TokenPartitioner(BroadcastableTokenPartitioner)}.
+ * <p>
+ * Both mechanisms are necessary because:
+ * - Shuffle operations (repartitionAndSortWithinPartitions) serialize the Partitioner directly
+ * - Broadcast variables use the broadcastable wrapper pattern to avoid Logger serialization issues
+ * <p>
+ * The transient fields (partitionMap, reversePartitionMap, nrPartitions) are marked transient to
+ * avoid serializing large/complex objects when not needed, but are properly handled by custom
+ * serialization when direct serialization is required.
+ */
 public class TokenPartitioner extends Partitioner
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(TokenPartitioner.class);
@@ -75,10 +98,51 @@ public class TokenPartitioner extends Partitioner
         this.tokenRangeMapping = tokenRangeMapping;
         this.numberSplits = calculateSplits(tokenRangeMapping, userSpecifiedNumberSplits, defaultParallelism, cores);
         setupTokenRangeMap(randomize);
-        validate();  // Intentionally not keeping this in readObject(), it is enough to validate in constructor alone
-        LOGGER.info("Partition map " + partitionMap);
-        LOGGER.info("Reverse partition map " + reversePartitionMap);
-        LOGGER.info("Number of partitions {}", nrPartitions);
+        validate(); // Intentionally keeping validation in the driver alone; there is no need to re-validate when constructing in executors
+        logPartitionInfo();
+    }
+
+    private void logPartitionInfo()
+    {
+        LOGGER.info("Number of partitions: {}", nrPartitions);
+        LOGGER.info("Partition map: {}", partitionMap);
+        LOGGER.info("Reverse partition map: {}", reversePartitionMap);
+    }
+
+    /**
+     * Reconstruct TokenPartitioner from BroadcastableTokenPartitioner on executor.
+     * <p>
+     * This constructor is part of the <b>broadcast variable</b> serialization mechanism.
+     * When BulkWriterConfig is broadcast to executors, it contains BroadcastableTokenPartitioner
+     * (a pure data wrapper). Executors use this constructor to rebuild the TokenPartitioner
+     * with all necessary partition mappings.
+     * <p>
+     * This reconstruction path is separate from the direct Java serialization (writeObject/readObject)
+     * used for Spark shuffle operations. The broadcast pattern is preferred for configuration data
+     * because it avoids Logger serialization issues and minimizes broadcast size.
+     *
+     * @param broadcastable the broadcastable token partitioner from broadcast variable
+     * @see BroadcastableTokenPartitioner
+     * @see BulkWriterConfig
+     */
+    public TokenPartitioner(BroadcastableTokenPartitioner broadcastable)
+    {
+        this.tokenRangeMapping = null;  // Not needed on executors
+        this.numberSplits = broadcastable.numSplits();
+        this.partitionMap = com.google.common.collect.TreeRangeMap.create();
+        this.reversePartitionMap = new HashMap<>();
+        this.nrPartitions = 0;
+
+        // Restore partition mappings from serialized form
+        broadcastable.getPartitionEntries().forEach((range, partitionId) -> {
+            this.partitionMap.put(range, partitionId);
+            this.reversePartitionMap.put(partitionId, range);
+            if (partitionId >= this.nrPartitions)
+            {
+                this.nrPartitions = partitionId + 1;
+            }
+        });
+        logPartitionInfo();
     }
 
     @Override
@@ -137,6 +201,7 @@ public class TokenPartitioner extends Partitioner
         this.nrPartitions = nextPartitionId.get();
     }
 
+    // only invoked in driver
     private void validate()
     {
         validateMapSizes();
@@ -198,31 +263,6 @@ public class TokenPartitioner extends Partitioner
                                                reversePartitionMap.keySet().size()));
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException
-    {
-        out.defaultWriteObject();
-        HashMap<Range<BigInteger>, Integer> partitionEntires = new HashMap<>();
-        partitionMap.asMapOfRanges().forEach(partitionEntires::put);
-        out.writeObject(partitionEntires);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException
-    {
-        in.defaultReadObject();
-        HashMap<Range<BigInteger>, Integer> partitionEntires = (HashMap<Range<BigInteger>, Integer>) in.readObject();
-        partitionMap = TreeRangeMap.create();
-        reversePartitionMap = new HashMap<>();
-        partitionEntires.forEach((r, i) -> {
-            partitionMap.put(r, i);
-            reversePartitionMap.put(i, r);
-            nrPartitions++;
-        });
-        LOGGER.info("Partition map " + partitionMap);
-        LOGGER.info("Reverse partition map " + reversePartitionMap);
-        LOGGER.info("Number of partitions {}", nrPartitions);
-    }
-
     // In order to best utilize the number of Spark cores while minimizing the number of commit calls,
     // we calculate the number of splits that will just match or exceed the total number of available Spark cores.
     // Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits,
@@ -250,4 +290,72 @@ public class TokenPartitioner extends Partitioner
     {
         return (a + b - 1) / b;
     }
+
+    /**
+     * Custom serialization for Spark shuffle operations (e.g., repartitionAndSortWithinPartitions).
+     * <p>
+     * This method is invoked when Spark serializes the Partitioner to send it to executors during
+     * shuffle operations. It saves the essential partition mappings so they can be reconstructed
+     * on executors. This is separate from the broadcast variable serialization mechanism.
+     * <p>
+     * Note: This serialization path is used when the TokenPartitioner is passed directly to Spark
+     * operations (e.g., {@code .repartitionAndSortWithinPartitions(tokenPartitioner)}), not when
+     * it's broadcast as part of BulkWriterConfig.
+     *
+     * @param out the ObjectOutputStream to write to
+     * @throws java.io.IOException if an I/O error occurs during serialization
+     * @see #readObject(java.io.ObjectInputStream)
+     */
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException
+    {
+        out.defaultWriteObject();
+        // Serialize the partition mappings
+        Map<Range<BigInteger>, Integer> partitionEntries = partitionMap.asMapOfRanges();
+        out.writeInt(partitionEntries.size());
+        for (Map.Entry<Range<BigInteger>, Integer> entry : partitionEntries.entrySet())
+        {
+            out.writeObject(entry.getKey());
+            out.writeInt(entry.getValue());
+        }
+    }
+
+    /**
+     * Custom deserialization for Spark shuffle operations.
+     * <p>
+     * This method is invoked when Spark deserializes the Partitioner on executors during shuffle
+     * operations. It reconstructs the transient fields (partitionMap, reversePartitionMap, nrPartitions)
+     * from the serialized data. This ensures the Partitioner can correctly map tokens to partitions
+     * after deserialization.
+     * <p>
+     * Note: This deserialization path is used when the TokenPartitioner was serialized by Spark
+     * for shuffle operations, not when it's reconstructed from a broadcast BroadcastableTokenPartitioner.
+     *
+     * @param in the ObjectInputStream to read from
+     * @throws java.io.IOException if an I/O error occurs during deserialization
+     * @throws ClassNotFoundException if the class of a serialized object cannot be found
+     * @see #writeObject(java.io.ObjectOutputStream)
+     */
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException
+    {
+        in.defaultReadObject();
+        // Reconstruct partition maps
+        this.partitionMap = TreeRangeMap.create();
+        this.reversePartitionMap = new HashMap<>();
+        this.nrPartitions = 0;
+
+        int size = in.readInt();
+        for (int i = 0; i < size; i++)
+        {
+            @SuppressWarnings("unchecked")
+            Range<BigInteger> range = (Range<BigInteger>) in.readObject();
+            int partitionId = in.readInt();
+
+            this.partitionMap.put(range, partitionId);
+            this.reversePartitionMap.put(partitionId, range);
+            if (partitionId >= this.nrPartitions)
+            {
+                this.nrPartitions = partitionId + 1;
+            }
+        }
+    }
 }
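To make the two serialization paths concrete, a hedged sketch of the shuffle-time round trip exercised by the writeObject/readObject pair above (variable names are illustrative; the separate broadcast path goes through BroadcastableTokenPartitioner instead):

    // Java-serialization round trip, the same mechanism Spark uses when the partitioner is handed
    // to repartitionAndSortWithinPartitions(). `partitioner` is assumed to be a driver-built instance,
    // and the surrounding method declares IOException and ClassNotFoundException.
    java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
    try (java.io.ObjectOutputStream out = new java.io.ObjectOutputStream(bytes))
    {
        out.writeObject(partitioner);              // invokes the custom writeObject(...)
    }
    TokenPartitioner copy;
    try (java.io.ObjectInputStream in =
             new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(bytes.toByteArray())))
    {
        copy = (TokenPartitioner) in.readObject(); // readObject(...) rebuilds partitionMap et al.
    }
    // The deserialized copy should report the same partition count as the original.
    assert copy.numPartitions() == partitioner.numPartitions();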
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
index 8c0b5da1..a9373735 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.spark.bulkwriter.token.TokenUtils;
 import org.apache.cassandra.spark.common.schema.ColumnType;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
 
 public class Tokenizer implements Serializable
 {
@@ -39,13 +38,12 @@ public class Tokenizer implements Serializable
     private final TokenUtils tokenUtils;
     private final SerializableFunction<Object[], Object[]> keyColumnProvider;
 
-    public Tokenizer(BulkWriterContext writerContext)
+    public Tokenizer(BroadcastableTableSchema broadcastableTableSchema, boolean isMurmur3Partitioner)
     {
-        TableSchema tableSchema = writerContext.schema().getTableSchema();
-        this.tokenUtils = new TokenUtils(tableSchema.partitionKeyColumns,
-                                         tableSchema.partitionKeyColumnTypes,
-                                         writerContext.cluster().getPartitioner() == Partitioner.Murmur3Partitioner);
-        this.keyColumnProvider = tableSchema::getKeyColumns;
+        this.tokenUtils = new TokenUtils(broadcastableTableSchema.getPartitionKeyColumns(),
+                                         broadcastableTableSchema.getPartitionKeyColumnTypes(),
+                                         isMurmur3Partitioner);
+        this.keyColumnProvider = broadcastableTableSchema::getKeyColumns;
     }
 
     @VisibleForTesting
@@ -54,7 +52,7 @@ public class Tokenizer implements Serializable
                      List<ColumnType<?>> partitionKeyColumnTypes,
                      boolean isMurmur3Partitioner)
     {
-        this.keyColumnProvider = (columns) -> TableSchema.getKeyColumns(columns, keyColumnIndexes);
+        this.keyColumnProvider = (columns) -> BroadcastableTableSchema.getKeyColumns(columns, keyColumnIndexes);
         this.tokenUtils = new TokenUtils(partitionKeyColumns,
                                          partitionKeyColumnTypes,
                                          isMurmur3Partitioner);
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
index 192c045e..90f14721 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageDataTransferApiFactory.java
@@ -24,8 +24,8 @@ import java.util.Map;
 
 import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.JobInfo;
-import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedCloudStorageDataTransferApi;
+import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterSupport;
 import org.jetbrains.annotations.Nullable;
 
 public class CloudStorageDataTransferApiFactory
@@ -34,16 +34,18 @@ public class CloudStorageDataTransferApiFactory
 
     /**
      * Create CloudStorageDataTransferApi based on the actual runtime type of 
ClusterInfo
-     * @return CoordinatedCloudStorageDataTransferApi if using coordinated write, i.e. with CassandraClusterInfoGroup;
+     * @return CoordinatedCloudStorageDataTransferApi if using coordinated write, i.e. with MultiClusterSupport;
      *         otherwise, CloudStorageDataTransferApiImpl
      */
     public CloudStorageDataTransferApi createDataTransferApi(StorageClient storageClient,
                                                              JobInfo jobInfo,
                                                              ClusterInfo clusterInfo)
     {
-        if (clusterInfo instanceof CassandraClusterInfoGroup)
+        if (clusterInfo instanceof MultiClusterSupport)
         {
-            return createForCoordinated(storageClient, jobInfo, (CassandraClusterInfoGroup) clusterInfo);
+            @SuppressWarnings("unchecked")
+            MultiClusterSupport<ClusterInfo> multiCluster = (MultiClusterSupport<ClusterInfo>) clusterInfo;
+            return createForCoordinated(storageClient, jobInfo, multiCluster);
         }
         else
         {
@@ -64,7 +66,7 @@ public class CloudStorageDataTransferApiFactory
 
     private CoordinatedCloudStorageDataTransferApi createForCoordinated(StorageClient storageClient,
                                                                         JobInfo jobInfo,
-                                                                        CassandraClusterInfoGroup clusterInfoGroup)
+                                                                        MultiClusterSupport<ClusterInfo> clusterInfoGroup)
     {
         Map<String, CloudStorageDataTransferApiImpl> apiByClusterId = new HashMap<>(clusterInfoGroup.size());
         clusterInfoGroup.forEach((clusterId, clusterInfo) -> {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
index 4433d0da..09b07b38 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/StorageClientConfig.java
@@ -23,13 +23,9 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class StorageClientConfig implements Serializable
 {
     private static final long serialVersionUID = -1572678388713210328L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(StorageClientConfig.class);
 
     public final String threadNamePrefix;
     // Controls the max concurrency/parallelism of the thread pool used by s3 client
@@ -74,8 +70,7 @@ public class StorageClientConfig implements Serializable
         }
         catch (URISyntaxException e)
         {
-            LOGGER.error("{} is specified, but the value is invalid. input={}", hint, uriString);
-            throw new RuntimeException("Unable to resolve " + uriString, e);
+            throw new RuntimeException(hint + " is specified, but the value is invalid. input=" + uriString, e);
         }
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
index e536d196..9fd870cd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java
@@ -46,6 +46,8 @@ import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.CassandraContext;
 import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
 import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -59,17 +61,29 @@ import org.jetbrains.annotations.Nullable;
 /**
  * A group of ClusterInfo. One per cluster.
  * The class does the aggregation over all clusters for applicable operations.
+ * <p>
+ * This class is NOT serialized and does NOT have a serialVersionUID.
+ * When broadcasting to executors, the driver extracts information from this class
+ * and creates a {@link org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup} instance,
+ * which is then included in the {@link org.apache.cassandra.spark.bulkwriter.BulkWriterConfig}
+ * that gets broadcast.
+ * <p>
+ * This class implements Serializable only because the {@link org.apache.cassandra.spark.bulkwriter.ClusterInfo}
+ * interface requires it (for use as a field type in broadcast classes), but instances of this
+ * class are never directly serialized.
  */
 public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterSupport<ClusterInfo>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class);
 
-    private static final long serialVersionUID = 5337884321245616172L;
-
     // immutable
     private final List<ClusterInfo> clusterInfos;
-    private transient volatile Map<String, ClusterInfo> clusterInfoById;
-    private transient volatile TokenRangeMapping<RingInstance> consolidatedTokenRangeMapping;
+    private final String clusterId;
+    private volatile Map<String, ClusterInfo> clusterInfoById;
+    private volatile TokenRangeMapping<RingInstance> consolidatedTokenRangeMapping;
+    // Pre-computed values from BroadcastableClusterInfoGroup (only set when reconstructed on executors)
+    private Partitioner cachedPartitioner;
+    private String cachedLowestCassandraVersion;
 
     /**
      * Creates {@link CassandraClusterInfoGroup} with the list of {@link ClusterInfo} from {@link BulkSparkConf} and validation
@@ -82,6 +96,20 @@ public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterSuppo
         return fromBulkSparkConf(conf, clusterId -> new CassandraClusterInfo(conf, clusterId));
     }
 
+    /**
+     * Reconstruct from BroadcastableClusterInfoGroup on executor.
+     * Creates CassandraClusterInfo instances for each cluster that will fetch 
data from Sidecar.
+     * Leverages pre-computed values (partitioner, lowestCassandraVersion) 
from the broadcastable
+     * to avoid re-validation and re-computation on executors.
+     *
+     * @param broadcastable the broadcastable cluster info group from broadcast
+     * @return new {@link CassandraClusterInfoGroup} instance
+     */
+    public static CassandraClusterInfoGroup from(BroadcastableClusterInfoGroup 
broadcastable)
+    {
+        return new CassandraClusterInfoGroup(broadcastable);
+    }
+
     /**
      * Similar to {@link #fromBulkSparkConf(BulkSparkConf)} but takes 
additional function to create {@link ClusterInfo}
      */
@@ -118,6 +146,30 @@ public class CassandraClusterInfoGroup implements 
ClusterInfo, MultiClusterSuppo
     {
         this.clusterInfos = Collections.unmodifiableList(clusterInfos);
         clusterInfoById();
+        this.clusterId = "ClusterInfoGroup: [" + String.join(", ", 
applyOnEach(ClusterInfo::clusterId).values()) + ']';
+    }
+
+    /**
+     * Private constructor for executor-only reconstruction from broadcast 
data.
+     * Accepts BroadcastableClusterInfoGroup and extracts pre-computed values 
to avoid
+     * re-validation and re-computation on executors.
+     *
+     * @param broadcastable the broadcastable cluster info group from broadcast
+     */
+    private CassandraClusterInfoGroup(BroadcastableClusterInfoGroup 
broadcastable)
+    {
+        // Build list of ClusterInfo from broadcastable data
+        List<ClusterInfo> clusterInfosList = new ArrayList<>();
+        broadcastable.forEach((clusterId, broadcastableInfo) -> {
+            clusterInfosList.add(new 
CassandraClusterInfo((BroadcastableClusterInfo) broadcastableInfo));
+        });
+        this.clusterInfos = Collections.unmodifiableList(clusterInfosList);
+
+        // Extract pre-computed values from driver to avoid re-validation on 
executors
+        this.cachedPartitioner = broadcastable.getPartitioner();
+        this.cachedLowestCassandraVersion = 
broadcastable.getLowestCassandraVersion();
+        this.clusterId = broadcastable.clusterId();
+        clusterInfoById();
     }
 
     @Override
@@ -157,6 +209,12 @@ public class CassandraClusterInfoGroup implements 
ClusterInfo, MultiClusterSuppo
     @Override
     public String getLowestCassandraVersion()
     {
+        // Return cached value if available (executor-side reconstruction)
+        if (cachedLowestCassandraVersion != null)
+        {
+            return cachedLowestCassandraVersion;
+        }
+
         if (clusterInfos.size() == 1)
         {
             return clusterInfos.get(0).getLowestCassandraVersion();
@@ -194,6 +252,12 @@ public class CassandraClusterInfoGroup implements 
ClusterInfo, MultiClusterSuppo
     @Override
     public Partitioner getPartitioner()
     {
+        // Return cached value if available (executor-side reconstruction)
+        if (cachedPartitioner != null)
+        {
+            return cachedPartitioner;
+        }
+
         Map<String, Partitioner> aggregated = 
applyOnEach(ClusterInfo::getPartitioner);
         Set<Partitioner> partitioners = EnumSet.copyOf(aggregated.values());
         if (partitioners.size() != 1)
@@ -248,7 +312,7 @@ public class CassandraClusterInfoGroup implements 
ClusterInfo, MultiClusterSuppo
     @Override
     public String clusterId()
     {
-        return "ClusterInfoGroup: [" + String.join(", ", 
applyOnEach(ClusterInfo::clusterId).values()) + ']';
+        return clusterId;
     }
 
     @Override
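
For readers following the serialization architecture described in the new javadoc above, a minimal driver/executor sketch in Java is below. BroadcastableClusterInfoGroup.from(group, conf) and CassandraClusterInfoGroup.from(snapshot) are the factory methods added by this patch, and JavaSparkContext/Broadcast are standard Spark APIs; the buildBulkWriterConfig(...) helper is hypothetical and only stands in for the wiring that AbstractBulkWriterContext actually performs.

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
    import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
    import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
    import org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;

    final class BroadcastFlowSketch
    {
        // Driver side: snapshot the stateful cluster group into an immutable, serializable
        // form and broadcast only that snapshot, never the live ClusterInfo objects.
        static Broadcast<BulkWriterConfig> broadcastOnDriver(JavaSparkContext sc, BulkSparkConf conf, CassandraClusterInfoGroup group)
        {
            BroadcastableClusterInfoGroup snapshot = BroadcastableClusterInfoGroup.from(group, conf);
            BulkWriterConfig config = buildBulkWriterConfig(snapshot, conf); // hypothetical helper for illustration
            return sc.broadcast(config);
        }

        // Executor side: rebuild a live group from the broadcast snapshot, reusing the
        // pre-computed partitioner and lowest Cassandra version so executors skip re-validation.
        static CassandraClusterInfoGroup rebuildOnExecutor(BroadcastableClusterInfoGroup snapshot)
        {
            return CassandraClusterInfoGroup.from(snapshot);
        }

        private static BulkWriterConfig buildBulkWriterConfig(BroadcastableClusterInfoGroup snapshot, BulkSparkConf conf)
        {
            throw new UnsupportedOperationException("illustrative placeholder only");
        }
    }
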
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
index 70fa59c5..5329fc21 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraCoordinatedBulkWriterContext.java
@@ -26,24 +26,43 @@ import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.spark.bulkwriter.AbstractBulkWriterContext;
 import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
 import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.DataTransport;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
 /**
- * BulkWriterContext for coordinated write
+ * BulkWriterContext for coordinated write to multiple clusters.
  * The context requires the coordinated-write configuration to be present.
+ * <p>
+ * This class does NOT have a serialVersionUID because it is never directly 
serialized.
+ * See {@link AbstractBulkWriterContext} for details on the serialization 
architecture.
  */
 public class CassandraCoordinatedBulkWriterContext extends 
AbstractBulkWriterContext
 {
-    private static final long serialVersionUID = -2296507634642008675L;
-
     public CassandraCoordinatedBulkWriterContext(@NotNull BulkSparkConf conf,
                                                  @NotNull StructType 
structType,
                                                  int sparkDefaultParallelism)
     {
         super(conf, structType, sparkDefaultParallelism);
+        validateConfiguration(conf);
+    }
+
+    /**
+     * Constructor used by {@link 
org.apache.cassandra.spark.bulkwriter.BulkWriterContext#from(BulkWriterConfig)} 
factory method.
+     * This constructor is only used on executors to reconstruct context from 
broadcast config.
+     *
+     * @param config immutable configuration for the bulk writer
+     */
+    public CassandraCoordinatedBulkWriterContext(@NotNull BulkWriterConfig 
config)
+    {
+        super(config);
+        validateConfiguration(config.getConf());
+    }
+
+    private void validateConfiguration(BulkSparkConf conf)
+    {
         Preconditions.checkArgument(conf.isCoordinatedWriteConfigured(),
                                     "Cannot create 
CassandraCoordinatedBulkWriterContext without CoordinatedWrite configuration");
         // Redundant check, since isCoordinatedWriteConfigured implies using 
S3_COMPAT mode already.
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
index 802844bd..92edf66b 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedWriteConf.java
@@ -27,7 +27,6 @@ import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
index 0b8ad44d..d7a46bfd 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
@@ -31,10 +31,10 @@ import com.google.common.collect.Range;
 
 import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.JobInfo;
-import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraClusterInfoGroup;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CoordinatedWriteConf.ClusterConf;
 import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterContainer;
+import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.MultiClusterSupport;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
@@ -98,9 +98,10 @@ public class MultiClusterReplicaAwareFailureHandler<I 
extends CassandraInstance>
         CoordinatedWriteConf coordinatedWriteConf = job.coordinatedWriteConf();
         Preconditions.checkState(coordinatedWriteConf != null,
                                  "CoordinatedWriteConf is absent for 
multi-cluster write");
-        Preconditions.checkState(cluster instanceof CassandraClusterInfoGroup,
-                                 "Not a CassandraClusterInfoGroup for 
multi-cluster write");
-        CassandraClusterInfoGroup group = (CassandraClusterInfoGroup) cluster;
+        Preconditions.checkState(cluster instanceof MultiClusterSupport,
+                                 "Not a MultiClusterSupport for multi-cluster 
write");
+        @SuppressWarnings("unchecked")
+        MultiClusterSupport<ClusterInfo> group = 
(MultiClusterSupport<ClusterInfo>) cluster;
         failureHandlers.forEach((clusterId, handler) -> {
             ClusterConf clusterConf = coordinatedWriteConf.cluster(clusterId);
             ClusterInfo clusterInfo = group.getValueOrThrow(clusterId);
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
index 8ed6369d..b1c8f70a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.spark.bulkwriter.util;
 
-import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Set;
 
@@ -28,10 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
-import org.apache.cassandra.spark.bulkwriter.CassandraBulkWriterContext;
+import org.apache.cassandra.spark.bulkwriter.BulkWriterConfig;
 import org.apache.cassandra.spark.bulkwriter.RingInstance;
 import org.apache.cassandra.spark.bulkwriter.TokenPartitioner;
-import 
org.apache.cassandra.spark.bulkwriter.cloudstorage.coordinated.CassandraCoordinatedBulkWriterContext;
 import 
org.apache.cassandra.spark.transports.storage.StorageAccessConfiguration;
 import org.apache.cassandra.spark.transports.storage.StorageCredentials;
 import 
org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -45,11 +43,23 @@ public class SbwKryoRegistrator implements KryoRegistrator
     protected static final String KRYO_KEY = "spark.kryo.registrator";
 
     // CHECKSTYLE IGNORE: Despite being static and final, this is a mutable 
field not to be confused with a constant
-    private static final Set<Class<? extends Serializable>> 
javaSerializableClasses =
-    Sets.newHashSet(CassandraBulkWriterContext.class,
-                    CassandraCoordinatedBulkWriterContext.class,
+    // Classes registered with Kryo using SbwJavaSerializer for Java 
serialization.
+    // When Spark uses Kryo (spark.serializer=KryoSerializer), these classes 
are serialized via
+    // Java's ObjectOutputStream instead of Kryo's default, applying to 
shuffle, broadcast, RDD caching,
+    // and task closures.
+    //
+    // Only TOP-LEVEL classes need registration; nested Serializable fields 
are handled recursively
+    // by Java's ObjectOutputStream:
+    //
+    // - BulkWriterConfig: The ONLY object broadcast to executors. Contains 
all broadcastable wrappers
+    //                     and configuration classes, which are automatically 
serialized recursively.
+    // - RingInstance: Serialized during shuffle and broadcast operations.
+    // - TokenPartitioner: Serialized during shuffle (e.g., 
repartitionAndSortWithinPartitions).
+    //                     Has custom writeObject/readObject that must be 
invoked.
+    private static final Set<Class<?>> javaSerializableClasses =
+    Sets.newHashSet(RingInstance.class,
                     TokenPartitioner.class,
-                    RingInstance.class);
+                    BulkWriterConfig.class);
 
     @Override
     public void registerClasses(@NotNull Kryo kryo)
@@ -65,7 +75,7 @@ public class SbwKryoRegistrator implements KryoRegistrator
         kryo.register(StorageCredentials.class, new 
StorageCredentials.Serializer());
     }
 
-    public static void addJavaSerializableClass(@NotNull Class<? extends 
Serializable> javaSerializableClass)
+    public static void addJavaSerializableClass(@NotNull Class<?> 
javaSerializableClass)
     {
         javaSerializableClasses.add(javaSerializableClass);
     }
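
For context on the comment above: registering a class with a Java-serialization-backed serializer in Kryo generally looks like the sketch below. It uses Kryo's bundled JavaSerializer purely for illustration; this project registers its own SbwJavaSerializer (not shown in this hunk), so treat the serializer class here as an assumption.

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.serializers.JavaSerializer;

    final class JavaSerializationRegistrationSketch
    {
        // Registering with a Java serializer makes Kryo delegate to ObjectOutputStream/
        // ObjectInputStream, so custom writeObject/readObject methods (e.g. on
        // TokenPartitioner) are honoured even when spark.serializer=KryoSerializer.
        static void register(Kryo kryo, Class<?> javaSerializableClass)
        {
            kryo.register(javaSerializableClass, new JavaSerializer());
        }
    }
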
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index b92c1b01..ab0e663b 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -140,7 +140,7 @@ class BulkSparkConfTest
         // mTLS is now required, and the BulkSparkConf constructor fails if 
the options aren't present
         Map<String, String> options = copyDefaultOptions();
         SparkConf sparkConf = new SparkConf();
-        assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf, 
options));
+        assertThatNoException().isThrownBy(() -> new BulkSparkConf(sparkConf, 
options, null));
     }
 
     @Test
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
index 8497f81d..5ed4c522 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
@@ -81,7 +81,7 @@ public class CassandraClusterInfoTest
 
         MockClusterInfoForTimeSkew(int allowanceMinutes, Instant remoteNow)
         {
-            super(null);
+            super((BulkSparkConf) null);
             mockCassandraContext(allowanceMinutes, remoteNow);
         }
 
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 5578e95e..8a2c1728 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -114,7 +114,9 @@ class RecordWriterTest
         writerContext.setReplicationFactor(rf);
         tc = new TestTaskContext();
         range = 
writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
-        tokenizer = new Tokenizer(writerContext);
+        BroadcastableTableSchema broadcastableTableSchema = 
BroadcastableTableSchema.from(writerContext.schema().getTableSchema());
+        boolean isMurmur3Partitioner = 
writerContext.cluster().getPartitioner() == 
org.apache.cassandra.spark.data.partitioner.Partitioner.Murmur3Partitioner;
+        tokenizer = new Tokenizer(broadcastableTableSchema, 
isMurmur3Partitioner);
     }
 
     @Test
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
index 1cfed87f..1e9b7ef8 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java
@@ -367,9 +367,10 @@ public class TableSchemaNormalizeTest
         String[] fieldNames = {field};
         ColumnType<?>[] cqlTypes = {columnType};
         TableSchema schema = buildSchema(cassandraVersion, fieldNames, 
sparkTypes, new CqlField.CqlType[]{cqlType}, fieldNames, cqlTypes, fieldNames);
+        BroadcastableTableSchema broadcastable = 
BroadcastableTableSchema.from(schema);
         Object[] source = new Object[]{sourceVal};
         Object[] expected = new Object[]{expectedVal};
-        Object[] normalized = schema.normalize(source);
+        Object[] normalized = broadcastable.normalize(source);
         assertThat(normalized, is(equalTo(expected)));
     }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 88163df0..257034bf 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -183,11 +183,11 @@ public class TableSchemaTest
     
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
     public void normalizeConvertsValidTable(String cassandraVersion)
     {
-        TableSchema schema = getValidSchemaBuilder(cassandraVersion)
-                .build();
+        TableSchema schema = getValidSchemaBuilder(cassandraVersion).build();
+        BroadcastableTableSchema broadcastable = 
BroadcastableTableSchema.from(schema);
 
-        assertThat(schema.normalize(new Object[]{1, 1L, "foo", 2}))
-                .isEqualTo(new Object[]{1, -2147483648, "foo", 2});
+        assertThat(broadcastable.normalize(new Object[]{1, 1L, "foo", 2}))
+        .isEqualTo(new Object[]{1, -2147483648, "foo", 2});
     }
 
     @ParameterizedTest
@@ -213,16 +213,17 @@ public class TableSchemaTest
     public void testGetKeyColumnsFindsCorrectValues(String cassandraVersion)
     {
         StructType outOfOrderDataFrameSchema = new StructType()
-                .add("date", DataTypes.TimestampType)
-                .add("id", DataTypes.IntegerType)
-                .add("course", DataTypes.StringType)
-                .add("marks", DataTypes.IntegerType);
+                                               .add("date", 
DataTypes.TimestampType)
+                                               .add("id", 
DataTypes.IntegerType)
+                                               .add("course", 
DataTypes.StringType)
+                                               .add("marks", 
DataTypes.IntegerType);
 
         TableSchema schema = getValidSchemaBuilder(cassandraVersion)
-                .withDataFrameSchema(outOfOrderDataFrameSchema)
-                .build();
-        assertThat(schema.getKeyColumns(new Object[]{"date_val", "id_val", 
"course_val", "marks_val"}))
-                .isEqualTo(new Object[]{"id_val", "date_val"});
+                             .withDataFrameSchema(outOfOrderDataFrameSchema)
+                             .build();
+        BroadcastableTableSchema broadcastable = 
BroadcastableTableSchema.from(schema);
+        assertThat(broadcastable.getKeyColumns(new Object[]{"date_val", 
"id_val", "course_val", "marks_val"}))
+        .isEqualTo(new Object[]{"id_val", "date_val"});
     }
 
     @ParameterizedTest
@@ -230,10 +231,11 @@ public class TableSchemaTest
     public void testGetKeyColumnsFailsWhenNullKeyValues(String 
cassandraVersion)
     {
         TableSchema schema = getValidSchemaBuilder(cassandraVersion)
-                .build();
-        assertThatThrownBy(() -> schema.getKeyColumns(new Object[]{"foo", 
null, "baz", "boo"}))
-                .isInstanceOf(NullPointerException.class)
-                .hasMessage("Found a null primary or composite key column in 
source data. All key columns must be non-null.");
+                             .build();
+        BroadcastableTableSchema broadcastable = 
BroadcastableTableSchema.from(schema);
+        assertThatThrownBy(() -> broadcastable.getKeyColumns(new 
Object[]{"foo", null, "baz", "boo"}))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("Found a null primary or composite key column in source 
data. All key columns must be non-null.");
     }
 
     @ParameterizedTest
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
index 26b3ae17..4825c8de 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroupTest.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Range;
 import org.junit.jupiter.api.Test;
 
 import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup;
 import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
 import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo;
 import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfoTest;
@@ -272,20 +273,49 @@ class CassandraClusterInfoGroupTest
     @Test
     void testSerDeser()
     {
-        CassandraClusterInfoGroup origin = mockClusterGroup(2, index -> 
mockClusterInfo("cluster" + index));
-        assertThat(origin.clusterInfoByIdUnsafe()).isNotNull();
-        assertThat(origin.getValueOrNull("cluster0")).isNotNull();
-        assertThat(origin.getValueOrNull("cluster1")).isNotNull();
-        byte[] serialized = SerializationUtils.serialize(origin);
-        CassandraClusterInfoGroup target = 
SerializationUtils.deserialize(serialized, CassandraClusterInfoGroup.class);
-        assertThat(target.clusterInfoByIdUnsafe())
-        .describedAs("clusterInfoById should be null in the deserialized 
object")
-        .isNull();
-        assertThat(target.getValueOrNull("cluster0")).isNotNull();
-        assertThat(target.getValueOrNull("cluster1")).isNotNull();
-        assertThat(target.clusterInfoByIdUnsafe())
-        .describedAs("clusterInfoById should now be lazily initialized")
-        .isNotNull();
+        // Create a CassandraClusterInfoGroup with some test data
+        CassandraClusterInfoGroup originalGroup = mockClusterGroup(2, index -> 
{
+            CassandraClusterInfo clusterInfo = mockClusterInfo("cluster" + 
index);
+            
when(clusterInfo.getPartitioner()).thenReturn(Partitioner.Murmur3Partitioner);
+            when(clusterInfo.getLowestCassandraVersion()).thenReturn("4.0.0");
+            return clusterInfo;
+        });
+
+        // Mock the BulkSparkConf needed for serialization
+        BulkSparkConf conf = mock(BulkSparkConf.class, 
withSettings().serializable());
+
+        // Convert to BroadcastableClusterInfoGroup (what actually gets 
serialized/broadcast in Spark)
+        BroadcastableClusterInfoGroup broadcastable = 
BroadcastableClusterInfoGroup.from(originalGroup, conf);
+
+        // Serialize and deserialize using SerializationUtils
+        byte[] serializedBytes = SerializationUtils.serialize(broadcastable);
+        BroadcastableClusterInfoGroup deserializedBroadcastable = 
SerializationUtils.deserialize(serializedBytes, 
BroadcastableClusterInfoGroup.class);
+
+        // Verify the deserialized broadcastable has the correct pre-computed 
values
+        assertThat(deserializedBroadcastable.clusterId())
+        .describedAs("ClusterId should be preserved after serialization")
+        .isEqualTo(originalGroup.clusterId());
+
+        assertThat(deserializedBroadcastable.getPartitioner())
+        .describedAs("Partitioner should be preserved after serialization")
+        .isEqualTo(Partitioner.Murmur3Partitioner);
+
+        assertThat(deserializedBroadcastable.getLowestCassandraVersion())
+        .describedAs("Lowest Cassandra version should be preserved after 
serialization")
+        .isEqualTo("4.0.0");
+
+        assertThat(deserializedBroadcastable.size())
+        .describedAs("Number of clusters should be preserved after 
serialization")
+        .isEqualTo(2);
+
+        // Verify we can iterate over the deserialized clusters
+        int[] clusterCount = {0};
+        deserializedBroadcastable.forEach((clusterId, clusterInfo) -> {
+            assertThat(clusterId).isIn("cluster0", "cluster1");
+            assertThat(clusterInfo).isNotNull();
+            clusterCount[0]++;
+        });
+        assertThat(clusterCount[0]).isEqualTo(2);
     }
 
     private CassandraClusterInfoGroup mockClusterGroup(int size,
@@ -297,8 +327,7 @@ class CassandraClusterInfoGroupTest
 
     private CassandraClusterInfo mockClusterInfo(String clusterId)
     {
-        CassandraClusterInfo clusterInfo = mock(CassandraClusterInfo.class,
-                                                
withSettings().serializable()); // serializable required by testSerDeser
+        CassandraClusterInfo clusterInfo = mock(CassandraClusterInfo.class);
         when(clusterInfo.clusterId()).thenReturn(clusterId);
         return clusterInfo;
     }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
index 99e3b8ba..780390e3 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java
@@ -143,13 +143,15 @@ class CoordinatedImportCoordinatorTest
         coordinator.onStageReady(jobId);
         assertThat(coordinator.isStageReady()).isTrue();
 
-        loopAssert(() -> stagedClusters.getAllValues().size() == 2, "waiting 
for all cluster to stage successfully");
+        loopAssert(() -> stagedClusters.getAllValues().size() == 2,
+                   "waiting for all cluster to stage successfully. actual: " + 
stagedClusters.getAllValues());
         
assertThat(stagedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1, 
clusterId2);
 
         // signal apply read
         coordinator.onImportReady(jobId);
 
-        loopAssert(() -> appliedClusters.getAllValues().size() == 2, "waiting 
for all cluster to import successfully");
+        loopAssert(() -> appliedClusters.getAllValues().size() == 2,
+                   "waiting for all cluster to import successfully. actual: " 
+ appliedClusters.getAllValues());
         
assertThat(appliedClusters.getAllValues()).containsExactlyInAnyOrder(clusterId1,
 clusterId2);
 
         fut.get();
@@ -292,10 +294,10 @@ class CoordinatedImportCoordinatorTest
                                                 .build();
     }
 
-    // loop at most 10 times until the condition is evaluated to true
+    // loop at most 50 times until the condition is evaluated to true
     private void loopAssert(Supplier<Boolean> condition, String desc)
     {
-        int attempts = 10;
+        int attempts = 50;
         while (!condition.get() && attempts-- > 1)
         {
             Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
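
Note: at 10 ms per attempt, raising the attempt count from 10 to 50 extends the maximum polling window from roughly 100 ms to roughly 50 × 10 ms = 500 ms before the assertion fails.
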
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 06706af5..cee37d32 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -537,8 +537,7 @@ public final class ReaderUtils extends TokenUtils
      * Read primary Index.db file
      *
      * @param primaryIndex input stream for Index.db file
-     * @param tracker      tracker that consumes each key byffer and returns 
true if can exit early, otherwise continues to read primary index
-     * @return pair of first and last decorated keys
+     * @param tracker      tracker that consumes each key buffer and returns 
true if can exit early, otherwise continues to read primary index
      * @throws IOException
      */
     public static void readPrimaryIndex(@NotNull InputStream primaryIndex,
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 3c84591d..91fc5a7d 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -492,8 +492,7 @@ public final class ReaderUtils extends TokenUtils
      * Read primary Index.db file
      *
      * @param primaryIndex input stream for Index.db file
-     * @param tracker      tracker that consumes each key byffer and returns 
true if can exit early, otherwise continues to read primary index
-     * @return pair of first and last decorated keys
+     * @param tracker      tracker that consumes each key buffer and returns 
true if can exit early, otherwise continues to read primary index
      * @throws IOException
      */
     public static void readPrimaryIndex(@NotNull InputStream primaryIndex,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

