peter-toth commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2895028233
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -96,18 +149,55 @@ object PushDownUtils {
}
}
- // Data source filters that need to be evaluated again after scanning.
which means
- // the data source cannot guarantee the rows returned can pass these
filters.
- // As a result we must return it so Spark can plan an extra filter
operator.
- val postScanFilters = r.pushPredicates(translatedFilters.toArray).map
{ predicate =>
- DataSourceV2Strategy.rebuildExpressionFromFilter(predicate,
translatedFilterToExpr)
+ // Post-scan filters candidates: those the data source rejected in the
first pass
+ // and need to be evaluated by Spark after the scan.
+ val returnedFirstPassFilters =
r.pushPredicates(translatedFilters.toArray).map {
+ predicate =>
+ DataSourceV2Strategy.rebuildExpressionFromFilter(predicate,
translatedFilterToExpr)
+ }
+
+ val finalPostScanFilters = (partitionSchema,
r.supportsEnhancedPartitionFiltering()) match {
+ // If the scan supports enhanced partition filtering, convert to
PartitionPredicates
+ // (see SPARK-55596). PartitionPredicates are pushed to the scan in
a second pass.
+ case (Some(structType), true) =>
+ //
+ val (postScanPartitionFilters, postScanDataFilters) =
+ DataSourceUtils.getPartitionFiltersAndDataFilters(
+ structType, returnedFirstPassFilters.toIndexedSeq)
+ val (untranslatablePartitionFilters, untranslatableDataFilters) =
+ DataSourceUtils.getPartitionFiltersAndDataFilters(
+ structType, untranslatableExprs.toSeq)
+
+ // Push second-pass partition filters as PartitionPredicates
+ val allPartitionPredicates = (postScanPartitionFilters ++
+ untranslatablePartitionFilters)
+ .map(expr => new PartitionPredicateImpl(expr,
toAttributes(structType)))
+ val returnedSecondPassPartitionFilters =
+ r.pushPredicates(allPartitionPredicates.toArray).map { predicate
=>
+ V2ExpressionUtils.toCatalyst(predicate).getOrElse(
+ DataSourceV2Strategy.rebuildExpressionFromFilter(
+ predicate, translatedFilterToExpr))
+ }
+
+ // Normally translated filters (postScanFilters) are simple
filters that can be
+ // evaluated faster, while the untranslated filters are
complicated filters that take
+ // more time to evaluate, so we want to evaluate the
postScanFilters filters first.
+ val untranslatableSet = ExpressionSet(untranslatableExprs)
+ val returnedPostScanPartitionFilters =
+ returnedSecondPassPartitionFilters.filter(e =>
!untranslatableSet.contains(e))
+ val returnedUntranslatablePartitionFilters =
+
returnedSecondPassPartitionFilters.filter(untranslatableSet.contains)
+ val postScanFilters = postScanDataFilters ++
returnedPostScanPartitionFilters ++
+ untranslatableDataFilters ++
returnedUntranslatablePartitionFilters
+ postScanFilters.toArray.toImmutableArraySeq
Review Comment:
`.toArray.toImmutableArraySeq` seems unecessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]