This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 8c58ef5d78a [HUDI-7037] Fix colstats reading for Decimal field (#12993) 8c58ef5d78a is described below commit 8c58ef5d78a3d649006472674ae306144ea7f746 Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Wed Mar 26 22:21:47 2025 +0530 [HUDI-7037] Fix colstats reading for Decimal field (#12993) --- .../org/apache/hudi/ColumnStatsIndexSupport.scala | 15 ++++--- .../hudi/functional/ColumnStatIndexTestBase.scala | 52 +++++++++++++++++++++- .../hudi/functional/TestColumnStatsIndex.scala | 27 ++++++++++- 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 8fe24432eed..ca87cac2a3e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -455,10 +455,10 @@ object ColumnStatsIndexSupport { case w: LongWrapper => w.getValue case w: FloatWrapper => w.getValue case w: DoubleWrapper => w.getValue + case w: DecimalWrapper => w.getValue // Moved above BytesWrapper to ensure proper matching case w: BytesWrapper => w.getValue case w: StringWrapper => w.getValue case w: DateWrapper => w.getValue - case w: DecimalWrapper => w.getValue case w: TimeMicrosWrapper => w.getValue case w: TimestampMicrosWrapper => w.getValue @@ -490,13 +490,18 @@ object ColumnStatsIndexSupport { case ShortType => value.asInstanceOf[Int].toShort case ByteType => value.asInstanceOf[Int].toByte - // TODO fix - case _: DecimalType => + case dt: DecimalType => value match { case buffer: ByteBuffer => val logicalType = DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType - decConv.fromBytes(buffer, null, logicalType) - case _ => value + 
decConv.fromBytes(buffer, null, logicalType).setScale(dt.scale, java.math.RoundingMode.UNNECESSARY) + case bd: BigDecimal => + // Scala BigDecimal: convert to java.math.BigDecimal and enforce the scale + bd.bigDecimal.setScale(dt.scale, java.math.RoundingMode.UNNECESSARY) + case bd: java.math.BigDecimal => + bd.setScale(dt.scale, java.math.RoundingMode.UNNECESSARY) + case other => + throw new UnsupportedOperationException(s"Cannot deserialize value for DecimalType: unexpected type ${other.getClass.getName}") } case BinaryType => value match { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala index 17449e8d098..c6d8d1ce293 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala @@ -21,6 +21,7 @@ package org.apache.hudi.functional import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceWriteOptions, PartitionStatsIndexSupport} import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.avro.model.DecimalWrapper import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} import org.apache.hudi.common.model.{HoodieBaseFile, HoodieFileGroup, HoodieLogFile, HoodieTableType} @@ -46,7 +47,8 @@ import org.junit.jupiter.api._ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.params.provider.Arguments -import java.math.BigInteger +import java.math.{BigDecimal => JBigDecimal, BigInteger} +import java.nio.ByteBuffer import java.sql.{Date, Timestamp} import java.util import java.util.List @@ -492,6 
+494,54 @@ object ColumnStatIndexTestBase { : _*) } + trait WrapperCreator { + def create(orig: JBigDecimal, sch: Schema): DecimalWrapper + } + + // Test cases for column stats index with DecimalWrapper + def decimalWrapperTestCases: java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + // Test case 1: "ByteBuffer Test" – using an original JBigDecimal. + Arguments.of( + "ByteBuffer Test", + new JBigDecimal("123.45"), + new WrapperCreator { + override def create(orig: JBigDecimal, sch: Schema): DecimalWrapper = + new DecimalWrapper { + // Return a ByteBuffer computed via Avro's DecimalConversion. + override def getValue: ByteBuffer = + ColumnStatsIndexSupport.decConv.toBytes(orig, sch, sch.getLogicalType) + } + } + ), + // Test case 2: "Java BigDecimal Test" – again using a JBigDecimal. + Arguments.of( + "Java BigDecimal Test", + new JBigDecimal("543.21"), + new WrapperCreator { + override def create(orig: JBigDecimal, sch: Schema): DecimalWrapper = + new DecimalWrapper { + override def getValue: ByteBuffer = + ColumnStatsIndexSupport.decConv.toBytes(orig, sch, sch.getLogicalType) + } + } + ), + // Test case 3: "Scala BigDecimal Test" – using a Scala BigDecimal converted to JBigDecimal. 
+ Arguments.of( + "Scala BigDecimal Test", + scala.math.BigDecimal("987.65").bigDecimal, + new WrapperCreator { + override def create(orig: JBigDecimal, sch: Schema): DecimalWrapper = + new DecimalWrapper { + override def getValue: ByteBuffer = + // Here we explicitly use orig (which comes from Scala BigDecimal converted to java.math.BigDecimal) + ColumnStatsIndexSupport.decConv.toBytes(orig, sch, sch.getLogicalType) + } + } + ) + ) + } + case class ColumnStatsTestParams(testCase: ColumnStatsTestCase, metadataOpts: Map[String, String], hudiOpts: Map[String, String], diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 44ae5d05447..b7f2506426d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -22,6 +22,7 @@ import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.avro.model.DecimalWrapper import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig} import org.apache.hudi.common.fs.FSUtils @@ -34,13 +35,13 @@ import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERA import org.apache.hudi.common.util.ParquetUtils import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieWriteConfig} -import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase -import 
org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams +import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams, WrapperCreator} import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS import org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS import org.apache.hudi.storage.StoragePath import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration +import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql._ @@ -53,6 +54,8 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, MethodSource} +import java.math.{BigDecimal => JBigDecimal} +import java.nio.ByteBuffer import java.util.Collections import java.util.stream.Collectors @@ -1075,4 +1078,24 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { assertTrue(r.getMinValue.asInstanceOf[Comparable[Object]].compareTo(r.getMaxValue.asInstanceOf[Object]) <= 0) }) } + + @ParameterizedTest + @MethodSource(Array("decimalWrapperTestCases")) + def testDeserialize(description: String, expected: JBigDecimal, wrapperCreator: WrapperCreator): Unit = { + val dt = DecimalType(10, 2) + // Get the schema from the DecimalWrapper's Avro definition. + val schema: Schema = DecimalWrapper.SCHEMA$.getField("value").schema() + val wrapper = wrapperCreator.create(expected, schema) + // Extract the underlying value. + val unwrapped = ColumnStatsIndexSupport.tryUnpackValueWrapper(wrapper) + // Optionally, for the "ByteBuffer Test" case, verify that the unwrapped value is a ByteBuffer. + if (description.contains("ByteBuffer Test")) { + assertTrue(unwrapped.isInstanceOf[ByteBuffer], "Expected a ByteBuffer") + } + // Deserialize into a java.math.BigDecimal. 
+ val deserialized = ColumnStatsIndexSupport.deserialize(unwrapped, dt) + assertTrue(deserialized.isInstanceOf[JBigDecimal], "Deserialized value should be a java.math.BigDecimal") + assertEquals(expected, deserialized.asInstanceOf[JBigDecimal], + s"Decimal value from $description does not match") + } }