This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 014d7657214d46060ee699c88f6f847343d6851c
Author: harshal <[email protected]>
AuthorDate: Mon Jan 3 12:19:43 2022 +0530

    [HUDI-2558] Fixing Clustering w/ sort columns with null values fails (#4404)
---
 .../RDDCustomColumnsSortPartitioner.java           | 12 +++++++++++-
 .../apache/hudi/functional/TestMORDataSource.scala | 22 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index a36da5f..fb243f5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
@@ -55,8 +56,17 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
     final String[] sortColumns = this.sortColumnNames;
     final SerializableSchema schema = this.serializableSchema;
     return records.sortBy(
-        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema),
+        record -> {
+          Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema);
+          // null values are replaced with empty string for null_first order
+          if (recordValue == null) {
+            return StringUtils.EMPTY_STRING;
+          } else {
+            return StringUtils.objToString(recordValue);
+          }
+        },
         true, outputSparkPartitions);
+
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 5719ad7..9996f23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -716,6 +716,28 @@ class TestMORDataSource extends HoodieClientTestBase {
   }
 
   @Test
+  def testClusteringOnNullableColumn(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+      .withColumn("cluster_id", when(expr("end_lon < 0.2 "), lit(null).cast("string"))
+        .otherwise(col("_row_key")))
+      .withColumn("struct_cluster_col", when(expr("end_lon < 0.1"), lit(null))
+        .otherwise(struct(col("cluster_id"), col("_row_key"))))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      // option for clustering
+      .option("hoodie.clustering.inline", "true")
+      .option("hoodie.clustering.inline.max.commits", "1")
+      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
+      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.sort.columns", "struct_cluster_col") + .mode(SaveMode.Overwrite) + .save(basePath) + } + + @Test def testHoodieIsDeletedMOR(): Unit = { val numRecords = 100 val numRecordsToDelete = 2
