yihua commented on code in PR #12772:
URL: https://github.com/apache/hudi/pull/12772#discussion_r2074139354


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -184,7 +185,7 @@ private void writeRow(InternalRow row) {
       UTF8String writeCommitTime = shouldPreserveHoodieMetadata ? row.getUTF8String(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD)
           : commitTime;
 
-      InternalRow updatedRow = new HoodieInternalRow(writeCommitTime, seqId, recordKey,
+      InternalRow updatedRow = SparkAdapterSupport$.MODULE$.sparkAdapter().createInternalRow(writeCommitTime, seqId, recordKey,

Review Comment:
   Have you checked whether the performance stays the same after replacing 
`HoodieInternalRow::new` with 
`SparkAdapterSupport$.MODULE$.sparkAdapter().createInternalRow`?
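   One way to limit any overhead here, sketched below with hypothetical stand-in names (this is not Hudi's actual API), is to resolve the adapter factory once and cache it in a field, so the per-row write path never repeats the `SparkAdapterSupport$.MODULE$.sparkAdapter()` lookup:

```java
public class RowFactoryCaching {
  // Hypothetical stand-in for SparkAdapter#createInternalRow; not a Hudi class.
  interface RowFactory {
    Object[] create(Object... fields);
  }

  // Resolved once and cached, analogous to storing the result of
  // SparkAdapterSupport$.MODULE$.sparkAdapter() in a field of the handle
  // instead of calling it inside writeRow for every record.
  private static final RowFactory FACTORY = fields -> fields.clone();

  static Object[] writeRow(Object commitTime, Object seqId, Object recordKey) {
    // Hot path: only the cached factory is invoked per row.
    return FACTORY.create(commitTime, seqId, recordKey);
  }
}
```

   Whether the remaining virtual call is measurable would still need a benchmark; the sketch only shows that the lookup itself need not be per-row.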



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java:
##########
@@ -62,9 +62,11 @@ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, Hood
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
-    return records
-        .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled),
-            true, outputSparkPartitions);
+    return records.sortBy(record ->
+            SparkSortUtils.getComparableSortColumns(

Review Comment:
   Could we still use `SortUtils.getComparableSortColumns`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java:
##########
@@ -84,8 +85,8 @@ private JavaRDD<HoodieRecord<T>> doPartitionAndCustomColumnSort(JavaRDD<HoodieRe
     final String[] sortColumns = sortColumnNames;
     final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema()))));
     Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
-      FlatLists.ComparableList obj1 = FlatLists.ofComparableArray(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
-      FlatLists.ComparableList obj2 = FlatLists.ofComparableArray(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+      FlatLists.ComparableList obj1 = SparkAdapterSupport$.MODULE$.sparkAdapter().createComparableList(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+      FlatLists.ComparableList obj2 = SparkAdapterSupport$.MODULE$.sparkAdapter().createComparableList(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));

Review Comment:
   Same here: could we revert to `FlatLists.ofComparableArray`?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -151,4 +152,10 @@ public Comparable convertValueToEngineType(Comparable value) {
     }
     return value;
   }
+
+  @Override
+  public int compareValues(Comparable a, Comparable b) {
+    // [SPARK-46832] UTF8String doesn't support compareTo anymore
+    return SparkSortUtils.compareValues(a, b);

Review Comment:
   A better way is to implement `Spark3InternalRowReaderContext` and 
`Spark4InternalRowReaderContext` to differentiate the logic, instead of 
redirecting through a utils class, which is opaque.  This can be deferred to a 
subsequent PR, but it would be good to avoid exposing `SparkSortUtils`, which 
anyone could accidentally use; that logic should be incorporated into the 
version-specific implementation classes.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -260,7 +263,13 @@ object HoodieDatasetBulkInsertHelper
       .reduceByKey ((oneRow, otherRow) => {
         val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
         val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
-        if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
+        if (onePreCombineVal.isInstanceOf[UTF8String]) {
+          if (sparkAdapter.compareUTF8String(onePreCombineVal.asInstanceOf[UTF8String], otherPreCombineVal.asInstanceOf[UTF8String]) >= 0) {
+            oneRow
+          } else {
+            otherRow
+          }
+        } else if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {

Review Comment:
   The precombine field type is predetermined.  Could we avoid the per-record 
`UTF8String` type check?
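   One way to realize this, sketched with hypothetical names (not Hudi's actual API): since the field type is fixed per table, the comparison strategy can be selected once before the `reduceByKey`, so the per-pair hot path performs no `instanceof` check:

```java
import java.util.Comparator;

public class PrecombineComparator {
  // Choose the comparator once at setup time from the known field type.
  // 'stringCompare' stands in for the adapter's UTF8String comparison.
  @SuppressWarnings("unchecked")
  static Comparator<Object> forFieldType(Class<?> fieldType, Comparator<Object> stringCompare) {
    if (CharSequence.class.isAssignableFrom(fieldType)) {
      // String-typed precombine field: use the version-specific comparison.
      return stringCompare;
    }
    // Any other Comparable type: plain compareTo, chosen once, no per-record branch.
    return (a, b) -> ((Comparable<Object>) a).compareTo(b);
  }
}
```

   The reduce closure would then capture the pre-selected comparator instead of branching on the value's runtime type for every record pair.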



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
