This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 077005c8c perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition values (#3247)
077005c8c is described below

commit 077005c8c1a0043880081b698f38eed955a560a3
Author: Parth Chandra <[email protected]>
AuthorDate: Fri Jan 23 11:51:49 2026 -0800

    perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition values (#3247)
    
    * perf: Use protobuf instead of JSON to serialize Iceberg partition values
---
 native/core/src/execution/planner.rs               | 168 ++++++----
 native/proto/src/proto/operator.proto              |  28 +-
 .../serde/operator/CometIcebergNativeScan.scala    | 344 +++++++++------------
 .../org/apache/comet/IcebergReadFromS3Suite.scala  |  37 +++
 4 files changed, 326 insertions(+), 251 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 500203e76..6a4ad97f8 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2629,6 +2629,103 @@ fn convert_spark_types_to_arrow_schema(
     arrow_schema
 }
 
+/// Converts a protobuf PartitionValue to an iceberg Literal.
+///
+fn partition_value_to_literal(
+    proto_value: &spark_operator::PartitionValue,
+) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
+    use spark_operator::partition_value::Value;
+
+    if proto_value.is_null {
+        return Ok(None);
+    }
+
+    let literal = match &proto_value.value {
+        Some(Value::IntVal(v)) => iceberg::spec::Literal::int(*v),
+        Some(Value::LongVal(v)) => iceberg::spec::Literal::long(*v),
+        Some(Value::DateVal(v)) => {
+            // Convert i64 to i32 for date (days since epoch)
+            let days = (*v)
+                .try_into()
+                .map_err(|_| GeneralError(format!("Date value out of range: {}", v)))?;
+            iceberg::spec::Literal::date(days)
+        }
+        Some(Value::TimestampVal(v)) => iceberg::spec::Literal::timestamp(*v),
+        Some(Value::TimestampTzVal(v)) => iceberg::spec::Literal::timestamptz(*v),
+        Some(Value::StringVal(s)) => iceberg::spec::Literal::string(s.clone()),
+        Some(Value::DoubleVal(v)) => iceberg::spec::Literal::double(*v),
+        Some(Value::FloatVal(v)) => iceberg::spec::Literal::float(*v),
+        Some(Value::DecimalVal(bytes)) => {
+            // Deserialize unscaled BigInteger bytes to i128
+            // BigInteger is serialized as signed big-endian bytes
+            if bytes.len() > 16 {
+                return Err(GeneralError(format!(
+                    "Decimal bytes too large: {} bytes (max 16 for i128)",
+                    bytes.len()
+                )));
+            }
+
+            // Convert big-endian bytes to i128
+            let mut buf = [0u8; 16];
+            let offset = 16 - bytes.len();
+            buf[offset..].copy_from_slice(bytes);
+
+            // Handle sign extension for negative numbers
+            if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
+                // Negative number - fill the leading padding bytes with 0xFF
+                for byte in buf.iter_mut().take(offset) {
+                    *byte = 0xFF;
+                }
+            }
+            let value = i128::from_be_bytes(buf);
+
+            iceberg::spec::Literal::decimal(value)
+        }
+        Some(Value::BoolVal(v)) => iceberg::spec::Literal::bool(*v),
+        Some(Value::UuidVal(bytes)) => {
+            // Deserialize UUID from 16 bytes
+            if bytes.len() != 16 {
+                return Err(GeneralError(format!(
+                    "Invalid UUID bytes length: {} (expected 16)",
+                    bytes.len()
+                )));
+            }
+            let uuid = uuid::Uuid::from_slice(bytes)
+                .map_err(|e| GeneralError(format!("Failed to parse UUID: {}", e)))?;
+            iceberg::spec::Literal::uuid(uuid)
+        }
+        Some(Value::FixedVal(bytes)) => iceberg::spec::Literal::fixed(bytes.to_vec()),
+        Some(Value::BinaryVal(bytes)) => iceberg::spec::Literal::binary(bytes.to_vec()),
+        None => {
+            return Err(GeneralError(
+                "PartitionValue has no value set and is_null is 
false".to_string(),
+            ));
+        }
+    };
+
+    Ok(Some(literal))
+}
+
+/// Converts a protobuf PartitionData to an iceberg Struct.
+///
+/// Uses the existing Struct::from_iter() API from iceberg-rust to construct the struct
+/// from the list of partition values.
+/// This could potentially be upstreamed to iceberg-rust.
+fn partition_data_to_struct(
+    proto_partition: &spark_operator::PartitionData,
+) -> Result<iceberg::spec::Struct, ExecutionError> {
+    let literals: Vec<Option<iceberg::spec::Literal>> = proto_partition
+        .values
+        .iter()
+        .map(partition_value_to_literal)
+        .collect::<Result<Vec<_>, _>>()?;
+
+    Ok(iceberg::spec::Struct::from_iter(literals))
+}
+
 /// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects.
 ///
 /// Each task contains a residual predicate that is used for row-group level filtering
@@ -2655,19 +2752,6 @@ fn parse_file_scan_tasks(
         })
         .collect::<Result<Vec<_>, _>>()?;
 
-    let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
-        .partition_type_pool
-        .iter()
-        .map(|json| {
-            serde_json::from_str(json).map_err(|e| {
-                ExecutionError::GeneralError(format!(
-                    "Failed to parse partition type JSON from pool: {}",
-                    e
-                ))
-            })
-        })
-        .collect::<Result<Vec<_>, _>>()?;
-
     let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
         .partition_spec_pool
         .iter()
@@ -2721,19 +2805,7 @@ fn parse_file_scan_tasks(
         })
         .collect::<Result<Vec<_>, _>>()?;
 
-    let partition_data_cache: Vec<serde_json::Value> = proto_scan
-        .partition_data_pool
-        .iter()
-        .map(|json| {
-            serde_json::from_str(json).map_err(|e| {
-                ExecutionError::GeneralError(format!(
-                    "Failed to parse partition data JSON from pool: {}",
-                    e
-                ))
-            })
-        })
-        .collect::<Result<Vec<_>, _>>()?;
-
+    // The partition data pool now holds protobuf messages
     let results: Result<Vec<_>, _> = proto_tasks
         .iter()
         .map(|proto_task| {
@@ -2787,48 +2859,24 @@ fn parse_file_scan_tasks(
             };
 
             let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
-                let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
-                    ExecutionError::GeneralError(
-                        "partition_type_idx is required when partition_data_idx is present"
-                            .to_string(),
-                    )
-                })?;
-
-                let partition_data_value = partition_data_cache
+                // Get partition data from protobuf pool
+                let partition_data_proto = proto_scan
+                    .partition_data_pool
                     .get(partition_data_idx as usize)
                     .ok_or_else(|| {
                         ExecutionError::GeneralError(format!(
-                            "Invalid partition_data_idx: {} (cache size: {})",
+                            "Invalid partition_data_idx: {} (pool size: {})",
                             partition_data_idx,
-                            partition_data_cache.len()
+                            proto_scan.partition_data_pool.len()
                         ))
                     })?;
 
-                let partition_type = partition_type_cache
-                    .get(partition_type_idx as usize)
-                    .ok_or_else(|| {
-                        ExecutionError::GeneralError(format!(
-                            "Invalid partition_type_idx: {} (cache size: {})",
-                            partition_type_idx,
-                            partition_type_cache.len()
-                        ))
-                    })?;
-
-                match iceberg::spec::Literal::try_from_json(
-                    partition_data_value.clone(),
-                    &iceberg::spec::Type::Struct(partition_type.clone()),
-                ) {
-                    Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
-                    Ok(None) => None,
-                    Ok(other) => {
-                        return Err(GeneralError(format!(
-                            "Expected struct literal for partition data, got: 
{:?}",
-                            other
-                        )))
-                    }
+                // Convert protobuf PartitionData to iceberg Struct
+                match partition_data_to_struct(partition_data_proto) {
+                    Ok(s) => Some(s),
                     Err(e) => {
-                        return Err(GeneralError(format!(
-                            "Failed to deserialize partition data from JSON: 
{}",
+                        return Err(ExecutionError::GeneralError(format!(
+                            "Failed to deserialize partition data from 
protobuf: {}",
                             e
                         )))
                     }
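
For reference, the decimal path above carries the unscaled value as the signed
big-endian bytes produced by java.math.BigInteger#toByteArray, and the Rust side
sign-extends them into an i128. A minimal standalone sketch of that conversion
(hypothetical helper name, plain byte slices rather than the protobuf message):

    // Sign-extending big-endian bytes -> i128, mirroring the logic in
    // partition_value_to_literal above.
    fn be_bytes_to_i128(bytes: &[u8]) -> Result<i128, String> {
        if bytes.len() > 16 {
            return Err(format!("too many bytes for i128: {}", bytes.len()));
        }
        let mut buf = [0u8; 16];
        let offset = 16 - bytes.len();
        buf[offset..].copy_from_slice(bytes);
        // A set high bit in the first input byte marks a negative two's-complement
        // value: fill the leading padding with 0xFF before reinterpreting.
        if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
            buf[..offset].fill(0xFF);
        }
        Ok(i128::from_be_bytes(buf))
    }

    fn main() {
        // BigInteger.valueOf(-1).toByteArray() == [0xFF]
        assert_eq!(be_bytes_to_i128(&[0xFF]).unwrap(), -1);
        // BigInteger.valueOf(256).toByteArray() == [0x01, 0x00]
        assert_eq!(be_bytes_to_i128(&[0x01, 0x00]).unwrap(), 256);
    }
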
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index c00b95396..73c087cf3 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -130,6 +130,32 @@ message CsvOptions {
   bool truncated_rows = 8;
 }
 
+// Partition value for Iceberg partition data
+message PartitionValue {
+  int32 field_id = 1;
+  oneof value {
+    int32 int_val = 2;
+    int64 long_val = 3;
+    int64 date_val = 4;           // days since epoch
+    int64 timestamp_val = 5;       // microseconds since epoch
+    int64 timestamp_tz_val = 6;    // microseconds with timezone
+    string string_val = 7;
+    double double_val = 8;
+    float float_val = 9;
+    bytes decimal_val = 10;        // unscaled BigInteger bytes
+    bool bool_val = 11;
+    bytes uuid_val = 12;
+    bytes fixed_val = 13;
+    bytes binary_val = 14;
+  }
+  bool is_null = 15;
+}
+
+// Collection of partition values for a single partition
+message PartitionData {
+  repeated PartitionValue values = 1;
+}
+
 message IcebergScan {
   // Schema to read
   repeated SparkStructField required_schema = 1;
@@ -149,7 +175,7 @@ message IcebergScan {
   repeated string partition_spec_pool = 7;
   repeated string name_mapping_pool = 8;
   repeated ProjectFieldIdList project_field_ids_pool = 9;
-  repeated string partition_data_pool = 10;
+  repeated PartitionData partition_data_pool = 10;
   repeated DeleteFileList delete_files_pool = 11;
   repeated spark.spark_expression.Expr residual_pool = 12;
 }
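
A note on the PartitionValue semantics: is_null = true encodes a null partition
value, while an unset value oneof with is_null = false is rejected on the Rust
side. A small sketch with a hand-rolled stand-in for the generated type (the
types below are illustrative, not the prost-generated code):

    // Illustrative stand-in for the prost-generated PartitionValue.
    #[allow(dead_code)]
    enum Value {
        IntVal(i32),
        LongVal(i64),
        StringVal(String),
        // ... remaining oneof variants elided
    }

    struct PartitionValue {
        is_null: bool,
        value: Option<Value>,
    }

    // Mirrors partition_value_to_literal: null short-circuits, and a missing
    // oneof value while is_null is false is an error.
    fn decode(pv: &PartitionValue) -> Result<Option<&Value>, String> {
        if pv.is_null {
            return Ok(None);
        }
        pv.value
            .as_ref()
            .map(Some)
            .ok_or_else(|| "value unset while is_null is false".to_string())
    }

    fn main() {
        let pv = PartitionValue { is_null: false, value: Some(Value::IntVal(7)) };
        assert!(matches!(decode(&pv), Ok(Some(Value::IntVal(7)))));
    }
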
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index dc7df531f..7238f8ae8 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -22,7 +22,6 @@ package org.apache.comet.serde.operator
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
-import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
@@ -71,81 +70,133 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
   }
 
   /**
-   * Converts an Iceberg partition value to JSON format expected by iceberg-rust.
-   *
-   * iceberg-rust's Literal::try_from_json() expects specific formats for certain types:
-   *   - Timestamps: ISO string format "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
-   *   - Dates: ISO string format "YYYY-MM-DD"
-   *   - Decimals: String representation
-   *
-   * See: iceberg-rust/crates/iceberg/src/spec/values/literal.rs
+   * Converts an Iceberg partition value to protobuf format. Protobuf is less verbose than JSON,
+   * and timestamps, dates, decimals, and field IDs are serialized as numeric or binary values
+   * instead of as strings.
    */
-  private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = {
-    fieldTypeStr match {
-      case t if t.startsWith("timestamp") =>
-        val micros = value match {
-          case l: java.lang.Long => l.longValue()
-          case i: java.lang.Integer => i.longValue()
-          case _ => value.toString.toLong
-        }
-        val instant = java.time.Instant.ofEpochSecond(micros / 1000000, (micros % 1000000) * 1000)
-        val formatted = java.time.format.DateTimeFormatter
-          .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
-          .withZone(java.time.ZoneOffset.UTC)
-          .format(instant)
-        JString(formatted)
-
-      case "date" =>
-        val days = value.asInstanceOf[java.lang.Integer].intValue()
-        val localDate = java.time.LocalDate.ofEpochDay(days.toLong)
-        JString(localDate.toString)
-
-      case d if d.startsWith("decimal(") =>
-        JString(value.toString)
-
-      case "string" =>
-        JString(value.toString)
-
-      case "int" | "long" =>
-        value match {
-          case i: java.lang.Integer => JInt(BigInt(i.intValue()))
-          case l: java.lang.Long => JInt(BigInt(l.longValue()))
-          case _ => JDecimal(BigDecimal(value.toString))
-        }
+  private def partitionValueToProto(
+      fieldId: Int,
+      fieldTypeStr: String,
+      value: Any): OperatorOuterClass.PartitionValue = {
+    val builder = OperatorOuterClass.PartitionValue.newBuilder()
+    builder.setFieldId(fieldId)
+
+    if (value == null) {
+      builder.setIsNull(true)
+    } else {
+      builder.setIsNull(false)
+      fieldTypeStr match {
+        case t if t.startsWith("timestamp") =>
+          val micros = value match {
+            case l: java.lang.Long => l.longValue()
+            case i: java.lang.Integer => i.longValue()
+            case _ => value.toString.toLong
+          }
+          if (t.contains("tz")) {
+            builder.setTimestampTzVal(micros)
+          } else {
+            builder.setTimestampVal(micros)
+          }
 
-      case "float" | "double" =>
-        value match {
-          // NaN/Infinity are not valid JSON numbers - serialize as strings
-          case f: java.lang.Float if f.isNaN || f.isInfinite =>
-            JString(f.toString)
-          case d: java.lang.Double if d.isNaN || d.isInfinite =>
-            JString(d.toString)
-          case f: java.lang.Float => JDouble(f.doubleValue())
-          case d: java.lang.Double => JDouble(d.doubleValue())
-          case _ => JDecimal(BigDecimal(value.toString))
-        }
+        case "date" =>
+          val days = value.asInstanceOf[java.lang.Integer].intValue()
+          builder.setDateVal(days)
 
-      case "boolean" =>
-        value match {
-          case b: java.lang.Boolean => JBool(b.booleanValue())
-          case _ => JBool(value.toString.toBoolean)
-        }
+        case d if d.startsWith("decimal(") =>
+          // Serialize as unscaled BigInteger bytes
+          val bigDecimal = value match {
+            case bd: java.math.BigDecimal => bd
+            case _ => new java.math.BigDecimal(value.toString)
+          }
+          val unscaledBytes = bigDecimal.unscaledValue().toByteArray
+          builder.setDecimalVal(com.google.protobuf.ByteString.copyFrom(unscaledBytes))
 
-      case "uuid" =>
-        JString(value.toString)
-
-      // Fallback: infer JSON type from Java type
-      case _ =>
-        value match {
-          case s: String => JString(s)
-          case i: java.lang.Integer => JInt(BigInt(i.intValue()))
-          case l: java.lang.Long => JInt(BigInt(l.longValue()))
-          case d: java.lang.Double => JDouble(d.doubleValue())
-          case f: java.lang.Float => JDouble(f.doubleValue())
-          case b: java.lang.Boolean => JBool(b.booleanValue())
-          case other => JString(other.toString)
-        }
+        case "string" =>
+          builder.setStringVal(value.toString)
+
+        case "int" =>
+          val intVal = value match {
+            case i: java.lang.Integer => i.intValue()
+            case l: java.lang.Long => l.intValue()
+            case _ => value.toString.toInt
+          }
+          builder.setIntVal(intVal)
+
+        case "long" =>
+          val longVal = value match {
+            case l: java.lang.Long => l.longValue()
+            case i: java.lang.Integer => i.longValue()
+            case _ => value.toString.toLong
+          }
+          builder.setLongVal(longVal)
+
+        case "float" =>
+          val floatVal = value match {
+            case f: java.lang.Float => f.floatValue()
+            case d: java.lang.Double => d.floatValue()
+            case _ => value.toString.toFloat
+          }
+          builder.setFloatVal(floatVal)
+
+        case "double" =>
+          val doubleVal = value match {
+            case d: java.lang.Double => d.doubleValue()
+            case f: java.lang.Float => f.doubleValue()
+            case _ => value.toString.toDouble
+          }
+          builder.setDoubleVal(doubleVal)
+
+        case "boolean" =>
+          val boolVal = value match {
+            case b: java.lang.Boolean => b.booleanValue()
+            case _ => value.toString.toBoolean
+          }
+          builder.setBoolVal(boolVal)
+
+        case "uuid" =>
+          // UUID as bytes (16 bytes) or string
+          val uuidBytes = value match {
+            case uuid: java.util.UUID =>
+              val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+              bb.putLong(uuid.getMostSignificantBits)
+              bb.putLong(uuid.getLeastSignificantBits)
+              bb.array()
+            case _ =>
+              // Parse UUID string and convert to bytes
+              val uuid = java.util.UUID.fromString(value.toString)
+              val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+              bb.putLong(uuid.getMostSignificantBits)
+              bb.putLong(uuid.getLeastSignificantBits)
+              bb.array()
+          }
+          
builder.setUuidVal(com.google.protobuf.ByteString.copyFrom(uuidBytes))
+
+        case t if t.startsWith("fixed[") || t.startsWith("binary") =>
+          val bytes = value match {
+            case bytes: Array[Byte] => bytes
+            case _ => value.toString.getBytes("UTF-8")
+          }
+          if (t.startsWith("fixed")) {
+            builder.setFixedVal(com.google.protobuf.ByteString.copyFrom(bytes))
+          } else {
+            builder.setBinaryVal(com.google.protobuf.ByteString.copyFrom(bytes))
+          }
+
+        // Fallback: infer the value type from the Java runtime type
+        case _ =>
+          value match {
+            case s: String => builder.setStringVal(s)
+            case i: java.lang.Integer => builder.setIntVal(i.intValue())
+            case l: java.lang.Long => builder.setLongVal(l.longValue())
+            case d: java.lang.Double => builder.setDoubleVal(d.doubleValue())
+            case f: java.lang.Float => builder.setFloatVal(f.floatValue())
+            case b: java.lang.Boolean => builder.setBoolVal(b.booleanValue())
+            case other => builder.setStringVal(other.toString)
+          }
+      }
     }
+
+    builder.build()
   }
 
   /**
@@ -375,18 +426,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
             }
           }
 
-          // Serialize partition data to JSON for iceberg-rust's constants_map.
-          // The native execution engine uses partition_data_json +
-          // partition_type_json to build a constants_map, which is the primary
-          // mechanism for providing partition values to identity-transformed
-          // partition columns. Non-identity transforms (bucket, truncate, days,
-          // etc.) read values from data files.
+          // Serialize partition data to protobuf for native execution.
+          // The native execution engine uses partition_data protobuf messages to
+          // build a constants_map, which provides partition values to identity-
+          // transformed partition columns. Non-identity transforms (bucket, truncate,
+          // days, etc.) read values from data files.
           //
-          // IMPORTANT: Use the same field IDs as partition_type_json (partition field IDs,
-          // not source field IDs) so that JSON deserialization matches correctly.
+          // IMPORTANT: Use partition field IDs (not source field IDs) to match
+          // the schema.
 
           // Filter out fields with unknown type (same as partition type filtering)
-          val partitionDataMap: Map[String, JValue] =
+          val partitionValues: Seq[OperatorOuterClass.PartitionValue] =
             fields.asScala.zipWithIndex.flatMap { case (field, idx) =>
               val fieldTypeStr = getFieldType(field)
 
@@ -402,23 +452,25 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                   partitionData.getClass.getMethod("get", classOf[Int], classOf[Class[_]])
                 val value = getMethod.invoke(partitionData, Integer.valueOf(idx), classOf[Object])
 
-                val jsonValue = if (value == null) {
-                  JNull
-                } else {
-                  partitionValueToJson(fieldTypeStr, value)
-                }
-                Some(fieldId.toString -> jsonValue)
+                Some(partitionValueToProto(fieldId, fieldTypeStr, value))
               }
-            }.toMap
+            }.toSeq
 
           // Only serialize partition data if we have non-unknown fields
-          if (partitionDataMap.nonEmpty) {
-            val partitionJson = compact(render(JObject(partitionDataMap.toList)))
+          if (partitionValues.nonEmpty) {
+            val partitionDataProto = OperatorOuterClass.PartitionData
+              .newBuilder()
+              .addAllValues(partitionValues.asJava)
+              .build()
+
+            // Deduplicate by protobuf bytes (use Base64 string as key)
+            val partitionDataBytes = partitionDataProto.toByteArray
+            val partitionDataKey = java.util.Base64.getEncoder.encodeToString(partitionDataBytes)
 
             val partitionDataIdx = partitionDataToPoolIndex.getOrElseUpdate(
-              partitionJson, {
+              partitionDataKey, {
                 val idx = partitionDataToPoolIndex.size
-                icebergScanBuilder.addPartitionDataPool(partitionJson)
+                icebergScanBuilder.addPartitionDataPool(partitionDataProto)
                 idx
               })
             taskBuilder.setPartitionDataIdx(partitionDataIdx)
@@ -637,7 +689,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
     val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
     val nameMappingToPoolIndex = mutable.HashMap[String, Int]()
     val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]()
-    val partitionDataToPoolIndex = mutable.HashMap[String, Int]()
+    val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64 bytes -> pool index
     val deleteFilesToPoolIndex =
       mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]()
     val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]()
@@ -727,104 +779,6 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                         throw new RuntimeException(msg)
                     }
 
-                    // Extract partition values for Hive-style partitioning
-                    var partitionJsonOpt: Option[String] = None
-                    try {
-                      val partitionMethod = contentFileClass.getMethod("partition")
-                      val partitionStruct = partitionMethod.invoke(dataFile)
-
-                      if (partitionStruct != null) {
-                        // scalastyle:off classforname
-                        val structLikeClass =
-                          Class.forName(IcebergReflection.ClassNames.STRUCT_LIKE)
-                        // scalastyle:on classforname
-                        val sizeMethod = structLikeClass.getMethod("size")
-                        val getMethod =
-                          structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]])
-
-                        val partitionSize =
-                          sizeMethod.invoke(partitionStruct).asInstanceOf[Int]
-
-                        if (partitionSize > 0) {
-                          // Get the partition spec directly from the task
-                          // scalastyle:off classforname
-                          val partitionScanTaskClass =
-                            Class.forName(IcebergReflection.ClassNames.PARTITION_SCAN_TASK)
-                          // scalastyle:on classforname
-                          val specMethod = partitionScanTaskClass.getMethod("spec")
-                          val partitionSpec = specMethod.invoke(task)
-
-                          // Build JSON representation of partition values using json4s
-
-                          val partitionMap = scala.collection.mutable.Map[String, JValue]()
-
-                          if (partitionSpec != null) {
-                            // Get the list of partition fields from the spec
-                            val fieldsMethod = partitionSpec.getClass.getMethod("fields")
-                            val fields = fieldsMethod
-                              .invoke(partitionSpec)
-                              .asInstanceOf[java.util.List[_]]
-
-                            for (i <- 0 until partitionSize) {
-                              val value =
-                                getMethod.invoke(partitionStruct, Int.box(i), classOf[Object])
-
-                              // Get the partition field and check its transform type
-                              val partitionField = fields.get(i)
-
-                              // Only inject partition values for IDENTITY transforms
-                              val transformMethod =
-                                partitionField.getClass.getMethod("transform")
-                              val transform = transformMethod.invoke(partitionField)
-                              val isIdentity =
-                                transform.toString == IcebergReflection.Transforms.IDENTITY
-
-                              if (isIdentity) {
-                                // Get the source field ID
-                                val sourceIdMethod =
-                                  partitionField.getClass.getMethod("sourceId")
-                                val sourceFieldId =
-                                  sourceIdMethod.invoke(partitionField).asInstanceOf[Int]
-
-                                val jsonValue = if (value == null) {
-                                  JNull
-                                } else {
-                                  // Get field type from schema to serialize correctly
-                                  val fieldTypeStr =
-                                    try {
-                                      val findFieldMethod =
-                                        metadata.tableSchema.getClass
-                                          .getMethod("findField", classOf[Int])
-                                      val field = findFieldMethod.invoke(
-                                        metadata.tableSchema,
-                                        sourceFieldId.asInstanceOf[Object])
-                                      if (field != null) {
-                                        val typeMethod = field.getClass.getMethod("type")
-                                        typeMethod.invoke(field).toString
-                                      } else {
-                                        "unknown"
-                                      }
-                                    } catch {
-                                      case _: Exception => "unknown"
-                                    }
-
-                                  partitionValueToJson(fieldTypeStr, value)
-                                }
-                                partitionMap(sourceFieldId.toString) = jsonValue
-                              }
-                            }
-                          }
-
-                          val partitionJson = compact(render(JObject(partitionMap.toList)))
-                          partitionJsonOpt = Some(partitionJson)
-                        }
-                      }
-                    } catch {
-                      case e: Exception =>
-                        logWarning(
-                          s"Failed to extract partition values from DataFile: 
${e.getMessage}")
-                    }
-
                     val startMethod = contentScanTaskClass.getMethod("start")
                     val start = startMethod.invoke(task).asInstanceOf[Long]
                     taskBuilder.setStart(start)
@@ -1056,7 +1010,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       }
     }
 
+    // Calculate partition data pool size in bytes (protobuf format)
+    val partitionDataPoolBytes = icebergScanBuilder.getPartitionDataPoolList.asScala
+      .map(_.getSerializedSize)
+      .sum
+
     logInfo(s"IcebergScan: $totalTasks tasks, ${allPoolSizes.size} pools 
($avgDedup% avg dedup)")
+    if (partitionDataToPoolIndex.nonEmpty) {
+      logInfo(
+        s"  Partition data pool: ${partitionDataToPoolIndex.size} unique 
values, " +
+          s"$partitionDataPoolBytes bytes (protobuf)")
+    }
 
     builder.clearChildren()
     Some(builder.setIcebergScan(icebergScanBuilder).build())
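
On the UUID encoding above: the Scala side writes ByteBuffer.putLong(mostSigBits)
followed by putLong(leastSigBits), which produces the same 16 big-endian bytes
that uuid::Uuid::from_slice consumes on the Rust side. A minimal sketch of that
layout (hypothetical helper, no uuid crate dependency):

    // Reproduce Java's ByteBuffer.putLong(msb).putLong(lsb) layout.
    fn uuid_bits_to_bytes(msb: u64, lsb: u64) -> [u8; 16] {
        let mut out = [0u8; 16];
        out[..8].copy_from_slice(&msb.to_be_bytes());
        out[8..].copy_from_slice(&lsb.to_be_bytes());
        out
    }

    fn main() {
        // 123e4567-e89b-12d3-a456-426614174000
        let bytes = uuid_bits_to_bytes(0x123e4567e89b12d3, 0xa456426614174000);
        assert_eq!(bytes[0], 0x12);
        assert_eq!(bytes[15], 0x00);
    }
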
diff --git a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
index c8d360ae5..00955e629 100644
--- a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
@@ -163,6 +163,43 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
     }
   }
 
+  test("large scale partitioned table - 100 partitions with many files") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withSQLConf(
+      "spark.sql.files.maxRecordsPerFile" -> "50",
+      "spark.sql.adaptive.enabled" -> "false") {
+      spark.sql("""
+        CREATE TABLE s3_catalog.db.large_partitioned_test (
+          id INT,
+          data STRING,
+          partition_id INT
+        ) USING iceberg
+        PARTITIONED BY (partition_id)
+      """)
+
+      spark.sql("""
+        INSERT INTO s3_catalog.db.large_partitioned_test
+        SELECT
+          id,
+          CONCAT('data_', CAST(id AS STRING)) as data,
+          (id % 100) as partition_id
+        FROM range(500000)
+      """)
+
+      checkIcebergNativeScan(
+        "SELECT COUNT(DISTINCT id) FROM s3_catalog.db.large_partitioned_test")
+      checkIcebergNativeScan(
+        "SELECT * FROM s3_catalog.db.large_partitioned_test WHERE id < 10 
ORDER BY id")
+      checkIcebergNativeScan(
+        "SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE 
partition_id = 0")
+      checkIcebergNativeScan(
+        "SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE 
partition_id IN (0, 50, 99)")
+
+      spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test PURGE")
+    }
+  }
+
   test("MOR table with deletes in S3") {
     assume(icebergAvailable, "Iceberg not available in classpath")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
