nsivabalan commented on code in PR #5347: URL: https://github.com/apache/hudi/pull/5347#discussion_r852166660
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -130,7 +129,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    */
   def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    var latestTableSchemaConverted : Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None

Review Comment:
   Looks like this was a regression introduced in one of the refactorings. We had this in 0.10.1:

   ```
   var writeSchema : Schema = null;
   var toReconcileSchema : Schema = null;
   if (reconcileToLatestSchema && latestTableSchema.isPresent) {
     // if reconcileToLatestSchema is set to true and latestSchema is present, then try to leverage latestTableSchema.
     // this code path will handle situations where records are serialized in old schema, but callers wish to convert
     // to Rdd[GenericRecord] using different schema (could be evolved schema or could be latest table schema)
     writeSchema = dfWriteSchema
     toReconcileSchema = latestTableSchema.get()
   } else {
     // there are paths where callers wish to use latestTableSchema to convert to Rdd[GenericRecords] and not use
     // row's schema. So use latestTableSchema if present. If not available, fall back to using row's schema.
     writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get()} else { dfWriteSchema}
   }
   createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
   ```
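
For readers following the thread, the branching in the 0.10.1 snippet can be sketched in isolation as below. This is only an illustration under stated assumptions, not the Hudi implementation: `Schema` is a hypothetical stand-in for the Avro `Schema` class, `selectSchemas` and `SchemaSelectionSketch` are made-up names, and Scala's `Option` replaces `org.apache.hudi.common.util.Option` so the block runs without Hudi, Spark, or Avro on the classpath.

```scala
// Hypothetical stand-in for org.apache.avro.Schema (assumption, for illustration only).
final case class Schema(name: String)

object SchemaSelectionSketch {
  // Returns (writeSchema, toReconcileSchema), mirroring the two branches quoted from 0.10.1.
  // selectSchemas is a made-up helper name; it does not exist in Hudi.
  def selectSchemas(dfWriteSchema: Schema,
                    latestTableSchema: Option[Schema],
                    reconcileToLatestSchema: Boolean): (Schema, Option[Schema]) =
    latestTableSchema match {
      // Reconcile requested and a table schema exists: write with the row's schema,
      // and pass the table schema along as the one to reconcile against.
      case Some(latest) if reconcileToLatestSchema => (dfWriteSchema, Some(latest))
      // Reconcile not requested but a table schema exists: convert using the table schema.
      case Some(latest) => (latest, None)
      // No table schema available: fall back to the row's schema.
      case None => (dfWriteSchema, None)
    }
}
```

The second case is the one the review comment is about: a caller that supplies `latestTableSchema` without enabling `reconcileToLatestSchema` still gets the table schema rather than the row's schema.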