This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new a6741e861 feat: CometNativeScan per-partition plan serde (#3511)
a6741e861 is described below

commit a6741e86135a89fb17cafa4aefd50bc962819611
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Feb 13 12:43:57 2026 -0500

    feat: CometNativeScan per-partition plan serde (#3511)
---
 native/core/src/execution/planner.rs               |  45 ++++----
 native/proto/src/proto/operator.proto              |  45 ++++----
 .../comet/serde/operator/CometNativeScan.scala     |  43 ++++----
 .../spark/sql/comet/CometNativeScanExec.scala      | 120 +++++++++++++++++++--
 .../org/apache/spark/sql/comet/operators.scala     |  51 ++++++++-
 .../parquet/ParquetReadFromFakeHadoopFsSuite.scala |   8 +-
 6 files changed, 238 insertions(+), 74 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 2c3d00a23..92a9cb23e 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -965,20 +965,31 @@ impl PhysicalPlanner {
                 ))
             }
             OpStruct::NativeScan(scan) => {
-                let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
+                // Extract common data and single partition's file list
+                // Per-partition injection happens in Scala before sending to native
+                let common = scan
+                    .common
+                    .as_ref()
+                    .ok_or_else(|| GeneralError("NativeScan missing common data".into()))?;
+
+                let data_schema =
+                    convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
                 let required_schema: SchemaRef =
-                    convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
+                    convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
                 let partition_schema: SchemaRef =
-                    convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
-                let projection_vector: Vec<usize> = scan
+                    convert_spark_types_to_arrow_schema(common.partition_schema.as_slice());
+                let projection_vector: Vec<usize> = common
                     .projection_vector
                     .iter()
                     .map(|offset| *offset as usize)
                     .collect();
 
-                // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
-                let partition_files = &scan.file_partitions[self.partition as usize];
+                let partition_files = scan
+                    .file_partition
+                    .as_ref()
+                    .ok_or_else(|| GeneralError("NativeScan missing file_partition".into()))?;
 
+                // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
                 if partition_files.partitioned_file.is_empty() {
                     let empty_exec = Arc::new(EmptyExec::new(required_schema));
                     return Ok((
@@ -988,19 +999,19 @@ impl PhysicalPlanner {
                 }
 
                 // Convert the Spark expressions to Physical expressions
-                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = common
                     .data_filters
                     .iter()
                     .map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
                     .collect();
 
-                let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
+                let default_values: Option<HashMap<usize, ScalarValue>> = if !common
                     .default_values
                     .is_empty()
                 {
                     // We have default values. Extract the two lists (same length) of values and
                     // indexes in the schema, and then create a HashMap to use in the SchemaMapper.
-                    let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
+                    let default_values: Result<Vec<ScalarValue>, DataFusionError> = common
                         .default_values
                         .iter()
                         .map(|expr| {
@@ -1015,7 +1026,7 @@ impl PhysicalPlanner {
                         })
                         .collect();
                     let default_values = default_values?;
-                    let default_values_indexes: Vec<usize> = scan
+                    let default_values_indexes: Vec<usize> = common
                         .default_values_indexes
                         .iter()
                         .map(|offset| *offset as usize)
@@ -1037,7 +1048,7 @@ impl PhysicalPlanner {
                     .map(|f| f.file_path.clone())
                     .expect("partition should have files after empty check");
 
-                let object_store_options: HashMap<String, String> = scan
+                let object_store_options: HashMap<String, String> = common
                     .object_store_options
                     .iter()
                     .map(|(k, v)| (k.clone(), v.clone()))
@@ -1048,10 +1059,8 @@ impl PhysicalPlanner {
                     &object_store_options,
                 )?;
 
-                // Comet serializes all partitions' PartitionedFiles, but we only want to read this
-                // Spark partition's PartitionedFiles
-                let files =
-                    self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+                // Get files for this partition
+                let files = self.get_partitioned_files(partition_files)?;
                 let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
                 let partition_fields: Vec<Field> = partition_schema
                     .fields()
@@ -1070,10 +1079,10 @@ impl PhysicalPlanner {
                     Some(projection_vector),
                     Some(data_filters?),
                     default_values,
-                    scan.session_timezone.as_str(),
-                    scan.case_sensitive,
+                    common.session_timezone.as_str(),
+                    common.case_sensitive,
                     self.session_ctx(),
-                    scan.encryption_enabled,
+                    common.encryption_enabled,
                 )?;
                 Ok((
                     vec![],
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 78f118e6d..93872b462 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -85,30 +85,29 @@ message Scan {
   bool arrow_ffi_safe = 3;
 }
 
+// Common data shared by all partitions in split mode (sent once at planning)
+message NativeScanCommon {
+  repeated SparkStructField required_schema = 1;
+  repeated SparkStructField data_schema = 2;
+  repeated SparkStructField partition_schema = 3;
+  repeated spark.spark_expression.Expr data_filters = 4;
+  repeated int64 projection_vector = 5;
+  string session_timezone = 6;
+  repeated spark.spark_expression.Expr default_values = 7;
+  repeated int64 default_values_indexes = 8;
+  bool case_sensitive = 9;
+  map<string, string> object_store_options = 10;
+  bool encryption_enabled = 11;
+  string source = 12;
+  repeated spark.spark_expression.DataType fields = 13;
+}
+
 message NativeScan {
-  repeated spark.spark_expression.DataType fields = 1;
-  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
-  // is purely for informational purposes when viewing native query plans in
-  // debug mode.
-  string source = 2;
-  repeated SparkStructField required_schema = 3;
-  repeated SparkStructField data_schema = 4;
-  repeated SparkStructField partition_schema = 5;
-  repeated spark.spark_expression.Expr data_filters = 6;
-  repeated SparkFilePartition file_partitions = 7;
-  repeated int64 projection_vector = 8;
-  string session_timezone = 9;
-  repeated spark.spark_expression.Expr default_values = 10;
-  repeated int64 default_values_indexes = 11;
-  bool case_sensitive = 12;
-  // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
-  // from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
-  // stores.
-  // The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
-  // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
-  // the map.
-  map<string, string> object_store_options = 13;
-  bool encryption_enabled = 14;
+  // Common data shared across partitions (schemas, filters, projections, config)
+  NativeScanCommon common = 1;
+
+  // Single partition's file list (injected at execution time)
+  SparkFilePartition file_partition = 2;
 }
 
 message CsvScan {
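
The split above implies a simple contract, matching the checks added to planner.rs
earlier in this commit: the native planner only accepts a NativeScan whose common and
file_partition fields are both set. Below is a minimal Scala sketch of how the two
halves combine, using the generated protobuf builders seen elsewhere in this commit;
the `common` and `partition` values (and the generated SparkFilePartition class name)
are assumed for illustration:

    // Sketch only: assembling a complete NativeScan from the two halves of
    // the split message. `common` and `partition` are assumed already built.
    import org.apache.comet.serde.OperatorOuterClass

    def assembleNativeScan(
        common: OperatorOuterClass.NativeScanCommon,
        partition: OperatorOuterClass.SparkFilePartition): OperatorOuterClass.NativeScan =
      OperatorOuterClass.NativeScan
        .newBuilder()
        .setCommon(common) // field 1: schemas, filters, projection, config
        .setFilePartition(partition) // field 2: only this task's files
        .build()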
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index b7909b67c..d5d075760 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -97,14 +97,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       builder: Operator.Builder,
       childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
     val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
-    nativeScanBuilder.setSource(scan.simpleStringWithNodeId())
+    val commonBuilder = OperatorOuterClass.NativeScanCommon.newBuilder()
+
+    // Set source in common (used as part of injection key)
+    commonBuilder.setSource(scan.simpleStringWithNodeId())
 
     val scanTypes = scan.output.flatten { attr =>
       serializeDataType(attr.dataType)
     }
 
     if (scanTypes.length == scan.output.length) {
-      nativeScanBuilder.addAllFields(scanTypes.asJava)
+      commonBuilder.addAllFields(scanTypes.asJava)
 
       // Sink operators don't have children
       builder.clearChildren()
@@ -120,7 +123,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
               logWarning(s"Unsupported data filter $filter")
           }
         }
-        nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
+        commonBuilder.addAllDataFilters(dataFilters.asJava)
       }
 
       val possibleDefaultValues = getExistenceDefaultValues(scan.requiredSchema)
@@ -136,20 +139,15 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
             (Literal(expr), index.toLong.asInstanceOf[java.lang.Long])
           }
           .unzip
-        nativeScanBuilder.addAllDefaultValues(
+        commonBuilder.addAllDefaultValues(
           defaultValues.flatMap(exprToProto(_, scan.output)).toIterable.asJava)
-        nativeScanBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
+        commonBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
       }
 
+      // Extract object store options from first file (S3 configs apply to all files in scan)
       var firstPartition: Option[PartitionedFile] = None
       val filePartitions = scan.getFilePartitions()
-      val filePartitionsProto = filePartitions.map { partition =>
-        if (firstPartition.isEmpty) {
-          firstPartition = partition.files.headOption
-        }
-        partition2Proto(partition, scan.relation.partitionSchema)
-      }
-      nativeScanBuilder.addAllFilePartitions(filePartitionsProto.asJava)
+      firstPartition = filePartitions.flatMap(_.files.headOption).headOption
 
       val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields)
       val requiredSchema = schema2Proto(scan.requiredSchema.fields)
@@ -166,31 +164,34 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       val projectionVector = (dataSchemaIndexes ++ partitionSchemaIndexes).map(idx =>
         idx.toLong.asInstanceOf[java.lang.Long])
 
-      nativeScanBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)
+      commonBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)
 
       // In `CometScanRule`, we ensure partitionSchema is supported.
       assert(partitionSchema.length == scan.relation.partitionSchema.fields.length)
 
-      nativeScanBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
-      nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
-      nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
-      nativeScanBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
-      nativeScanBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
+      commonBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
+      commonBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
+      commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
+      commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
+      commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
 
       // Collect S3/cloud storage configurations
       val hadoopConf = scan.relation.sparkSession.sessionState
         .newHadoopConfWithOptions(scan.relation.options)
 
-      nativeScanBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))
+      commonBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))
 
       firstPartition.foreach { partitionFile =>
         val objectStoreOptions =
           NativeConfig.extractObjectStoreOptions(hadoopConf, partitionFile.pathUri)
         objectStoreOptions.foreach { case (key, value) =>
-          nativeScanBuilder.putObjectStoreOptions(key, value)
+          commonBuilder.putObjectStoreOptions(key, value)
         }
       }
 
+      // Set common data in NativeScan (file_partition will be populated at execution time)
+      nativeScanBuilder.setCommon(commonBuilder.build())
+
       Some(builder.setNativeScan(nativeScanBuilder).build())
 
     } else {
@@ -204,6 +205,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
   }
 
   override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = {
-    CometNativeScanExec(nativeOp, op.wrapped, op.session)
+    CometNativeScanExec(nativeOp, op.wrapped, op.session, op)
   }
 }
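
After this change, an Operator leaving convert() carries only the common half; the
file list is absent by design and injected per task later. A short test-style sketch
of that invariant (assuming `op` is the Operator returned by convert()):

    // Sketch: planning-time invariant of the split NativeScan message.
    // `op` is assumed to be the Operator produced by CometNativeScan.convert().
    assert(op.hasNativeScan)
    assert(op.getNativeScan.hasCommon) // schemas/filters/config serialized once
    assert(!op.getNativeScan.hasFilePartition) // injected per task at execution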
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 841bc21aa..3f2748c3e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -32,16 +32,28 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection._
 
 import com.google.common.base.Objects
 
 import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetFileFormat
+import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
 /**
- * Comet fully native scan node for DataSource V1 that delegates to DataFusion's DataSourceExec.
+ * Native scan operator for DataSource V1 Parquet files using DataFusion's ParquetExec.
+ *
+ * Replaces Spark's FileSourceScanExec to enable native execution. File planning runs in Spark to
+ * produce FilePartitions (handling bucketing, partition pruning, etc.), which are serialized to
+ * protobuf for DataFusion to execute using its ParquetExec. This provides better performance than
+ * reading through Spark's FileFormat abstraction.
+ *
+ * Uses split-mode serialization introduced in PR #3349: common scan metadata (schemas, filters,
+ * projections) is serialized once at planning time, while per-partition file lists are lazily
+ * serialized at execution time. This reduces memory when scanning tables with many partitions, as
+ * each executor task receives only its partition's file list rather than all files.
  */
 case class CometNativeScanExec(
     override val nativeOp: Operator,
@@ -55,7 +67,9 @@ case class CometNativeScanExec(
     tableIdentifier: Option[TableIdentifier],
     disableBucketedScan: Boolean = false,
     originalPlan: FileSourceScanExec,
-    override val serializedPlanOpt: SerializedPlan)
+    override val serializedPlanOpt: SerializedPlan,
+    @transient scan: CometScanExec, // Lazy access to file partitions without serializing with plan
+    sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime
     extends CometLeafExec
     with DataSourceScanExec
     with ShimStreamSourceAwareSparkPlan {
@@ -78,6 +92,82 @@ case class CometNativeScanExec(
 
   override lazy val outputOrdering: Seq[SortOrder] = originalPlan.outputOrdering
 
+  /**
+   * Lazy partition serialization - deferred until execution time to reduce driver memory.
+   *
+   * Split-mode serialization pattern:
+   * {{{
+   * Planning time:
+   *   - CometNativeScan.convert() serializes common data (schemas, filters, projections)
+   *   - commonData embedded in nativeOp protobuf
+   *   - File partitions NOT serialized yet
+   *
+   * Execution time:
+   *   - doExecuteColumnar() accesses commonData and perPartitionData
+   *   - Forces serializedPartitionData evaluation (here)
+   *   - Each partition's file list serialized separately
+   *   - CometExecRDD receives per-partition data and injects at runtime
+   * }}}
+   *
+   * This pattern reduces memory usage for tables with many partitions - instead of serializing
+   * all files for all partitions in the driver, we serialize only common metadata (once) and each
+   * partition's files (lazily, as tasks are scheduled).
+   */
+  @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
+    // Extract common data from nativeOp
+    val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
+
+    // Get file partitions from CometScanExec (handles bucketing, etc.)
+    val filePartitions = scan.getFilePartitions()
+
+    // Serialize each partition's files
+    import org.apache.comet.serde.operator.partition2Proto
+    val perPartitionBytes = filePartitions.map { filePartition =>
+      val partitionProto = partition2Proto(filePartition, relation.partitionSchema)
+      val partitionNativeScan = org.apache.comet.serde.OperatorOuterClass.NativeScan
+        .newBuilder()
+        .setFilePartition(partitionProto)
+        .build()
+
+      partitionNativeScan.toByteArray
+    }.toArray
+
+    (commonBytes, perPartitionBytes)
+  }
+
+  def commonData: Array[Byte] = serializedPartitionData._1
+  def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val nativeMetrics = CometMetricNode.fromCometPlan(this)
+    val serializedPlan = CometExec.serializeNativePlan(nativeOp)
+
+    // Encryption config must be passed to each executor task
+    val hadoopConf = relation.sparkSession.sessionState
+      .newHadoopConfWithOptions(relation.options)
+    val encryptionEnabled = CometParquetUtils.encryptionEnabled(hadoopConf)
+    val (broadcastedHadoopConfForEncryption, encryptedFilePaths) = if (encryptionEnabled) {
+      val broadcastedConf = relation.sparkSession.sparkContext
+        .broadcast(new SerializableConfiguration(hadoopConf))
+      (Some(broadcastedConf), relation.inputFiles.toSeq)
+    } else {
+      (None, Seq.empty)
+    }
+
+    CometExecRDD(
+      sparkContext,
+      inputRDDs = Seq.empty,
+      commonByKey = Map(sourceKey -> commonData),
+      perPartitionByKey = Map(sourceKey -> perPartitionData),
+      serializedPlan = serializedPlan,
+      numPartitions = perPartitionData.length,
+      numOutputCols = output.length,
+      nativeMetrics = nativeMetrics,
+      subqueries = Seq.empty,
+      broadcastedHadoopConfForEncryption = broadcastedHadoopConfForEncryption,
+      encryptedFilePaths = encryptedFilePaths)
+  }
+
   override def doCanonicalize(): CometNativeScanExec = {
     CometNativeScanExec(
       nativeOp,
@@ -93,7 +183,10 @@ case class CometNativeScanExec(
       None,
       disableBucketedScan,
       originalPlan.doCanonicalize(),
-      SerializedPlan(None))
+      SerializedPlan(None),
+      null, // Transient scan not needed for canonicalization
+      ""
+    ) // sourceKey not needed for canonicalization
   }
 
   override def stringArgs: Iterator[Any] = Iterator(output)
@@ -123,7 +216,8 @@ object CometNativeScanExec {
   def apply(
       nativeOp: Operator,
       scanExec: FileSourceScanExec,
-      session: SparkSession): CometNativeScanExec = {
+      session: SparkSession,
+      scan: CometScanExec): CometNativeScanExec = {
     // TreeNode.mapProductIterator is protected method.
     def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
       val arr = Array.ofDim[B](product.productArity)
@@ -135,6 +229,18 @@ object CometNativeScanExec {
       arr
     }
 
+    // Generate unique key for this scan so PlanDataInjector can match common+partition data.
+    // Multiple scans of same table with different projections/filters get different keys.
+    val common = nativeOp.getNativeScan.getCommon
+    val source = common.getSource
+    val keyComponents = Seq(
+      common.getRequiredSchemaList.toString,
+      common.getDataFiltersList.toString,
+      common.getProjectionVectorList.toString,
+      common.getFieldsList.toString)
+    val hashCode = keyComponents.mkString("|").hashCode
+    val sourceKey = s"${source}_${hashCode}"
+
     // Replacing the relation in FileSourceScanExec by `copy` seems causing some issues
     // on other Spark distributions if FileSourceScanExec constructor is changed.
     // Using `makeCopy` to avoid the issue.
@@ -161,7 +267,9 @@ object CometNativeScanExec {
       wrapped.tableIdentifier,
       wrapped.disableBucketedScan,
       wrapped,
-      SerializedPlan(None))
+      SerializedPlan(None),
+      scan,
+      sourceKey)
     scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
     batchScanExec
   }
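
Note that the sourceKey built here must match, byte for byte, the key that
NativeScanPlanDataInjector.getKey (below) recomputes on the executor from the
protobuf alone; both sides derive it from the same NativeScanCommon fields. A sketch
of that shared derivation, written once as a hypothetical helper that is not part of
this commit:

    // Sketch: the key derivation that CometNativeScanExec.apply and
    // NativeScanPlanDataInjector.getKey both perform.
    import org.apache.comet.serde.OperatorOuterClass

    def nativeScanKey(common: OperatorOuterClass.NativeScanCommon): String = {
      val keyComponents = Seq(
        common.getRequiredSchemaList.toString,
        common.getDataFiltersList.toString,
        common.getProjectionVectorList.toString,
        common.getFieldsList.toString)
      s"${common.getSource}_${keyComponents.mkString("|").hashCode}"
    }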
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index eba74c9e2..da2ae21a9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -82,7 +82,8 @@ private[comet] object PlanDataInjector {
 
   // Registry of injectors for different operator types
   private val injectors: Seq[PlanDataInjector] = Seq(
-    IcebergPlanDataInjector
+    IcebergPlanDataInjector,
+    NativeScanPlanDataInjector
     // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
   )
 
@@ -112,7 +113,7 @@ private[comet] object PlanDataInjector {
             case _ =>
               throw new CometRuntimeException(s"Missing planning data for key: $key")
           }
-        case None => // No key, skip injection
+        case None =>
       }
     }
 
@@ -191,6 +192,46 @@ private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
   }
 }
 
+/**
+ * Injector for NativeScan operators.
+ */
+private[comet] object NativeScanPlanDataInjector extends PlanDataInjector {
+
+  override def canInject(op: Operator): Boolean =
+    op.hasNativeScan &&
+      op.getNativeScan.hasCommon &&
+      !op.getNativeScan.hasFilePartition
+
+  override def getKey(op: Operator): Option[String] = {
+    // Reconstruct the same sourceKey that was used when storing the data
+    val common = op.getNativeScan.getCommon
+    val source = common.getSource
+    val keyComponents = Seq(
+      common.getRequiredSchemaList.toString,
+      common.getDataFiltersList.toString,
+      common.getProjectionVectorList.toString,
+      common.getFieldsList.toString)
+    val hashCode = keyComponents.mkString("|").hashCode
+    Some(s"${source}_${hashCode}")
+  }
+
+  override def inject(
+      op: Operator,
+      commonBytes: Array[Byte],
+      partitionBytes: Array[Byte]): Operator = {
+
+    val common = OperatorOuterClass.NativeScanCommon.parseFrom(commonBytes)
+    val partitionOnly = OperatorOuterClass.NativeScan.parseFrom(partitionBytes)
+
+    // Build complete NativeScan with common fields + this partition's file list
+    val scanBuilder = OperatorOuterClass.NativeScan.newBuilder()
+    scanBuilder.setCommon(common)
+    scanBuilder.setFilePartition(partitionOnly.getFilePartition)
+
+    op.toBuilder.setNativeScan(scanBuilder).build()
+  }
+}
+
 /**
  * A Comet physical operator
  */
@@ -589,6 +630,12 @@ abstract class CometNativeExec extends CometExec {
           Map(iceberg.metadataLocation -> iceberg.commonData),
           Map(iceberg.metadataLocation -> iceberg.perPartitionData))
 
+      // Found a NativeScan with planning data
+      case nativeScan: CometNativeScanExec =>
+        (
+          Map(nativeScan.sourceKey -> nativeScan.commonData),
+          Map(nativeScan.sourceKey -> nativeScan.perPartitionData))
+
       // Broadcast stages are boundaries - don't collect per-partition data from inside them.
       // After DPP filtering, broadcast scans may have different partition counts than the
       // probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions.
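
Taken together with CometNativeScanExec above, the per-task flow reduces to roughly
the following (a simplified sketch, not the literal CometExecRDD code; commonByKey
and perPartitionByKey are the maps passed to CometExecRDD in doExecuteColumnar):

    // Simplified sketch of executor-side injection for one task; the real
    // traversal lives in PlanDataInjector.injectData and CometExecRDD.
    import org.apache.comet.serde.OperatorOuterClass.Operator

    def injectForTask(
        op: Operator,
        partitionIndex: Int,
        commonByKey: Map[String, Array[Byte]],
        perPartitionByKey: Map[String, Array[Array[Byte]]]): Operator =
      if (NativeScanPlanDataInjector.canInject(op)) {
        NativeScanPlanDataInjector.getKey(op) match {
          case Some(key) =>
            NativeScanPlanDataInjector.inject(
              op,
              commonByKey(key), // common half, serialized once on the driver
              perPartitionByKey(key)(partitionIndex)) // this task's file list
          case None => op
        }
      } else op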
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
index f4a8b5ed8..b8db737a3 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
@@ -66,11 +66,11 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
       p
     }
     assert(scans.size == 1)
+    // File partitions are now accessed from the scan field, not from the protobuf
+    val filePartitions = scans.head.scan.getFilePartitions()
+    assert(filePartitions.nonEmpty)
     assert(
-      scans.head.nativeOp.getNativeScan
-        .getFilePartitions(0)
-        .getPartitionedFile(0)
-        .getFilePath
+      filePartitions.head.files.head.filePath.toString
         .startsWith(FakeHDFSFileSystem.PREFIX))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
