This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 014d7657214d46060ee699c88f6f847343d6851c
Author: harshal <[email protected]>
AuthorDate: Mon Jan 3 12:19:43 2022 +0530

    [HUDI-2558] Fixing Clustering w/ sort columns with null values fails (#4404)
---
 .../RDDCustomColumnsSortPartitioner.java           | 12 +++++++++++-
 .../apache/hudi/functional/TestMORDataSource.scala | 22 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index a36da5f..fb243f5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
@@ -55,8 +56,17 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
     final String[] sortColumns = this.sortColumnNames;
     final SerializableSchema schema = this.serializableSchema;
     return records.sortBy(
-        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema),
+        record -> {
+          Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema);
+          // null values are replaced with empty string for null_first order
+          if (recordValue == null) {
+            return StringUtils.EMPTY_STRING;
+          } else {
+            return StringUtils.objToString(recordValue);
+          }
+        },
         true, outputSparkPartitions);
+
   }
 
   @Override
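
For context, here is a minimal standalone sketch (not Hudi code) of the
null-first trick the patch applies in the partitioner above: records whose
sort-column value is null get the empty string as their sort key, which
orders before any non-empty string, so Spark's sortBy never sees a null key
and no longer throws a NullPointerException. The class name, app name, and
local master are illustrative assumptions.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public class NullFirstSortSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("null-first-sort").setMaster("local[2]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          JavaRDD<String> values =
              jsc.parallelize(Arrays.asList("b", null, "a", null, "c"));
          // Map null keys to "" (same idea as StringUtils.EMPTY_STRING in the
          // patch) so the comparator never receives a null; nulls sort first.
          JavaRDD<String> sorted = values.sortBy(v -> v == null ? "" : v, true, 2);
          System.out.println(sorted.collect()); // [null, null, a, b, c]
        }
      }
    }
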
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 5719ad7..9996f23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -716,6 +716,28 @@ class TestMORDataSource extends HoodieClientTestBase {
   }
 
   @Test
+  def testClusteringOnNullableColumn(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+      .withColumn("cluster_id", when(expr("end_lon < 0.2 "), lit(null).cast("string"))
+          .otherwise(col("_row_key")))
+      .withColumn("struct_cluster_col", when(expr("end_lon < 0.1"), lit(null))
+          .otherwise(struct(col("cluster_id"), col("_row_key"))))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      // option for clustering
+      .option("hoodie.clustering.inline", "true")
+      .option("hoodie.clustering.inline.max.commits", "1")
+      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", 
"1073741824")
+      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
+      .option("hoodie.clustering.plan.strategy.sort.columns", 
"struct_cluster_col")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+  }
+
+  @Test
   def testHoodieIsDeletedMOR(): Unit =  {
     val numRecords = 100
     val numRecordsToDelete = 2
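
To round out the picture, a hedged sketch of the write path the new test
exercises, outside the test harness: a bulk_insert into a MERGE_ON_READ table
with inline clustering sorting on a column that may hold nulls (the case that
failed before this patch). The table name, record key and precombine fields,
input path, and sort column below are illustrative assumptions; the clustering
option keys are the ones used in the test.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class ClusteringNullableSortColumnSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-clustering-nullable-sort")
            .master("local[2]")
            .getOrCreate();

        // Assumed input with columns: id, ts, and a nullable sort column.
        Dataset<Row> df = spark.read().json("/tmp/input.json");

        df.write().format("hudi")
            .option("hoodie.table.name", "demo_mor")
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.datasource.write.operation", "bulk_insert")
            .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
            // Same clustering options as the test above.
            .option("hoodie.clustering.inline", "true")
            .option("hoodie.clustering.inline.max.commits", "1")
            .option("hoodie.clustering.plan.strategy.sort.columns", "nullable_col")
            .mode(SaveMode.Overwrite)
            .save("/tmp/hudi/demo_mor");

        spark.stop();
      }
    }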
