This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c58ef5d78a [HUDI-7037] Fix colstats reading for Decimal field (#12993)
8c58ef5d78a is described below

commit 8c58ef5d78a3d649006472674ae306144ea7f746
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Wed Mar 26 22:21:47 2025 +0530

    [HUDI-7037] Fix colstats reading for Decimal field (#12993)
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  | 15 ++++---
 .../hudi/functional/ColumnStatIndexTestBase.scala  | 52 +++++++++++++++++++++-
 .../hudi/functional/TestColumnStatsIndex.scala     | 27 ++++++++++-
 3 files changed, 86 insertions(+), 8 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 8fe24432eed..ca87cac2a3e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -455,10 +455,10 @@ object ColumnStatsIndexSupport {
       case w: LongWrapper => w.getValue
       case w: FloatWrapper => w.getValue
       case w: DoubleWrapper => w.getValue
+      case w: DecimalWrapper => w.getValue  // Moved above BytesWrapper to 
ensure proper matching
       case w: BytesWrapper => w.getValue
       case w: StringWrapper => w.getValue
       case w: DateWrapper => w.getValue
-      case w: DecimalWrapper => w.getValue
       case w: TimeMicrosWrapper => w.getValue
       case w: TimestampMicrosWrapper => w.getValue
 
@@ -490,13 +490,18 @@ object ColumnStatsIndexSupport {
       case ShortType => value.asInstanceOf[Int].toShort
       case ByteType => value.asInstanceOf[Int].toByte
 
-      // TODO fix
-      case _: DecimalType =>
+      case dt: DecimalType =>
         value match {
           case buffer: ByteBuffer =>
             val logicalType = 
DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType
-            decConv.fromBytes(buffer, null, logicalType)
-          case _ => value
+            decConv.fromBytes(buffer, null, logicalType).setScale(dt.scale, 
java.math.RoundingMode.UNNECESSARY)
+          case bd: BigDecimal =>
+            // Scala BigDecimal: convert to java.math.BigDecimal and enforce 
the scale
+            bd.bigDecimal.setScale(dt.scale, 
java.math.RoundingMode.UNNECESSARY)
+          case bd: java.math.BigDecimal =>
+            bd.setScale(dt.scale, java.math.RoundingMode.UNNECESSARY)
+          case other =>
+            throw new UnsupportedOperationException(s"Cannot deserialize value 
for DecimalType: unexpected type ${other.getClass.getName}")
         }
       case BinaryType =>
         value match {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 17449e8d098..c6d8d1ce293 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -21,6 +21,7 @@ package org.apache.hudi.functional
 import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, 
DataSourceWriteOptions, PartitionStatsIndexSupport}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.avro.model.DecimalWrapper
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig}
 import org.apache.hudi.common.model.{HoodieBaseFile, HoodieFileGroup, 
HoodieLogFile, HoodieTableType}
@@ -46,7 +47,8 @@ import org.junit.jupiter.api._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.provider.Arguments
 
-import java.math.BigInteger
+import java.math.{BigDecimal => JBigDecimal, BigInteger}
+import java.nio.ByteBuffer
 import java.sql.{Date, Timestamp}
 import java.util
 import java.util.List
@@ -492,6 +494,54 @@ object ColumnStatIndexTestBase {
         : _*)
   }
 
+  trait WrapperCreator {
+    def create(orig: JBigDecimal, sch: Schema): DecimalWrapper
+  }
+
+  // Test cases for column stats index with DecimalWrapper
+  def decimalWrapperTestCases: java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      // Test case 1: "ByteBuffer Test" – using an original JBigDecimal.
+      Arguments.of(
+        "ByteBuffer Test",
+        new JBigDecimal("123.45"),
+        new WrapperCreator {
+          override def create(orig: JBigDecimal, sch: Schema): DecimalWrapper =
+            new DecimalWrapper {
+              // Return a ByteBuffer computed via Avro's DecimalConversion.
+              override def getValue: ByteBuffer =
+                ColumnStatsIndexSupport.decConv.toBytes(orig, sch, 
sch.getLogicalType)
+            }
+        }
+      ),
+      // Test case 2: "Java BigDecimal Test" – again using a JBigDecimal.
+      Arguments.of(
+        "Java BigDecimal Test",
+        new JBigDecimal("543.21"),
+        new WrapperCreator {
+          override def create(orig: JBigDecimal, sch: Schema): DecimalWrapper =
+            new DecimalWrapper {
+              override def getValue: ByteBuffer =
+                ColumnStatsIndexSupport.decConv.toBytes(orig, sch, 
sch.getLogicalType)
+            }
+        }
+      ),
+      // Test case 3: "Scala BigDecimal Test" – using a Scala BigDecimal 
converted to JBigDecimal.
+      Arguments.of(
+        "Scala BigDecimal Test",
+        scala.math.BigDecimal("987.65").bigDecimal,
+        new WrapperCreator {
+          override def create(orig: JBigDecimal, sch: Schema): DecimalWrapper =
+            new DecimalWrapper {
+              override def getValue: ByteBuffer =
+                // Here we explicitly use orig (which comes from Scala 
BigDecimal converted to java.math.BigDecimal)
+                ColumnStatsIndexSupport.decConv.toBytes(orig, sch, 
sch.getLogicalType)
+            }
+        }
+      )
+    )
+  }
+
   case class ColumnStatsTestParams(testCase: ColumnStatsTestCase,
                                    metadataOpts: Map[String, String],
                                    hudiOpts: Map[String, String],
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 44ae5d05447..b7f2506426d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.{ColumnStatsIndexSupport, 
DataSourceWriteOptions}
 import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
 import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.avro.model.DecimalWrapper
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.fs.FSUtils
@@ -34,13 +35,13 @@ import 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERA
 import org.apache.hudi.common.util.ParquetUtils
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
-import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase
-import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams
+import 
org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, 
ColumnStatsTestParams, WrapperCreator}
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
 import org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 
+import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
@@ -53,6 +54,8 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertNotNull, assertTrue
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
 
+import java.math.{BigDecimal => JBigDecimal}
+import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.stream.Collectors
 
@@ -1075,4 +1078,24 @@ class TestColumnStatsIndex extends 
ColumnStatIndexTestBase {
       
assertTrue(r.getMinValue.asInstanceOf[Comparable[Object]].compareTo(r.getMaxValue.asInstanceOf[Object])
 <= 0)
     })
   }
+
+  @ParameterizedTest
+  @MethodSource(Array("decimalWrapperTestCases"))
+  def testDeserialize(description: String, expected: JBigDecimal, 
wrapperCreator: WrapperCreator): Unit = {
+    val dt = DecimalType(10, 2)
+    // Get the schema from the DecimalWrapper's Avro definition.
+    val schema: Schema = DecimalWrapper.SCHEMA$.getField("value").schema()
+    val wrapper = wrapperCreator.create(expected, schema)
+    // Extract the underlying value.
+    val unwrapped = ColumnStatsIndexSupport.tryUnpackValueWrapper(wrapper)
+    // For the "ByteBuffer Test" case, verify that the unwrapped 
value is a ByteBuffer.
+    if (description.contains("ByteBuffer Test")) {
+      assertTrue(unwrapped.isInstanceOf[ByteBuffer], "Expected a ByteBuffer")
+    }
+    // Deserialize into a java.math.BigDecimal.
+    val deserialized = ColumnStatsIndexSupport.deserialize(unwrapped, dt)
+    assertTrue(deserialized.isInstanceOf[JBigDecimal], "Deserialized value 
should be a java.math.BigDecimal")
+    assertEquals(expected, deserialized.asInstanceOf[JBigDecimal],
+      s"Decimal value from $description does not match")
+  }
 }

Reply via email to