yihua commented on code in PR #12772:
URL: https://github.com/apache/hudi/pull/12772#discussion_r2074139354
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -184,7 +185,7 @@ private void writeRow(InternalRow row) {
     UTF8String writeCommitTime = shouldPreserveHoodieMetadata ?
         row.getUTF8String(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD) : commitTime;
-    InternalRow updatedRow = new HoodieInternalRow(writeCommitTime, seqId, recordKey,
+    InternalRow updatedRow = SparkAdapterSupport$.MODULE$.sparkAdapter().createInternalRow(writeCommitTime, seqId, recordKey,
Review Comment:
Have you checked whether the performance is the same after replacing
`HoodieInternalRow::new` with
`SparkAdapterSupport$.MODULE$.sparkAdapter().createInternalRow`?
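For a quick sanity check (short of a proper JMH run), something along these lines can ballpark the cost of the extra virtual call behind the adapter. The `RowFactory` interface and `Object[]` rows below are stand-ins I made up for this sketch, not Hudi or Spark API:

```java
// Stand-ins for the real types: RowFactory mirrors the sparkAdapter()
// indirection, and Object[] stands in for InternalRow.
interface RowFactory {
  Object[] createRow(String commitTime, String seqId, String recordKey);
}

class RowCreationBench {
  // Factory behind an interface, mirroring the createInternalRow(...) call.
  static final RowFactory ADAPTER = (c, s, k) -> new Object[] {c, s, k};

  // Direct construction, mirroring new HoodieInternalRow(...).
  static long timeDirect(int iters) {
    long start = System.nanoTime();
    Object[] sink = null;
    for (int i = 0; i < iters; i++) {
      sink = new Object[] {"commit", "seq" + i, "key" + i};
    }
    if (sink.length == 0) throw new AssertionError(); // keep the loop live
    return System.nanoTime() - start;
  }

  // Same allocation, but through one extra interface call per row.
  static long timeViaFactory(int iters) {
    long start = System.nanoTime();
    Object[] sink = null;
    for (int i = 0; i < iters; i++) {
      sink = ADAPTER.createRow("commit", "seq" + i, "key" + i);
    }
    if (sink.length == 0) throw new AssertionError();
    return System.nanoTime() - start;
  }

  public static void main(String[] args) {
    timeDirect(1_000_000);       // warm-up so the JIT can inline the factory call
    timeViaFactory(1_000_000);
    System.out.println("direct:  " + timeDirect(5_000_000) + " ns");
    System.out.println("factory: " + timeViaFactory(5_000_000) + " ns");
  }
}
```

With a single adapter implementation loaded, the JIT should devirtualize and inline the call, so I would expect the gap to be near zero, but a measurement on the actual write path would settle it.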
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java:
##########
@@ -62,9 +62,11 @@ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, Hood
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
-    return records
-        .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled),
-            true, outputSparkPartitions);
+    return records.sortBy(record ->
+        SparkSortUtils.getComparableSortColumns(
Review Comment:
Could we still use `SortUtils.getComparableSortColumns`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java:
##########
@@ -84,8 +85,8 @@ private JavaRDD<HoodieRecord<T>> doPartitionAndCustomColumnSort(JavaRDD<HoodieRe
     final String[] sortColumns = sortColumnNames;
     final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema()))));
     Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
-      FlatLists.ComparableList obj1 = FlatLists.ofComparableArray(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
-      FlatLists.ComparableList obj2 = FlatLists.ofComparableArray(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+      FlatLists.ComparableList obj1 = SparkAdapterSupport$.MODULE$.sparkAdapter().createComparableList(t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
+      FlatLists.ComparableList obj2 = SparkAdapterSupport$.MODULE$.sparkAdapter().createComparableList(t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled));
Review Comment:
Same here: could we revert to `FlatLists.ofComparableArray`?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -151,4 +152,10 @@ public Comparable convertValueToEngineType(Comparable value) {
}
return value;
}
+
+ @Override
+ public int compareValues(Comparable a, Comparable b) {
+ // [SPARK-46832] UTF8String doesn't support compareTo anymore
+ return SparkSortUtils.compareValues(a, b);
Review Comment:
A better way is to implement `Spark3InternalRowReaderContext` and
`Spark4InternalRowReaderContext` to differentiate the logic, instead of
redirecting through utils, which is opaque. This might be deferred to a
subsequent PR, but it would be good to avoid exposing `SparkSortUtils`,
which anyone could use accidentally; its logic should be incorporated into
the version-specific implementation classes.
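Roughly the shape I have in mind, as an illustrative sketch only: the class names follow the suggestion above, but the bodies are stand-ins, with `CharSequence` playing the role of `UTF8String`:

```java
// Illustrative sketch of version-specific comparison logic; not Hudi's API.
abstract class BaseRowReaderContext {
  public abstract int compareValues(Comparable a, Comparable b);
}

// Spark 3.x: UTF8String still implements Comparable, so plain compareTo suffices.
class Spark3RowReaderContext extends BaseRowReaderContext {
  @Override
  @SuppressWarnings("unchecked")
  public int compareValues(Comparable a, Comparable b) {
    return a.compareTo(b);
  }
}

// Spark 4.x: [SPARK-46832] removed UTF8String.compareTo, so the string path
// goes through a dedicated comparison (stubbed here with CharSequence.compare).
class Spark4RowReaderContext extends BaseRowReaderContext {
  @Override
  @SuppressWarnings("unchecked")
  public int compareValues(Comparable a, Comparable b) {
    if (a instanceof CharSequence && b instanceof CharSequence) {
      return CharSequence.compare((CharSequence) a, (CharSequence) b);
    }
    return a.compareTo(b);
  }
}
```

That way each Spark version owns its own special cases, and nothing string-comparison-specific leaks into a shared utility.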
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -260,7 +263,13 @@ object HoodieDatasetBulkInsertHelper
.reduceByKey ((oneRow, otherRow) => {
         val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
         val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
-        if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
+        if (onePreCombineVal.isInstanceOf[UTF8String]) {
+          if (sparkAdapter.compareUTF8String(onePreCombineVal.asInstanceOf[UTF8String], otherPreCombineVal.asInstanceOf[UTF8String]) >= 0) {
+            oneRow
+          } else {
+            otherRow
+          }
+        } else if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
Review Comment:
The precombine field type is predetermined. Could we avoid the per-record
`UTF8String` type check?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]