nsivabalan commented on code in PR #13526:
URL: https://github.com/apache/hudi/pull/13526#discussion_r2194111259


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -213,12 +213,12 @@ object HoodieDatasetBulkInsertHelper
       }), SQLConf.get).toJavaRDD())
   }
 
-  private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, 
preCombineFieldRef: String, isGlobalIndex: Boolean, targetParallelism: Int): 
RDD[InternalRow] = {
+  private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, 
preCombineFields: List[String], isGlobalIndex: Boolean, targetParallelism: 
Int): RDD[InternalRow] = {
     val recordKeyMetaFieldOrd = 
schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
     val partitionPathMetaFieldOrd = 
schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
     // NOTE: Pre-combine field could be a nested field
-    val preCombineFieldPath = composeNestedFieldPath(schema, 
preCombineFieldRef)
-      .getOrElse(throw new HoodieException(s"Pre-combine field 
$preCombineFieldRef is missing in $schema"))
+    val preCombineFieldPaths = preCombineFields.map(preCombineField => 
composeNestedFieldPath(schema, preCombineField)
+      .getOrElse(throw new HoodieException(s"Pre-combine field 
$preCombineFields is missing in $schema")))

Review Comment:
   minor: within the `HoodieException` message, we should refer to the single `preCombineField` only, not the `preCombineFields` list.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1360,8 +1362,13 @@ public HoodieTableType getTableType() {
         HoodieTableConfig.TYPE, 
HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
   }
 
-  public String getPreCombineField() {
-    return getString(PRECOMBINE_FIELD_NAME);
+  public List<String> getPreCombineField() {

Review Comment:
   Rename to `getPreCombineFields()` — with an "s" at the end.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -233,13 +233,14 @@ object HoodieDatasetBulkInsertHelper
         (rowKey, row.copy())
       }
       .reduceByKey ((oneRow, otherRow) => {
-        val onePreCombineVal = getNestedInternalRowValue(oneRow, 
preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
-        val otherPreCombineVal = getNestedInternalRowValue(otherRow, 
preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
-        if 
(onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
+        val onePreCombineVals = new 
Comparables(JavaConverters.seqAsJavaList(preCombineFieldPaths.map(preCombineFieldPath
 => getNestedInternalRowValue(oneRow, 
preCombineFieldPath).asInstanceOf[Comparable[AnyRef]])))

Review Comment:
   `new Comparables(preCombineFieldPaths.map(preCombineFieldPath => 
getNestedInternalRowValue(oneRow, 
preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]).toList.asJava)` 
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -86,10 +86,10 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
   }
   private val maxCompactionMemoryInBytes: Long = 
getMaxCompactionMemoryInBytes(new JobConf(config))
 
-  protected val payloadProps: TypedProperties = tableState.preCombineFieldOpt
+  protected val payloadProps: TypedProperties = tableState.preCombineFieldsOpt
     .map { preCombineField =>
       HoodiePayloadConfig.newBuilder
-        .withPayloadOrderingField(preCombineField)
+        .withPayloadOrderingField(String.join(",", preCombineField: _*))

Review Comment:
   `withPayloadOrderingFields` 



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JavaScalaConverters.scala:
##########
@@ -69,4 +69,9 @@ object JavaScalaConverters {
   def convertJavaPropertiesToScalaMap(javaProperties: java.util.Properties): 
Map[String, String] = {
     javaProperties.asScala.toMap
   }
+
+  def convertHudiOptionToScalaOption[A](option: 
org.apache.hudi.common.util.Option[A]): Option[A] = {

Review Comment:
   Don't we already have one for this?
   
   
https://github.com/apache/hudi/blob/7c6edde1f6a1004df75bd82d206d1b665e80e883/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala#L40
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -74,13 +76,13 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
   extends Iterator[InternalRow]
   with SparkAdapterSupport with AvroDeserializerSupport with Closeable {
 
-  protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
-    .map { preCombineField =>
+  protected val payloadProps: Properties = 
metaClient.getTableConfig.getPreCombineFields
+    .map[TypedProperties](JFunction.toJavaFunction(preCombineFields =>
       HoodiePayloadConfig.newBuilder
-        .withPayloadOrderingField(preCombineField)
+        .withPayloadOrderingField(preCombineFields)
         .build
         .getProps
-    }.getOrElse(new Properties())
+    )).orElse(new TypedProperties())

Review Comment:
   Why was this changed from `new Properties()` to `new TypedProperties()`?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -91,13 +93,13 @@ class HoodieCDCRDD(
 
   private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty, 
metaClient.getTableConfig)
 
-  protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
-    .map { preCombineField =>
+  protected val payloadProps: Properties = 
metaClient.getTableConfig.getPreCombineFields
+    .map[TypedProperties](JFunction.toJavaFunction(preCombineFields =>
       HoodiePayloadConfig.newBuilder
-        .withPayloadOrderingField(preCombineField)
+        .withPayloadOrderingField(preCombineFields)
         .build
         .getProps
-    }.getOrElse(new Properties())
+    )).orElse(new TypedProperties())

Review Comment:
   Same question as above.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -137,24 +139,14 @@ class HoodieCDCRDD(
       keyFields.head
     }
 
-    private lazy val preCombineFieldOpt: Option[String] = 
Option(metaClient.getTableConfig.getPreCombineField)
+    private lazy val preCombineFieldsOpt: Option[List[String]] = 
convertHudiOptionToScalaOption(metaClient.getTableConfig.getPreCombineFieldList
+      .map[List[String]](list => list.asScala.toList))
 
     private lazy val tableState = {
       val metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(props)
         .build()
-      HoodieTableState(
-        basePath.toUri.toString,
-        Some(split.changes.last.getInstant),
-        recordKeyField,
-        preCombineFieldOpt,
-        usesVirtualKeys = !populateMetaFields,
-        metaClient.getTableConfig.getPayloadClass,
-        metadataConfig,
-        // TODO support CDC with spark record
-        recordMergeImplClasses = List(classOf[HoodieAvroRecordMerger].getName),
-        recordMergeStrategyId = 
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
-      )
+      HoodieTableState(basePath.toUri.toString, 
Some(split.changes.last.getInstant), recordKeyField, preCombineFieldsOpt, 
usesVirtualKeys = !populateMetaFields, 
metaClient.getTableConfig.getPayloadClass, metadataConfig, 
recordMergeImplClasses = List(classOf[HoodieAvroRecordMerger].getName), 
recordMergeStrategyId = HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)

Review Comment:
   What happened here? Why the formatting changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to