frankgh commented on code in PR #146:
URL: https://github.com/apache/cassandra-analytics/pull/146#discussion_r2471171915


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -84,6 +84,18 @@ public class RecordWriter
     private final CqlTable cqlTable;
     private StreamSession<?> streamSession = null;
 
+    /**
+     * Constructor that accepts a BulkWriterConfig and constructs the context on the executor.
+     * This is used when the config is broadcast to executors.
+     *
+     * @param config      immutable configuration broadcast from driver
+     * @param columnNames column names array
+     */
+    public RecordWriter(BulkWriterConfig config, String[] columnNames)
+    {
+        this(BulkWriterContext.from(config), columnNames);

Review Comment:
   We only create a record writer on the executor, so the other constructor is not necessary.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/Tokenizer.java:
##########
@@ -39,13 +38,12 @@ interface SerializableFunction<T, R> extends Function<T, R>, Serializable
     private final TokenUtils tokenUtils;
     private final SerializableFunction<Object[], Object[]> keyColumnProvider;
 
-    public Tokenizer(BulkWriterContext writerContext)
+    public Tokenizer(BroadcastableTableSchema broadcastableTableSchema, boolean isMurmur3Partitioner)
     {
-        TableSchema tableSchema = writerContext.schema().getTableSchema();
-        this.tokenUtils = new TokenUtils(tableSchema.partitionKeyColumns,
-                                         tableSchema.partitionKeyColumnTypes,
-                                         writerContext.cluster().getPartitioner() == Partitioner.Murmur3Partitioner);
-        this.keyColumnProvider = tableSchema::getKeyColumns;
+        this.tokenUtils = new TokenUtils(broadcastableTableSchema.getPartitionKeyColumns(),
+                                         broadcastableTableSchema.getPartitionKeyColumnTypes(),
+                                         isMurmur3Partitioner);

Review Comment:
   It seems a little silly to pass a boolean flag here just to indicate whether the partitioner is Murmur3. Can't we pass the partitioner directly to `TokenUtils` instead?
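
   A minimal sketch of that suggestion, using stand-in types because the real `TokenUtils` constructor is not shown in this diff. `PartitionerSketch` and `TokenUtilsSketch` are hypothetical names, not classes from the codebase; the only point is that the Murmur3 comparison moves into the callee instead of being computed at each call site.

```java
// Hypothetical stand-in for the project's Partitioner enum.
enum PartitionerSketch
{
    Murmur3Partitioner,
    RandomPartitioner
}

// Hypothetical stand-in for TokenUtils; the real constructor may differ.
class TokenUtilsSketch
{
    private final String[] partitionKeyColumns;
    private final PartitionerSketch partitioner;

    // Takes the partitioner itself rather than a precomputed boolean flag.
    TokenUtilsSketch(String[] partitionKeyColumns, PartitionerSketch partitioner)
    {
        this.partitionKeyColumns = partitionKeyColumns.clone();
        this.partitioner = partitioner;
    }

    // The Murmur3 check lives in one place instead of at every caller.
    boolean isMurmur3()
    {
        return partitioner == PartitionerSketch.Murmur3Partitioner;
    }

    int partitionKeyCount()
    {
        return partitionKeyColumns.length;
    }
}
```

   With this shape, `Tokenizer` would forward whatever partitioner it is given, and adding another partitioner later would not require a second boolean parameter.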



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java:
##########
@@ -59,17 +61,29 @@
 /**
  * A group of ClusterInfo. One per cluster.
  * The class does the aggregation over all clusters for applicable operations.
+ * <p>
+ * This class is NOT serialized and does NOT have a serialVersionUID.
+ * When broadcasting to executors, the driver extracts information from this class
+ * and creates a {@link org.apache.cassandra.spark.bulkwriter.BroadcastableClusterInfoGroup} instance,
+ * which is then included in the {@link org.apache.cassandra.spark.bulkwriter.BulkWriterConfig}
+ * that gets broadcast.
+ * <p>
+ * This class implements Serializable only because the {@link org.apache.cassandra.spark.bulkwriter.ClusterInfo}
+ * interface requires it (for use as a field type in broadcast classes), but instances of this
+ * class are never directly serialized.
  */
 public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterSupport<ClusterInfo>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class);
 
-    private static final long serialVersionUID = 5337884321245616172L;
-
     // immutable
     private final List<ClusterInfo> clusterInfos;
-    private transient volatile Map<String, ClusterInfo> clusterInfoById;
-    private transient volatile TokenRangeMapping<RingInstance> consolidatedTokenRangeMapping;
+    private final String clusterId;
+    private volatile Map<String, ClusterInfo> clusterInfoById;
+    private volatile TokenRangeMapping<RingInstance> consolidatedTokenRangeMapping;
+    // Pre-computed values from BroadcastableClusterInfoGroup (only set when reconstructed on executors)
+    private volatile Partitioner cachedPartitioner;
+    private volatile String cachedLowestCassandraVersion;

Review Comment:
   I don't think `cachedPartitioner` or `cachedLowestCassandraVersion` need to be volatile.
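
   One possible reading, as a sketch only: if both values are assigned exactly once in the constructor, they could be declared `final`, and Java's final-field semantics already make them visible to other threads once construction completes (provided `this` does not escape the constructor). The class below is hypothetical and uses a plain `String` for the partitioner; whether the real class assigns these fields only at construction time is an assumption.

```java
// Hypothetical holder; not the real CassandraClusterInfoGroup.
final class CachedClusterValuesSketch
{
    // Assumption: both values are known at construction time and never reassigned.
    // Either may be null when the instance was not rebuilt from broadcast data.
    private final String cachedPartitioner;
    private final String cachedLowestCassandraVersion;

    CachedClusterValuesSketch(String cachedPartitioner, String cachedLowestCassandraVersion)
    {
        this.cachedPartitioner = cachedPartitioner;
        this.cachedLowestCassandraVersion = cachedLowestCassandraVersion;
    }

    String cachedPartitioner()
    {
        return cachedPartitioner;
    }

    String cachedLowestCassandraVersion()
    {
        return cachedLowestCassandraVersion;
    }
}
```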



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

