peter-toth commented on code in PR #54330:
URL: https://github.com/apache/spark/pull/54330#discussion_r2854417515


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala:
##########
@@ -78,49 +74,65 @@ case class BatchScanExec(
       val newPartitions = scan.toBatch.planInputPartitions()
 
       originalPartitioning match {
-        case p: KeyGroupedPartitioning =>
+        case p: KeyedPartitioning =>
           if (newPartitions.exists(!_.isInstanceOf[HasPartitionKey])) {
             throw new SparkException("Data source must have preserved the 
original partitioning " +
                 "during runtime filtering: not all partitions implement 
HasPartitionKey after " +
                 "filtering")
           }
-          val newPartitionValues = newPartitions.map(partition =>
+          val newPartitionKeys = newPartitions.map(partition =>
               
InternalRowComparableWrapper(partition.asInstanceOf[HasPartitionKey], 
p.expressions))
             .toSet
-          val oldPartitionValues = p.partitionValues
+          val oldPartitionKeys = p.partitionKeys
             .map(partition => InternalRowComparableWrapper(partition, 
p.expressions)).toSet
           // We require the new number of partition values to be equal or less 
than the old number
           // of partition values here. In the case of less than, empty 
partitions will be added for
           // those missing values that are not present in the new input 
partitions.
-          if (oldPartitionValues.size < newPartitionValues.size) {
+          if (oldPartitionKeys.size < newPartitionKeys.size) {
             throw new SparkException("During runtime filtering, data source 
must either report " +
                 "the same number of partition values, or a subset of partition 
values from the " +
-                s"original. Before: ${oldPartitionValues.size} partition 
values. " +
-                s"After: ${newPartitionValues.size} partition values")
+                s"original. Before: ${oldPartitionKeys.size} partition values. 
" +
+                s"After: ${newPartitionKeys.size} partition values")
           }
 
-          if (!newPartitionValues.forall(oldPartitionValues.contains)) {
+          if (!newPartitionKeys.forall(oldPartitionKeys.contains)) {
             throw new SparkException("During runtime filtering, data source 
must not report new " +
                 "partition values that are not present in the original 
partitioning.")
           }
 
-          groupPartitions(newPartitions.toImmutableArraySeq)
-            .map(_.groupedParts.map(_.parts)).getOrElse(Seq.empty)
+          val dataTypes = p.expressions.map(_.dataType)

Review Comment:
   I've just opened a ticket: 
[SPARK-55692](https://issues.apache.org/jira/browse/SPARK-55692) and 
PR:https://github.com/apache/spark/pull/54490 to sync the documentation with 
the code, but I can rebase it after the revert.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to