frankgh commented on code in PR #146:
URL:
https://github.com/apache/cassandra-analytics/pull/146#discussion_r2453381595
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java:
##########
@@ -65,7 +67,7 @@ public void registerClasses(@NotNull Kryo kryo)
kryo.register(StorageCredentials.class, new StorageCredentials.Serializer());
}
- public static void addJavaSerializableClass(@NotNull Class<? extends Serializable> javaSerializableClass)
+ public static void addJavaSerializableClass(@NotNull Class<?> javaSerializableClass)
Review Comment:
Can we change the method name here to reflect the actual behavior? Maybe
also add a Javadoc noting that the classes added here will be registered with
Kryo.
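A minimal sketch of what the rename plus Javadoc might look like. The name `addClassToRegister`, the class `KryoRegistrationSketch`, and the in-memory set are hypothetical stand-ins so the snippet is self-contained; the real method would keep whatever Kryo registration logic the registrator already has.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

final class KryoRegistrationSketch
{
    // Hypothetical stand-in for the registrator's internal collection of classes.
    private static final Set<Class<?>> CLASSES_TO_REGISTER =
        Collections.synchronizedSet(new LinkedHashSet<>());

    /**
     * Adds a class to the set of classes that will be registered with Kryo.
     * The class does not need to implement {@link java.io.Serializable}.
     * Adding the same class more than once has no additional effect.
     *
     * @param clazz the class to register; must not be null
     */
    static void addClassToRegister(Class<?> clazz)
    {
        if (clazz == null)
        {
            throw new NullPointerException("clazz must not be null");
        }
        CLASSES_TO_REGISTER.add(clazz);
    }

    /** Returns a snapshot of the classes added so far, in insertion order. */
    static Set<Class<?>> classesToRegister()
    {
        synchronized (CLASSES_TO_REGISTER)
        {
            return new LinkedHashSet<>(CLASSES_TO_REGISTER);
        }
    }
}
```

With the `Serializable` bound gone from the parameter type, the old name no longer describes what the method accepts, which is the main reason for the rename.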
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -158,7 +157,7 @@ public class BulkSparkConf implements Serializable
protected final String coordinatedWriteConfJson;
private transient CoordinatedWriteConf coordinatedWriteConf; // it is transient; deserialized from coordinatedWriteConfJson in executors
- public BulkSparkConf(SparkConf conf, Map<String, String> options)
+ public BulkSparkConf(SparkConf conf, Map<String, String> options, @Nullable Logger logger)
Review Comment:
NIT: I think preserving the existing constructor will avoid changes at many
call sites.
```suggestion
public BulkSparkConf(SparkConf conf, Map<String, String> options)
{
this(conf, options, null);
}
public BulkSparkConf(SparkConf conf, Map<String, String> options, @Nullable Logger logger)
```
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterConfig.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Immutable configuration data class for BulkWriter jobs that is safe to broadcast to Spark executors.
+ * This class contains pre-computed, serializable values that were computed on the driver.
+ * <p>
+ * Serialization Architecture:
+ * This class is the ONLY object that gets broadcast to Spark executors (via Spark's broadcast mechanism).
+ * It contains serializable implementations of cluster information ({@link SerializableClusterInfo} or
+ * {@link SerializableClusterInfoGroup}) that have zero transient fields for safe serialization.
+ * <p>
+ * On the driver, {@link BulkWriterContext} instances use driver-only implementations like
+ * {@link CassandraClusterInfo}. Before broadcasting, these are converted to serializable forms.
+ * On executors, {@link BulkWriterContext} instances are reconstructed from this config using
+ * {@link BulkWriterContext#from(BulkWriterConfig, boolean)}, which uses the serializable cluster
+ * information directly without converting back to driver-only types.
+ */
+public final class BulkWriterConfig implements Serializable
Review Comment:
My concern here is that some of these fields need to be recomputed
dynamically on the executor, and we are not honoring that when we try to
fetch non-cached values in some of the methods of this class.
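To illustrate the concern, here is a rough sketch of one way a broadcast config could keep an executor-side value out of the serialized form and recompute it on first use. This is not the PR's code: `BroadcastConfigSketch`, `dynamicValue()`, and the "fresh-" lookup are hypothetical, and the sketch is not thread-safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class BroadcastConfigSketch implements Serializable
{
    private static final long serialVersionUID = 1L;

    // Pre-computed on the driver; this value travels with the broadcast.
    private final String clusterName;

    // Never serialized; it is null after deserialization and recomputed lazily.
    private transient String recomputedValue;

    BroadcastConfigSketch(String clusterName)
    {
        this.clusterName = clusterName;
    }

    // Hypothetical accessor for a value that must be computed where it is used.
    String dynamicValue()
    {
        if (recomputedValue == null)
        {
            // Stand-in for an executor-side lookup; an assumption for illustration.
            recomputedValue = "fresh-" + clusterName;
        }
        return recomputedValue;
    }

    boolean isComputed()
    {
        return recomputedValue != null;
    }

    // Simulates shipping the config to an executor via Java serialization.
    static BroadcastConfigSketch roundTrip(BroadcastConfigSketch config)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(config);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (BroadcastConfigSketch) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException(e);
        }
    }
}
```

The point of the sketch is that a value cached on the driver does not leak to the executor: the deserialized copy starts with the transient field unset and computes it locally.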
##########
CHANGES.txt:
##########
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Refactor BulkWriterContext broadcasting to use immutable config class (CASSANALYTICS-TBD)
Review Comment:
```suggestion
* Refactor BulkWriterContext broadcasting to use immutable config class (CASSANALYTICS-89)
```
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CoordinatedImportCoordinatorTest.java:
##########
@@ -292,10 +294,10 @@ private RestoreJobProgressResponsePayload failedJobProgress()
.build();
}
- // loop at most 10 times until the condition is evaluated to true
+ // loop at most 50 times until the condition is evaluated to true
Review Comment:
Is 50 desirable here?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -57,7 +57,6 @@
public class BulkSparkConf implements Serializable
{
private static final long serialVersionUID = -5060973521517656241L;
- private static final Logger LOGGER = LoggerFactory.getLogger(BulkSparkConf.class);
Review Comment:
We should document this in code, so there is historical context and we avoid
people re-adding the logger in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]