This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new a6741e861 feat: CometNativeScan per-partition plan serde (#3511)
a6741e861 is described below
commit a6741e86135a89fb17cafa4aefd50bc962819611
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Feb 13 12:43:57 2026 -0500
feat: CometNativeScan per-partition plan serde (#3511)
---
native/core/src/execution/planner.rs | 45 ++++----
native/proto/src/proto/operator.proto | 45 ++++----
.../comet/serde/operator/CometNativeScan.scala | 43 ++++----
.../spark/sql/comet/CometNativeScanExec.scala | 120 +++++++++++++++++++--
.../org/apache/spark/sql/comet/operators.scala | 51 ++++++++-
.../parquet/ParquetReadFromFakeHadoopFsSuite.scala | 8 +-
6 files changed, 238 insertions(+), 74 deletions(-)
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 2c3d00a23..92a9cb23e 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -965,20 +965,31 @@ impl PhysicalPlanner {
))
}
OpStruct::NativeScan(scan) => {
- let data_schema =
convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
+ // Extract common data and single partition's file list
+ // Per-partition injection happens in Scala before sending to
native
+ let common = scan
+ .common
+ .as_ref()
+ .ok_or_else(|| GeneralError("NativeScan missing common
data".into()))?;
+
+ let data_schema =
+
convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
let required_schema: SchemaRef =
-
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
+
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
let partition_schema: SchemaRef =
-
convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
- let projection_vector: Vec<usize> = scan
+
convert_spark_types_to_arrow_schema(common.partition_schema.as_slice());
+ let projection_vector: Vec<usize> = common
.projection_vector
.iter()
.map(|offset| *offset as usize)
.collect();
- // Check if this partition has any files (bucketed scan with
bucket pruning may have empty partitions)
- let partition_files = &scan.file_partitions[self.partition as
usize];
+ let partition_files = scan
+ .file_partition
+ .as_ref()
+ .ok_or_else(|| GeneralError("NativeScan missing
file_partition".into()))?;
+ // Check if this partition has any files (bucketed scan with
bucket pruning may have empty partitions)
if partition_files.partitioned_file.is_empty() {
let empty_exec = Arc::new(EmptyExec::new(required_schema));
return Ok((
@@ -988,19 +999,19 @@ impl PhysicalPlanner {
}
// Convert the Spark expressions to Physical expressions
- let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>,
ExecutionError> = scan
+ let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>,
ExecutionError> = common
.data_filters
.iter()
.map(|expr| self.create_expr(expr,
Arc::clone(&required_schema)))
.collect();
- let default_values: Option<HashMap<usize, ScalarValue>> = if
!scan
+ let default_values: Option<HashMap<usize, ScalarValue>> = if
!common
.default_values
.is_empty()
{
// We have default values. Extract the two lists (same
length) of values and
// indexes in the schema, and then create a HashMap to use
in the SchemaMapper.
- let default_values: Result<Vec<ScalarValue>,
DataFusionError> = scan
+ let default_values: Result<Vec<ScalarValue>,
DataFusionError> = common
.default_values
.iter()
.map(|expr| {
@@ -1015,7 +1026,7 @@ impl PhysicalPlanner {
})
.collect();
let default_values = default_values?;
- let default_values_indexes: Vec<usize> = scan
+ let default_values_indexes: Vec<usize> = common
.default_values_indexes
.iter()
.map(|offset| *offset as usize)
@@ -1037,7 +1048,7 @@ impl PhysicalPlanner {
.map(|f| f.file_path.clone())
.expect("partition should have files after empty check");
- let object_store_options: HashMap<String, String> = scan
+ let object_store_options: HashMap<String, String> = common
.object_store_options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
@@ -1048,10 +1059,8 @@ impl PhysicalPlanner {
&object_store_options,
)?;
- // Comet serializes all partitions' PartitionedFiles, but we
only want to read this
- // Spark partition's PartitionedFiles
- let files =
-
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+ // Get files for this partition
+ let files = self.get_partitioned_files(partition_files)?;
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
let partition_fields: Vec<Field> = partition_schema
.fields()
@@ -1070,10 +1079,10 @@ impl PhysicalPlanner {
Some(projection_vector),
Some(data_filters?),
default_values,
- scan.session_timezone.as_str(),
- scan.case_sensitive,
+ common.session_timezone.as_str(),
+ common.case_sensitive,
self.session_ctx(),
- scan.encryption_enabled,
+ common.encryption_enabled,
)?;
Ok((
vec![],
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 78f118e6d..93872b462 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -85,30 +85,29 @@ message Scan {
bool arrow_ffi_safe = 3;
}
+// Common data shared by all partitions in split mode (sent once at planning)
+message NativeScanCommon {
+ repeated SparkStructField required_schema = 1;
+ repeated SparkStructField data_schema = 2;
+ repeated SparkStructField partition_schema = 3;
+ repeated spark.spark_expression.Expr data_filters = 4;
+ repeated int64 projection_vector = 5;
+ string session_timezone = 6;
+ repeated spark.spark_expression.Expr default_values = 7;
+ repeated int64 default_values_indexes = 8;
+ bool case_sensitive = 9;
+ map<string, string> object_store_options = 10;
+ bool encryption_enabled = 11;
+ string source = 12;
+ repeated spark.spark_expression.DataType fields = 13;
+}
+
message NativeScan {
- repeated spark.spark_expression.DataType fields = 1;
- // The source of the scan (e.g. file scan, broadcast exchange, shuffle,
etc). This
- // is purely for informational purposes when viewing native query plans in
- // debug mode.
- string source = 2;
- repeated SparkStructField required_schema = 3;
- repeated SparkStructField data_schema = 4;
- repeated SparkStructField partition_schema = 5;
- repeated spark.spark_expression.Expr data_filters = 6;
- repeated SparkFilePartition file_partitions = 7;
- repeated int64 projection_vector = 8;
- string session_timezone = 9;
- repeated spark.spark_expression.Expr default_values = 10;
- repeated int64 default_values_indexes = 11;
- bool case_sensitive = 12;
- // Options for configuring object stores such as AWS S3, GCS, etc. The
key-value pairs are taken
- // from Hadoop configuration for compatibility with Hadoop FileSystem
implementations of object
- // stores.
- // The configuration values have hadoop. or spark.hadoop. prefix trimmed.
For instance, the
- // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as
"fs.s3a.access.key" in
- // the map.
- map<string, string> object_store_options = 13;
- bool encryption_enabled = 14;
+ // Common data shared across partitions (schemas, filters, projections,
config)
+ NativeScanCommon common = 1;
+
+ // Single partition's file list (injected at execution time)
+ SparkFilePartition file_partition = 2;
}
message CsvScan {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index b7909b67c..d5d075760 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -97,14 +97,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*):
Option[OperatorOuterClass.Operator] = {
val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
- nativeScanBuilder.setSource(scan.simpleStringWithNodeId())
+ val commonBuilder = OperatorOuterClass.NativeScanCommon.newBuilder()
+
+ // Set source in common (used as part of injection key)
+ commonBuilder.setSource(scan.simpleStringWithNodeId())
val scanTypes = scan.output.flatten { attr =>
serializeDataType(attr.dataType)
}
if (scanTypes.length == scan.output.length) {
- nativeScanBuilder.addAllFields(scanTypes.asJava)
+ commonBuilder.addAllFields(scanTypes.asJava)
// Sink operators don't have children
builder.clearChildren()
@@ -120,7 +123,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
logWarning(s"Unsupported data filter $filter")
}
}
- nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
+ commonBuilder.addAllDataFilters(dataFilters.asJava)
}
val possibleDefaultValues =
getExistenceDefaultValues(scan.requiredSchema)
@@ -136,20 +139,15 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
(Literal(expr), index.toLong.asInstanceOf[java.lang.Long])
}
.unzip
- nativeScanBuilder.addAllDefaultValues(
+ commonBuilder.addAllDefaultValues(
defaultValues.flatMap(exprToProto(_, scan.output)).toIterable.asJava)
- nativeScanBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
+ commonBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
}
+ // Extract object store options from first file (S3 configs apply to all
files in scan)
var firstPartition: Option[PartitionedFile] = None
val filePartitions = scan.getFilePartitions()
- val filePartitionsProto = filePartitions.map { partition =>
- if (firstPartition.isEmpty) {
- firstPartition = partition.files.headOption
- }
- partition2Proto(partition, scan.relation.partitionSchema)
- }
- nativeScanBuilder.addAllFilePartitions(filePartitionsProto.asJava)
+ firstPartition = filePartitions.flatMap(_.files.headOption).headOption
val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields)
val requiredSchema = schema2Proto(scan.requiredSchema.fields)
@@ -166,31 +164,34 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
val projectionVector = (dataSchemaIndexes ++
partitionSchemaIndexes).map(idx =>
idx.toLong.asInstanceOf[java.lang.Long])
-
nativeScanBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)
+ commonBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)
// In `CometScanRule`, we ensure partitionSchema is supported.
assert(partitionSchema.length ==
scan.relation.partitionSchema.fields.length)
- nativeScanBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
- nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
-
nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
-
nativeScanBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
-
nativeScanBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
+ commonBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
+ commonBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
+ commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
+
commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
+
commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
// Collect S3/cloud storage configurations
val hadoopConf = scan.relation.sparkSession.sessionState
.newHadoopConfWithOptions(scan.relation.options)
-
nativeScanBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))
+
commonBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))
firstPartition.foreach { partitionFile =>
val objectStoreOptions =
NativeConfig.extractObjectStoreOptions(hadoopConf,
partitionFile.pathUri)
objectStoreOptions.foreach { case (key, value) =>
- nativeScanBuilder.putObjectStoreOptions(key, value)
+ commonBuilder.putObjectStoreOptions(key, value)
}
}
+ // Set common data in NativeScan (file_partition will be populated at
execution time)
+ nativeScanBuilder.setCommon(commonBuilder.build())
+
Some(builder.setNativeScan(nativeScanBuilder).build())
} else {
@@ -204,6 +205,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
}
override def createExec(nativeOp: Operator, op: CometScanExec):
CometNativeExec = {
- CometNativeScanExec(nativeOp, op.wrapped, op.session)
+ CometNativeScanExec(nativeOp, op.wrapped, op.session, op)
}
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 841bc21aa..3f2748c3e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -32,16 +32,28 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection._
import com.google.common.base.Objects
import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetFileFormat
+import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
import org.apache.comet.serde.OperatorOuterClass.Operator
/**
- * Comet fully native scan node for DataSource V1 that delegates to
DataFusion's DataSourceExec.
+ * Native scan operator for DataSource V1 Parquet files using DataFusion's
ParquetExec.
+ *
+ * Replaces Spark's FileSourceScanExec to enable native execution. File
planning runs in Spark to
+ * produce FilePartitions (handling bucketing, partition pruning, etc.), which
are serialized to
+ * protobuf for DataFusion to execute using its ParquetExec. This provides
better performance than
+ * reading through Spark's FileFormat abstraction.
+ *
+ * Uses split-mode serialization introduced in PR #3349: common scan metadata
(schemas, filters,
+ * projections) is serialized once at planning time, while per-partition file
lists are lazily
+ * serialized at execution time. This reduces memory when scanning tables with
many partitions, as
+ * each executor task receives only its partition's file list rather than all
files.
*/
case class CometNativeScanExec(
override val nativeOp: Operator,
@@ -55,7 +67,9 @@ case class CometNativeScanExec(
tableIdentifier: Option[TableIdentifier],
disableBucketedScan: Boolean = false,
originalPlan: FileSourceScanExec,
- override val serializedPlanOpt: SerializedPlan)
+ override val serializedPlanOpt: SerializedPlan,
+ @transient scan: CometScanExec, // Lazy access to file partitions without
serializing with plan
+ sourceKey: String) // Key for PlanDataInjector to match common+partition
data at runtime
extends CometLeafExec
with DataSourceScanExec
with ShimStreamSourceAwareSparkPlan {
@@ -78,6 +92,82 @@ case class CometNativeScanExec(
override lazy val outputOrdering: Seq[SortOrder] =
originalPlan.outputOrdering
+ /**
+ * Lazy partition serialization - deferred until execution time to reduce
driver memory.
+ *
+ * Split-mode serialization pattern:
+ * {{{
+ * Planning time:
+ * - CometNativeScan.convert() serializes common data (schemas, filters,
projections)
+ * - commonData embedded in nativeOp protobuf
+ * - File partitions NOT serialized yet
+ *
+ * Execution time:
+ * - doExecuteColumnar() accesses commonData and perPartitionData
+ * - Forces serializedPartitionData evaluation (here)
+ * - Each partition's file list serialized separately
+ * - CometExecRDD receives per-partition data and injects at runtime
+ * }}}
+ *
+ * This pattern reduces memory usage for tables with many partitions -
instead of serializing
+ * all files for all partitions in the driver, we serialize only common
metadata (once) and each
+ * partition's files (lazily, as tasks are scheduled).
+ */
+ @transient private lazy val serializedPartitionData: (Array[Byte],
Array[Array[Byte]]) = {
+ // Extract common data from nativeOp
+ val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
+
+ // Get file partitions from CometScanExec (handles bucketing, etc.)
+ val filePartitions = scan.getFilePartitions()
+
+ // Serialize each partition's files
+ import org.apache.comet.serde.operator.partition2Proto
+ val perPartitionBytes = filePartitions.map { filePartition =>
+ val partitionProto = partition2Proto(filePartition,
relation.partitionSchema)
+ val partitionNativeScan =
org.apache.comet.serde.OperatorOuterClass.NativeScan
+ .newBuilder()
+ .setFilePartition(partitionProto)
+ .build()
+
+ partitionNativeScan.toByteArray
+ }.toArray
+
+ (commonBytes, perPartitionBytes)
+ }
+
+ def commonData: Array[Byte] = serializedPartitionData._1
+ def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2
+
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val nativeMetrics = CometMetricNode.fromCometPlan(this)
+ val serializedPlan = CometExec.serializeNativePlan(nativeOp)
+
+ // Encryption config must be passed to each executor task
+ val hadoopConf = relation.sparkSession.sessionState
+ .newHadoopConfWithOptions(relation.options)
+ val encryptionEnabled = CometParquetUtils.encryptionEnabled(hadoopConf)
+ val (broadcastedHadoopConfForEncryption, encryptedFilePaths) = if
(encryptionEnabled) {
+ val broadcastedConf = relation.sparkSession.sparkContext
+ .broadcast(new SerializableConfiguration(hadoopConf))
+ (Some(broadcastedConf), relation.inputFiles.toSeq)
+ } else {
+ (None, Seq.empty)
+ }
+
+ CometExecRDD(
+ sparkContext,
+ inputRDDs = Seq.empty,
+ commonByKey = Map(sourceKey -> commonData),
+ perPartitionByKey = Map(sourceKey -> perPartitionData),
+ serializedPlan = serializedPlan,
+ numPartitions = perPartitionData.length,
+ numOutputCols = output.length,
+ nativeMetrics = nativeMetrics,
+ subqueries = Seq.empty,
+ broadcastedHadoopConfForEncryption = broadcastedHadoopConfForEncryption,
+ encryptedFilePaths = encryptedFilePaths)
+ }
+
override def doCanonicalize(): CometNativeScanExec = {
CometNativeScanExec(
nativeOp,
@@ -93,7 +183,10 @@ case class CometNativeScanExec(
None,
disableBucketedScan,
originalPlan.doCanonicalize(),
- SerializedPlan(None))
+ SerializedPlan(None),
+ null, // Transient scan not needed for canonicalization
+ ""
+ ) // sourceKey not needed for canonicalization
}
override def stringArgs: Iterator[Any] = Iterator(output)
@@ -123,7 +216,8 @@ object CometNativeScanExec {
def apply(
nativeOp: Operator,
scanExec: FileSourceScanExec,
- session: SparkSession): CometNativeScanExec = {
+ session: SparkSession,
+ scan: CometScanExec): CometNativeScanExec = {
// TreeNode.mapProductIterator is protected method.
def mapProductIterator[B: ClassTag](product: Product, f: Any => B):
Array[B] = {
val arr = Array.ofDim[B](product.productArity)
@@ -135,6 +229,18 @@ object CometNativeScanExec {
arr
}
+ // Generate unique key for this scan so PlanDataInjector can match
common+partition data.
+ // Multiple scans of same table with different projections/filters get
different keys.
+ val common = nativeOp.getNativeScan.getCommon
+ val source = common.getSource
+ val keyComponents = Seq(
+ common.getRequiredSchemaList.toString,
+ common.getDataFiltersList.toString,
+ common.getProjectionVectorList.toString,
+ common.getFieldsList.toString)
+ val hashCode = keyComponents.mkString("|").hashCode
+ val sourceKey = s"${source}_${hashCode}"
+
// Replacing the relation in FileSourceScanExec by `copy` seems causing
some issues
// on other Spark distributions if FileSourceScanExec constructor is
changed.
// Using `makeCopy` to avoid the issue.
@@ -161,7 +267,9 @@ object CometNativeScanExec {
wrapped.tableIdentifier,
wrapped.disableBucketedScan,
wrapped,
- SerializedPlan(None))
+ SerializedPlan(None),
+ scan,
+ sourceKey)
scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
batchScanExec
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index eba74c9e2..da2ae21a9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -82,7 +82,8 @@ private[comet] object PlanDataInjector {
// Registry of injectors for different operator types
private val injectors: Seq[PlanDataInjector] = Seq(
- IcebergPlanDataInjector
+ IcebergPlanDataInjector,
+ NativeScanPlanDataInjector
// Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
)
@@ -112,7 +113,7 @@ private[comet] object PlanDataInjector {
case _ =>
throw new CometRuntimeException(s"Missing planning data for key:
$key")
}
- case None => // No key, skip injection
+ case None =>
}
}
@@ -191,6 +192,46 @@ private[comet] object IcebergPlanDataInjector extends PlanDataInjector {
}
}
+/**
+ * Injector for NativeScan operators.
+ */
+private[comet] object NativeScanPlanDataInjector extends PlanDataInjector {
+
+ override def canInject(op: Operator): Boolean =
+ op.hasNativeScan &&
+ op.getNativeScan.hasCommon &&
+ !op.getNativeScan.hasFilePartition
+
+ override def getKey(op: Operator): Option[String] = {
+ // Reconstruct the same sourceKey that was used when storing the data
+ val common = op.getNativeScan.getCommon
+ val source = common.getSource
+ val keyComponents = Seq(
+ common.getRequiredSchemaList.toString,
+ common.getDataFiltersList.toString,
+ common.getProjectionVectorList.toString,
+ common.getFieldsList.toString)
+ val hashCode = keyComponents.mkString("|").hashCode
+ Some(s"${source}_${hashCode}")
+ }
+
+ override def inject(
+ op: Operator,
+ commonBytes: Array[Byte],
+ partitionBytes: Array[Byte]): Operator = {
+
+ val common = OperatorOuterClass.NativeScanCommon.parseFrom(commonBytes)
+ val partitionOnly = OperatorOuterClass.NativeScan.parseFrom(partitionBytes)
+
+ // Build complete NativeScan with common fields + this partition's file
list
+ val scanBuilder = OperatorOuterClass.NativeScan.newBuilder()
+ scanBuilder.setCommon(common)
+ scanBuilder.setFilePartition(partitionOnly.getFilePartition)
+
+ op.toBuilder.setNativeScan(scanBuilder).build()
+ }
+}
+
/**
* A Comet physical operator
*/
@@ -589,6 +630,12 @@ abstract class CometNativeExec extends CometExec {
Map(iceberg.metadataLocation -> iceberg.commonData),
Map(iceberg.metadataLocation -> iceberg.perPartitionData))
+ // Found a NativeScan with planning data
+ case nativeScan: CometNativeScanExec =>
+ (
+ Map(nativeScan.sourceKey -> nativeScan.commonData),
+ Map(nativeScan.sourceKey -> nativeScan.perPartitionData))
+
// Broadcast stages are boundaries - don't collect per-partition data
from inside them.
// After DPP filtering, broadcast scans may have different partition
counts than the
// probe side, causing ArrayIndexOutOfBoundsException in
CometExecRDD.getPartitions.
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
index f4a8b5ed8..b8db737a3 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
@@ -66,11 +66,11 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
p
}
assert(scans.size == 1)
+ // File partitions are now accessed from the scan field, not from the
protobuf
+ val filePartitions = scans.head.scan.getFilePartitions()
+ assert(filePartitions.nonEmpty)
assert(
- scans.head.nativeOp.getNativeScan
- .getFilePartitions(0)
- .getPartitionedFile(0)
- .getFilePath
+ filePartitions.head.files.head.filePath.toString
.startsWith(FakeHDFSFileSystem.PREFIX))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]