nsivabalan commented on code in PR #5347:
URL: https://github.com/apache/hudi/pull/5347#discussion_r852166660


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -130,7 +129,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    */
   def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
reconcileToLatestSchema: Boolean,
                 latestTableSchema: org.apache.hudi.common.util.Option[Schema] 
= org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val latestTableSchemaConverted = if (latestTableSchema.isPresent && 
reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    var latestTableSchemaConverted : Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on 
reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform 
source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) 
Some(latestTableSchema.get()) else None

Review Comment:
   looks like this was a regression in one of the refactorings. we had this in 
0.10.1. 
   ```
       var writeSchema : Schema = null;
       var toReconcileSchema : Schema = null;
       if (reconcileToLatestSchema && latestTableSchema.isPresent) {
         // if reconcileToLatestSchema is set to true and latestSchema is 
present, then try to leverage latestTableSchema.
         // this code path will handle situations where records are serialized 
in odl schema, but callers wish to convert
         // to Rdd[GenericRecord] using different schema(could be evolved 
schema or could be latest table schema)
         writeSchema = dfWriteSchema
         toReconcileSchema = latestTableSchema.get()
       } else {
         // there are paths where callers wish to use latestTableSchema to 
convert to Rdd[GenericRecords] and not use
         // row's schema. So use latestTableSchema if present. if not 
available, fallback to using row's schema.
         writeSchema = if (latestTableSchema.isPresent) { 
latestTableSchema.get()} else { dfWriteSchema}
       }
       createRddInternal(df, writeSchema, toReconcileSchema, structName, 
recordNamespace)
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to