peter-toth commented on code in PR #54330:
URL: https://github.com/apache/spark/pull/54330#discussion_r2847645387
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -78,49 +74,65 @@ case class BatchScanExec(
        val newPartitions = scan.toBatch.planInputPartitions()
        originalPartitioning match {
-          case p: KeyGroupedPartitioning =>
+          case p: KeyedPartitioning =>
            if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
              throw new SparkException("Data source must have preserved the original partitioning " +
                "during runtime filtering: not all partitions implement HasPartitionKey after " +
                "filtering")
            }
-            val newPartitionValues = newPartitions.map(partition =>
+            val newPartitionKeys = newPartitions.map(partition =>
              InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], p.expressions))
              .toSet
-            val oldPartitionValues = p.partitionValues
+            val oldPartitionKeys = p.partitionKeys
              .map(partition => InternalRowComparableWrapper(partition, p.expressions)).toSet
            // We require the new number of partition values to be equal or less than the old number
            // of partition values here. In the case of less than, empty partitions will be added for
            // those missing values that are not present in the new input partitions.
-            if (oldPartitionValues.size < newPartitionValues.size) {
+            if (oldPartitionKeys.size < newPartitionKeys.size) {
              throw new SparkException("During runtime filtering, data source must either report " +
                "the same number of partition values, or a subset of partition values from the " +
-                s"original. Before: ${oldPartitionValues.size} partition values. " +
-                s"After: ${newPartitionValues.size} partition values")
+                s"original. Before: ${oldPartitionKeys.size} partition values. " +
+                s"After: ${newPartitionKeys.size} partition values")
            }
-            if (!newPartitionValues.forall(oldPartitionValues.contains)) {
+            if (!newPartitionKeys.forall(oldPartitionKeys.contains)) {
              throw new SparkException("During runtime filtering, data source must not report new " +
                "partition values that are not present in the original partitioning.")
            }
-            groupPartitions(newPartitions.toImmutableArraySeq)
-              .map(_.groupedParts.map(_.parts)).getOrElse(Seq.empty)
+            val dataTypes = p.expressions.map(_.dataType)
Review Comment:
Yes we can, but before doing that it would be good to clarify the intent of
`SupportsRuntimeV2Filtering`:
https://github.com/apache/spark/blob/7d67ff3e58fee04f831db2606c14efbdeac2c0ed/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsRuntimeV2Filtering.java#L55-L57
In my understanding, when a scan reports `KeyedPartitioning` there shouldn't
be fewer partitions after calling
`filterableScan.filter(dataSourceFilters.toArray)` than there were before. But
there are, so without this logic some of the DPP tests fail. I'm not sure though
whether we changed this requirement, whether our test `InMemoryTable` does
something incorrectly, or whether I'm misreading the documentation.
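To make the requirement concrete, here is a minimal, self-contained Scala sketch (toy `String` keys instead of Spark's `InternalRowComparableWrapper`; the object and exception types are hypothetical, not Spark code) of the subset invariant the diff enforces: after runtime filtering, the reported partition keys may only shrink relative to the original partitioning, never introduce new keys.

```scala
// Toy model of the runtime-filtering invariant in BatchScanExec (hypothetical
// names, not Spark's actual API). Keys dropped by filtering are allowed, since
// empty partitions are added back for them; new keys are rejected.
object RuntimeFilterInvariant {
  def checkSubset(oldKeys: Set[String], newKeys: Set[String]): Unit = {
    // The filtered scan must not report more distinct partition keys than before.
    if (oldKeys.size < newKeys.size) {
      throw new IllegalStateException(
        s"Before: ${oldKeys.size} partition values. After: ${newKeys.size} partition values")
    }
    // Every surviving key must come from the original partitioning.
    if (!newKeys.forall(oldKeys.contains)) {
      throw new IllegalStateException(
        "New partition values not present in the original partitioning")
    }
  }

  def main(args: Array[String]): Unit = {
    // Filtering dropped key "c": allowed; an empty partition is added back for it.
    checkSubset(Set("a", "b", "c"), Set("a", "b"))
    // Filtering introduced a new key "d": rejected.
    val rejected = scala.util.Try(checkSubset(Set("a", "b"), Set("a", "d"))).isFailure
    assert(rejected)
    println("invariant checks passed")
  }
}
```

Under this reading, a strict `SupportsRuntimeV2Filtering` implementation could also keep the partition count constant (reporting empty partitions itself), which is the stricter interpretation the comment above is asking to clarify.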
cc @szehon-ho
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]