This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 077005c8c perf: [iceberg] Use protobuf instead of JSON to serialize
Iceberg partition values (#3247)
077005c8c is described below
commit 077005c8c1a0043880081b698f38eed955a560a3
Author: Parth Chandra <[email protected]>
AuthorDate: Fri Jan 23 11:51:49 2026 -0800
perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition
values (#3247)
* perf: Use protobuf instead of JSON to serialize Iceberg partition values
---
native/core/src/execution/planner.rs | 168 ++++++----
native/proto/src/proto/operator.proto | 28 +-
.../serde/operator/CometIcebergNativeScan.scala | 344 +++++++++------------
.../org/apache/comet/IcebergReadFromS3Suite.scala | 37 +++
4 files changed, 326 insertions(+), 251 deletions(-)
diff --git a/native/core/src/execution/planner.rs
b/native/core/src/execution/planner.rs
index 500203e76..6a4ad97f8 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2629,6 +2629,103 @@ fn convert_spark_types_to_arrow_schema(
arrow_schema
}
+/// Converts a protobuf PartitionValue to an iceberg Literal.
+/// Returns `Ok(None)` when `is_null` is set; errors when it is unset and no value is present.
+fn partition_value_to_literal(
+ proto_value: &spark_operator::PartitionValue,
+) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
+ use spark_operator::partition_value::Value;
+
+ if proto_value.is_null {
+ return Ok(None);
+ }
+
+ let literal = match &proto_value.value {
+ Some(Value::IntVal(v)) => iceberg::spec::Literal::int(*v),
+ Some(Value::LongVal(v)) => iceberg::spec::Literal::long(*v),
+ Some(Value::DateVal(v)) => {
+ // Convert i64 to i32 for date (days since epoch)
+ let days = (*v)
+ .try_into()
+ .map_err(|_| GeneralError(format!("Date value out of range:
{}", v)))?;
+ iceberg::spec::Literal::date(days)
+ }
+ Some(Value::TimestampVal(v)) => iceberg::spec::Literal::timestamp(*v),
+ Some(Value::TimestampTzVal(v)) =>
iceberg::spec::Literal::timestamptz(*v),
+ Some(Value::StringVal(s)) => iceberg::spec::Literal::string(s.clone()),
+ Some(Value::DoubleVal(v)) => iceberg::spec::Literal::double(*v),
+ Some(Value::FloatVal(v)) => iceberg::spec::Literal::float(*v),
+ Some(Value::DecimalVal(bytes)) => {
+ // Deserialize unscaled BigInteger bytes to i128
+ // BigInteger is serialized as signed big-endian bytes
+ if bytes.len() > 16 {
+ return Err(GeneralError(format!(
+ "Decimal bytes too large: {} bytes (max 16 for i128)",
+ bytes.len()
+ )));
+ }
+
+ // Convert big-endian bytes to i128
+ let mut buf = [0u8; 16];
+ let offset = 16 - bytes.len();
+ buf[offset..].copy_from_slice(bytes);
+
+ // Handle sign extension for negative numbers
+ let value = if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
+ // Negative number - sign extend
+ for byte in buf.iter_mut().take(offset) {
+ *byte = 0xFF;
+ }
+ i128::from_be_bytes(buf)
+ } else {
+ // Positive number
+ i128::from_be_bytes(buf)
+ };
+
+ iceberg::spec::Literal::decimal(value)
+ }
+ Some(Value::BoolVal(v)) => iceberg::spec::Literal::bool(*v),
+ Some(Value::UuidVal(bytes)) => {
+ // Deserialize UUID from 16 bytes
+ if bytes.len() != 16 {
+ return Err(GeneralError(format!(
+ "Invalid UUID bytes length: {} (expected 16)",
+ bytes.len()
+ )));
+ }
+ let uuid = uuid::Uuid::from_slice(bytes)
+ .map_err(|e| GeneralError(format!("Failed to parse UUID: {}",
e)))?;
+ iceberg::spec::Literal::uuid(uuid)
+ }
+ Some(Value::FixedVal(bytes)) =>
iceberg::spec::Literal::fixed(bytes.to_vec()),
+ Some(Value::BinaryVal(bytes)) =>
iceberg::spec::Literal::binary(bytes.to_vec()),
+ None => {
+ return Err(GeneralError(
+ "PartitionValue has no value set and is_null is
false".to_string(),
+ ));
+ }
+ };
+
+ Ok(Some(literal))
+}
+
+/// Converts a protobuf PartitionData to an iceberg Struct.
+///
+/// Uses the existing Struct::from_iter() API from iceberg-rust to construct the struct
+/// from the list of partition values.
+/// This can potentially be upstreamed to iceberg-rust.
+fn partition_data_to_struct(
+ proto_partition: &spark_operator::PartitionData,
+) -> Result<iceberg::spec::Struct, ExecutionError> {
+ let literals: Vec<Option<iceberg::spec::Literal>> = proto_partition
+ .values
+ .iter()
+ .map(partition_value_to_literal)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ Ok(iceberg::spec::Struct::from_iter(literals))
+}
+
/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask
objects.
///
/// Each task contains a residual predicate that is used for row-group level
filtering
@@ -2655,19 +2752,6 @@ fn parse_file_scan_tasks(
})
.collect::<Result<Vec<_>, _>>()?;
- let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
- .partition_type_pool
- .iter()
- .map(|json| {
- serde_json::from_str(json).map_err(|e| {
- ExecutionError::GeneralError(format!(
- "Failed to parse partition type JSON from pool: {}",
- e
- ))
- })
- })
- .collect::<Result<Vec<_>, _>>()?;
-
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> =
proto_scan
.partition_spec_pool
.iter()
@@ -2721,19 +2805,7 @@ fn parse_file_scan_tasks(
})
.collect::<Result<Vec<_>, _>>()?;
- let partition_data_cache: Vec<serde_json::Value> = proto_scan
- .partition_data_pool
- .iter()
- .map(|json| {
- serde_json::from_str(json).map_err(|e| {
- ExecutionError::GeneralError(format!(
- "Failed to parse partition data JSON from pool: {}",
- e
- ))
- })
- })
- .collect::<Result<Vec<_>, _>>()?;
-
+ // Partition data pool entries are protobuf messages, so no JSON pre-parsing is needed
let results: Result<Vec<_>, _> = proto_tasks
.iter()
.map(|proto_task| {
@@ -2787,48 +2859,24 @@ fn parse_file_scan_tasks(
};
let partition = if let Some(partition_data_idx) =
proto_task.partition_data_idx {
- let partition_type_idx =
proto_task.partition_type_idx.ok_or_else(|| {
- ExecutionError::GeneralError(
- "partition_type_idx is required when
partition_data_idx is present"
- .to_string(),
- )
- })?;
-
- let partition_data_value = partition_data_cache
+ // Get partition data from protobuf pool
+ let partition_data_proto = proto_scan
+ .partition_data_pool
.get(partition_data_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
- "Invalid partition_data_idx: {} (cache size: {})",
+ "Invalid partition_data_idx: {} (pool size: {})",
partition_data_idx,
- partition_data_cache.len()
+ proto_scan.partition_data_pool.len()
))
})?;
- let partition_type = partition_type_cache
- .get(partition_type_idx as usize)
- .ok_or_else(|| {
- ExecutionError::GeneralError(format!(
- "Invalid partition_type_idx: {} (cache size: {})",
- partition_type_idx,
- partition_type_cache.len()
- ))
- })?;
-
- match iceberg::spec::Literal::try_from_json(
- partition_data_value.clone(),
- &iceberg::spec::Type::Struct(partition_type.clone()),
- ) {
- Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
- Ok(None) => None,
- Ok(other) => {
- return Err(GeneralError(format!(
- "Expected struct literal for partition data, got:
{:?}",
- other
- )))
- }
+ // Convert protobuf PartitionData to iceberg Struct
+ match partition_data_to_struct(partition_data_proto) {
+ Ok(s) => Some(s),
Err(e) => {
- return Err(GeneralError(format!(
- "Failed to deserialize partition data from JSON:
{}",
+ return Err(ExecutionError::GeneralError(format!(
+ "Failed to deserialize partition data from
protobuf: {}",
e
)))
}
diff --git a/native/proto/src/proto/operator.proto
b/native/proto/src/proto/operator.proto
index c00b95396..73c087cf3 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -130,6 +130,32 @@ message CsvOptions {
bool truncated_rows = 8;
}
+// Partition value for Iceberg partition data
+message PartitionValue {
+ int32 field_id = 1;
+ oneof value {
+ int32 int_val = 2;
+ int64 long_val = 3;
+ int64 date_val = 4; // days since epoch
+ int64 timestamp_val = 5; // microseconds since epoch
+ int64 timestamp_tz_val = 6; // microseconds since epoch (timestamp with time zone)
+ string string_val = 7;
+ double double_val = 8;
+ float float_val = 9;
+ bytes decimal_val = 10; // unscaled BigInteger bytes
+ bool bool_val = 11;
+ bytes uuid_val = 12;
+ bytes fixed_val = 13;
+ bytes binary_val = 14;
+ }
+ bool is_null = 15;
+}
+
+// Collection of partition values for a single partition
+message PartitionData {
+ repeated PartitionValue values = 1;
+}
+
message IcebergScan {
// Schema to read
repeated SparkStructField required_schema = 1;
@@ -149,7 +175,7 @@ message IcebergScan {
repeated string partition_spec_pool = 7;
repeated string name_mapping_pool = 8;
repeated ProjectFieldIdList project_field_ids_pool = 9;
- repeated string partition_data_pool = 10;
+ repeated PartitionData partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
}
diff --git
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index dc7df531f..7238f8ae8 100644
---
a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++
b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -22,7 +22,6 @@ package org.apache.comet.serde.operator
import scala.collection.mutable
import scala.jdk.CollectionConverters._
-import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
@@ -71,81 +70,133 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
}
/**
- * Converts an Iceberg partition value to JSON format expected by
iceberg-rust.
- *
- * iceberg-rust's Literal::try_from_json() expects specific formats for
certain types:
- * - Timestamps: ISO string format "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
- * - Dates: ISO string format "YYYY-MM-DD"
- * - Decimals: String representation
- *
- * See: iceberg-rust/crates/iceberg/src/spec/values/literal.rs
+ * Converts an Iceberg partition value to protobuf format. Protobuf is less verbose than JSON.
+ * Timestamps, dates and field IDs are serialized as integers, and decimals as unscaled
+ * BigInteger bytes, instead of as strings.
*/
- private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue =
{
- fieldTypeStr match {
- case t if t.startsWith("timestamp") =>
- val micros = value match {
- case l: java.lang.Long => l.longValue()
- case i: java.lang.Integer => i.longValue()
- case _ => value.toString.toLong
- }
- val instant = java.time.Instant.ofEpochSecond(micros / 1000000,
(micros % 1000000) * 1000)
- val formatted = java.time.format.DateTimeFormatter
- .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
- .withZone(java.time.ZoneOffset.UTC)
- .format(instant)
- JString(formatted)
-
- case "date" =>
- val days = value.asInstanceOf[java.lang.Integer].intValue()
- val localDate = java.time.LocalDate.ofEpochDay(days.toLong)
- JString(localDate.toString)
-
- case d if d.startsWith("decimal(") =>
- JString(value.toString)
-
- case "string" =>
- JString(value.toString)
-
- case "int" | "long" =>
- value match {
- case i: java.lang.Integer => JInt(BigInt(i.intValue()))
- case l: java.lang.Long => JInt(BigInt(l.longValue()))
- case _ => JDecimal(BigDecimal(value.toString))
- }
+ private def partitionValueToProto(
+ fieldId: Int,
+ fieldTypeStr: String,
+ value: Any): OperatorOuterClass.PartitionValue = {
+ val builder = OperatorOuterClass.PartitionValue.newBuilder()
+ builder.setFieldId(fieldId)
+
+ if (value == null) {
+ builder.setIsNull(true)
+ } else {
+ builder.setIsNull(false)
+ fieldTypeStr match {
+ case t if t.startsWith("timestamp") =>
+ val micros = value match {
+ case l: java.lang.Long => l.longValue()
+ case i: java.lang.Integer => i.longValue()
+ case _ => value.toString.toLong
+ }
+ if (t.contains("tz")) {
+ builder.setTimestampTzVal(micros)
+ } else {
+ builder.setTimestampVal(micros)
+ }
- case "float" | "double" =>
- value match {
- // NaN/Infinity are not valid JSON numbers - serialize as strings
- case f: java.lang.Float if f.isNaN || f.isInfinite =>
- JString(f.toString)
- case d: java.lang.Double if d.isNaN || d.isInfinite =>
- JString(d.toString)
- case f: java.lang.Float => JDouble(f.doubleValue())
- case d: java.lang.Double => JDouble(d.doubleValue())
- case _ => JDecimal(BigDecimal(value.toString))
- }
+ case "date" =>
+ val days = value.asInstanceOf[java.lang.Integer].intValue()
+ builder.setDateVal(days)
- case "boolean" =>
- value match {
- case b: java.lang.Boolean => JBool(b.booleanValue())
- case _ => JBool(value.toString.toBoolean)
- }
+ case d if d.startsWith("decimal(") =>
+ // Serialize as unscaled BigInteger bytes
+ val bigDecimal = value match {
+ case bd: java.math.BigDecimal => bd
+ case _ => new java.math.BigDecimal(value.toString)
+ }
+ val unscaledBytes = bigDecimal.unscaledValue().toByteArray
+
builder.setDecimalVal(com.google.protobuf.ByteString.copyFrom(unscaledBytes))
- case "uuid" =>
- JString(value.toString)
-
- // Fallback: infer JSON type from Java type
- case _ =>
- value match {
- case s: String => JString(s)
- case i: java.lang.Integer => JInt(BigInt(i.intValue()))
- case l: java.lang.Long => JInt(BigInt(l.longValue()))
- case d: java.lang.Double => JDouble(d.doubleValue())
- case f: java.lang.Float => JDouble(f.doubleValue())
- case b: java.lang.Boolean => JBool(b.booleanValue())
- case other => JString(other.toString)
- }
+ case "string" =>
+ builder.setStringVal(value.toString)
+
+ case "int" =>
+ val intVal = value match {
+ case i: java.lang.Integer => i.intValue()
+ case l: java.lang.Long => l.intValue()
+ case _ => value.toString.toInt
+ }
+ builder.setIntVal(intVal)
+
+ case "long" =>
+ val longVal = value match {
+ case l: java.lang.Long => l.longValue()
+ case i: java.lang.Integer => i.longValue()
+ case _ => value.toString.toLong
+ }
+ builder.setLongVal(longVal)
+
+ case "float" =>
+ val floatVal = value match {
+ case f: java.lang.Float => f.floatValue()
+ case d: java.lang.Double => d.floatValue()
+ case _ => value.toString.toFloat
+ }
+ builder.setFloatVal(floatVal)
+
+ case "double" =>
+ val doubleVal = value match {
+ case d: java.lang.Double => d.doubleValue()
+ case f: java.lang.Float => f.doubleValue()
+ case _ => value.toString.toDouble
+ }
+ builder.setDoubleVal(doubleVal)
+
+ case "boolean" =>
+ val boolVal = value match {
+ case b: java.lang.Boolean => b.booleanValue()
+ case _ => value.toString.toBoolean
+ }
+ builder.setBoolVal(boolVal)
+
+ case "uuid" =>
+ // Accept a java.util.UUID or its string form; always emit 16 bytes (MSB long, then LSB long)
+ val uuidBytes = value match {
+ case uuid: java.util.UUID =>
+ val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+ bb.putLong(uuid.getMostSignificantBits)
+ bb.putLong(uuid.getLeastSignificantBits)
+ bb.array()
+ case _ =>
+ // Parse UUID string and convert to bytes
+ val uuid = java.util.UUID.fromString(value.toString)
+ val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+ bb.putLong(uuid.getMostSignificantBits)
+ bb.putLong(uuid.getLeastSignificantBits)
+ bb.array()
+ }
+
builder.setUuidVal(com.google.protobuf.ByteString.copyFrom(uuidBytes))
+
+ case t if t.startsWith("fixed[") || t.startsWith("binary") =>
+ val bytes = value match {
+ case bytes: Array[Byte] => bytes
+ case _ => value.toString.getBytes("UTF-8")
+ }
+ if (t.startsWith("fixed")) {
+ builder.setFixedVal(com.google.protobuf.ByteString.copyFrom(bytes))
+ } else {
+
builder.setBinaryVal(com.google.protobuf.ByteString.copyFrom(bytes))
+ }
+
+ // Fallback: infer the protobuf value type from the value's Java runtime type
+ case _ =>
+ value match {
+ case s: String => builder.setStringVal(s)
+ case i: java.lang.Integer => builder.setIntVal(i.intValue())
+ case l: java.lang.Long => builder.setLongVal(l.longValue())
+ case d: java.lang.Double => builder.setDoubleVal(d.doubleValue())
+ case f: java.lang.Float => builder.setFloatVal(f.floatValue())
+ case b: java.lang.Boolean => builder.setBoolVal(b.booleanValue())
+ case other => builder.setStringVal(other.toString)
+ }
+ }
}
+
+ builder.build()
}
/**
@@ -375,18 +426,17 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
}
}
- // Serialize partition data to JSON for iceberg-rust's constants_map.
- // The native execution engine uses partition_data_json +
- // partition_type_json to build a constants_map, which is the primary
- // mechanism for providing partition values to identity-transformed
- // partition columns. Non-identity transforms (bucket, truncate,
days,
- // etc.) read values from data files.
+ // Serialize partition data to protobuf for native execution.
+ // The native execution engine uses partition_data protobuf messages
to
+ // build a constants_map, which provides partition values to
identity-
+ // transformed partition columns. Non-identity transforms (bucket,
truncate,
+ // days, etc.) read values from data files.
//
- // IMPORTANT: Use the same field IDs as partition_type_json
(partition field IDs,
- // not source field IDs) so that JSON deserialization matches
correctly.
+ // IMPORTANT: Use partition field IDs (not source field IDs) to match
+ // the schema.
// Filter out fields with unknown type (same as partition type
filtering)
- val partitionDataMap: Map[String, JValue] =
+ val partitionValues: Seq[OperatorOuterClass.PartitionValue] =
fields.asScala.zipWithIndex.flatMap { case (field, idx) =>
val fieldTypeStr = getFieldType(field)
@@ -402,23 +452,25 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
partitionData.getClass.getMethod("get", classOf[Int],
classOf[Class[_]])
val value = getMethod.invoke(partitionData,
Integer.valueOf(idx), classOf[Object])
- val jsonValue = if (value == null) {
- JNull
- } else {
- partitionValueToJson(fieldTypeStr, value)
- }
- Some(fieldId.toString -> jsonValue)
+ Some(partitionValueToProto(fieldId, fieldTypeStr, value))
}
- }.toMap
+ }.toSeq
// Only serialize partition data if we have non-unknown fields
- if (partitionDataMap.nonEmpty) {
- val partitionJson =
compact(render(JObject(partitionDataMap.toList)))
+ if (partitionValues.nonEmpty) {
+ val partitionDataProto = OperatorOuterClass.PartitionData
+ .newBuilder()
+ .addAllValues(partitionValues.asJava)
+ .build()
+
+ // Deduplicate by protobuf bytes (use Base64 string as key)
+ val partitionDataBytes = partitionDataProto.toByteArray
+ val partitionDataKey =
java.util.Base64.getEncoder.encodeToString(partitionDataBytes)
val partitionDataIdx = partitionDataToPoolIndex.getOrElseUpdate(
- partitionJson, {
+ partitionDataKey, {
val idx = partitionDataToPoolIndex.size
- icebergScanBuilder.addPartitionDataPool(partitionJson)
+ icebergScanBuilder.addPartitionDataPool(partitionDataProto)
idx
})
taskBuilder.setPartitionDataIdx(partitionDataIdx)
@@ -637,7 +689,7 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
val partitionSpecToPoolIndex = mutable.HashMap[String, Int]()
val nameMappingToPoolIndex = mutable.HashMap[String, Int]()
val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]()
- val partitionDataToPoolIndex = mutable.HashMap[String, Int]()
+ val partitionDataToPoolIndex = mutable.HashMap[String, Int]() // Base64
bytes -> pool index
val deleteFilesToPoolIndex =
mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]()
val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]()
@@ -727,104 +779,6 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
throw new RuntimeException(msg)
}
- // Extract partition values for Hive-style partitioning
- var partitionJsonOpt: Option[String] = None
- try {
- val partitionMethod =
contentFileClass.getMethod("partition")
- val partitionStruct = partitionMethod.invoke(dataFile)
-
- if (partitionStruct != null) {
- // scalastyle:off classforname
- val structLikeClass =
-
Class.forName(IcebergReflection.ClassNames.STRUCT_LIKE)
- // scalastyle:on classforname
- val sizeMethod = structLikeClass.getMethod("size")
- val getMethod =
- structLikeClass.getMethod("get", classOf[Int],
classOf[Class[_]])
-
- val partitionSize =
- sizeMethod.invoke(partitionStruct).asInstanceOf[Int]
-
- if (partitionSize > 0) {
- // Get the partition spec directly from the task
- // scalastyle:off classforname
- val partitionScanTaskClass =
-
Class.forName(IcebergReflection.ClassNames.PARTITION_SCAN_TASK)
- // scalastyle:on classforname
- val specMethod =
partitionScanTaskClass.getMethod("spec")
- val partitionSpec = specMethod.invoke(task)
-
- // Build JSON representation of partition values
using json4s
-
- val partitionMap =
scala.collection.mutable.Map[String, JValue]()
-
- if (partitionSpec != null) {
- // Get the list of partition fields from the spec
- val fieldsMethod =
partitionSpec.getClass.getMethod("fields")
- val fields = fieldsMethod
- .invoke(partitionSpec)
- .asInstanceOf[java.util.List[_]]
-
- for (i <- 0 until partitionSize) {
- val value =
- getMethod.invoke(partitionStruct, Int.box(i),
classOf[Object])
-
- // Get the partition field and check its
transform type
- val partitionField = fields.get(i)
-
- // Only inject partition values for IDENTITY
transforms
- val transformMethod =
- partitionField.getClass.getMethod("transform")
- val transform =
transformMethod.invoke(partitionField)
- val isIdentity =
- transform.toString ==
IcebergReflection.Transforms.IDENTITY
-
- if (isIdentity) {
- // Get the source field ID
- val sourceIdMethod =
- partitionField.getClass.getMethod("sourceId")
- val sourceFieldId =
-
sourceIdMethod.invoke(partitionField).asInstanceOf[Int]
-
- val jsonValue = if (value == null) {
- JNull
- } else {
- // Get field type from schema to serialize
correctly
- val fieldTypeStr =
- try {
- val findFieldMethod =
- metadata.tableSchema.getClass
- .getMethod("findField", classOf[Int])
- val field = findFieldMethod.invoke(
- metadata.tableSchema,
- sourceFieldId.asInstanceOf[Object])
- if (field != null) {
- val typeMethod =
field.getClass.getMethod("type")
- typeMethod.invoke(field).toString
- } else {
- "unknown"
- }
- } catch {
- case _: Exception => "unknown"
- }
-
- partitionValueToJson(fieldTypeStr, value)
- }
- partitionMap(sourceFieldId.toString) =
jsonValue
- }
- }
- }
-
- val partitionJson =
compact(render(JObject(partitionMap.toList)))
- partitionJsonOpt = Some(partitionJson)
- }
- }
- } catch {
- case e: Exception =>
- logWarning(
- s"Failed to extract partition values from DataFile:
${e.getMessage}")
- }
-
val startMethod = contentScanTaskClass.getMethod("start")
val start = startMethod.invoke(task).asInstanceOf[Long]
taskBuilder.setStart(start)
@@ -1056,7 +1010,17 @@ object CometIcebergNativeScan extends
CometOperatorSerde[CometBatchScanExec] wit
}
}
+ // Calculate partition data pool size in bytes (protobuf format)
+ val partitionDataPoolBytes =
icebergScanBuilder.getPartitionDataPoolList.asScala
+ .map(_.getSerializedSize)
+ .sum
+
logInfo(s"IcebergScan: $totalTasks tasks, ${allPoolSizes.size} pools
($avgDedup% avg dedup)")
+ if (partitionDataToPoolIndex.nonEmpty) {
+ logInfo(
+ s" Partition data pool: ${partitionDataToPoolIndex.size} unique
values, " +
+ s"$partitionDataPoolBytes bytes (protobuf)")
+ }
builder.clearChildren()
Some(builder.setIcebergScan(icebergScanBuilder).build())
diff --git a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
index c8d360ae5..00955e629 100644
--- a/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
+++ b/spark/src/test/scala/org/apache/comet/IcebergReadFromS3Suite.scala
@@ -163,6 +163,43 @@ class IcebergReadFromS3Suite extends CometS3TestBase {
}
}
+ test("large scale partitioned table - 100 partitions with many files") {
+ assume(icebergAvailable, "Iceberg not available in classpath")
+
+ withSQLConf(
+ "spark.sql.files.maxRecordsPerFile" -> "50",
+ "spark.sql.adaptive.enabled" -> "false") {
+ spark.sql("""
+ CREATE TABLE s3_catalog.db.large_partitioned_test (
+ id INT,
+ data STRING,
+ partition_id INT
+ ) USING iceberg
+ PARTITIONED BY (partition_id)
+ """)
+
+ spark.sql("""
+ INSERT INTO s3_catalog.db.large_partitioned_test
+ SELECT
+ id,
+ CONCAT('data_', CAST(id AS STRING)) as data,
+ (id % 100) as partition_id
+ FROM range(500000)
+ """)
+
+ checkIcebergNativeScan(
+ "SELECT COUNT(DISTINCT id) FROM s3_catalog.db.large_partitioned_test")
+ checkIcebergNativeScan(
+ "SELECT * FROM s3_catalog.db.large_partitioned_test WHERE id < 10
ORDER BY id")
+ checkIcebergNativeScan(
+ "SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE
partition_id = 0")
+ checkIcebergNativeScan(
+ "SELECT SUM(id) FROM s3_catalog.db.large_partitioned_test WHERE
partition_id IN (0, 50, 99)")
+
+ spark.sql("DROP TABLE s3_catalog.db.large_partitioned_test PURGE")
+ }
+ }
+
test("MOR table with deletes in S3") {
assume(icebergAvailable, "Iceberg not available in classpath")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]