cloud-fan commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2885040613
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -22,24 +22,66 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet,
  Expression, NamedExpression, SchemaPruning}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.expressions.IdentityTransform
import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate, Predicate}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters,
  SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns,
  SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, DataSourceUtils}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.internal.connector.{PartitionPredicateImpl,
+  SupportsPushDownCatalystFilters}
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.Utils
object PushDownUtils {
+
+ /**
+ * Returns the partition schema as a StructType derived from the table's partitioning.
+ * Currently only supported for identity transforms on simple (single-name) field
+ * references.
+ *
+ * @return Some(StructType) if all partition transforms are supported identity
+ * transforms, None otherwise.
+ */
+ def getPartitionSchemaForPartitionPredicate(
+ relation: DataSourceV2Relation): Option[StructType] = {
+ val partitioning = relation.table.partitioning().toIndexedSeq
+ val partitionColNamesOpt: Seq[Option[String]] = partitioning.map {
+ case id: IdentityTransform =>
+ id.ref.fieldNames().toIndexedSeq match {
+ case Seq(name) => Some(name)
+ case _ => None // Not supported for multiple field names (e.g. nested field)
+ }
+ case _ => None
+ }
+ partitionColNamesOpt match {
+ // Only support identity transform on simple field reference
+ case seq if seq.isEmpty || seq.exists(_.isEmpty) => None
+ case _ =>
+ val partitionColNames = partitionColNamesOpt.map(_.get)
+ val attrs = partitionColNames.map(name => relation.output.find(_.name == name))
Review Comment:
**Case-sensitive column lookup (same issue flagged by @gengliangwang in
`accept()`, but at a different layer).**
`relation.output.find(_.name == name)` uses case-sensitive comparison. If
the partition transform references a column with different casing than
`relation.output` (e.g. transform says `Region` but output has `region`), this
returns `None` and silently disables enhanced partition filtering for the
entire table.
Should use `resolver` from `SQLConf` or `equalsIgnoreCase`:
```scala
val resolver = SQLConf.get.resolver
val attrs = partitionColNames.map(name =>
relation.output.find(a => resolver(a.name, name)))
```
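To illustrate the difference outside Spark, here is a minimal stand-alone sketch of the lookup semantics the comment describes. `findColumn` and the string list standing in for `relation.output` are hypothetical; the case-insensitive branch mirrors what Spark's default resolver does, but this is not Spark API:

```java
import java.util.List;
import java.util.Optional;

public class ResolverSketch {
    // Stand-in for relation.output.find(...): strict equality vs. resolver-style
    // case-insensitive matching (Spark's default analyzer behavior).
    static Optional<String> findColumn(List<String> output, String name, boolean caseSensitive) {
        return output.stream()
            .filter(a -> caseSensitive ? a.equals(name) : a.equalsIgnoreCase(name))
            .findFirst();
    }

    public static void main(String[] args) {
        List<String> output = List.of("region", "year");
        // Case-sensitive lookup, as in the current PR: "Region" is silently missed,
        // which would disable enhanced partition filtering for the whole table.
        assert findColumn(output, "Region", true).isEmpty();
        // Resolver-style lookup finds the column regardless of casing.
        assert findColumn(output, "Region", false).orElseThrow().equals("region");
        System.out.println("ok");
    }
}
```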
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownV2Filters.java:
##########
@@ -60,4 +60,16 @@ public interface SupportsPushDownV2Filters extends ScanBuilder {
* empty array should be returned for this case.
*/
Predicate[] pushedPredicates();
+
+ /**
+ * Returns true if this data source supports enhanced partition filtering: a second
+ * call to {@link #pushPredicates(Predicate[])} will be made with partition-only
+ * predicates (e.g.
+ * {@link org.apache.spark.sql.connector.expressions.filter.PartitionPredicate}).
+ *
+ * @since 4.2.0
+ */
+ default boolean supportsEnhancedPartitionFiltering() {
Review Comment:
**`pushPredicates` two-call contract is not documented.**
When `supportsEnhancedPartitionFiltering()` returns `true`,
`pushPredicates()` will be called **twice** on the same `ScanBuilder` instance:
first with translated V2 predicates, then with `PartitionPredicate` objects.
The implementation must accumulate state across both calls (as the test
implementation does).
This two-call pattern is a significant behavioral contract that third-party
connector authors need to know about. The javadoc should document:
1. That `pushPredicates` will be called a second time with
`PartitionPredicate` instances.
2. That `pushedPredicates()` must return predicates from **both** calls.
3. That the second call only occurs after the first call completes.
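As a rough illustration of that accumulate-across-calls contract (not the real connector API; `MiniScanBuilder`, `Pred`, `DataPred`, and `PartitionPred` are hypothetical stand-ins for `ScanBuilder` and the predicate types):

```java
import java.util.ArrayList;
import java.util.List;

public class TwoCallSketch {
    interface Pred { String sql(); }
    record DataPred(String sql) implements Pred {}
    record PartitionPred(String sql) implements Pred {}

    // The builder must accumulate predicates across both pushPredicates calls so
    // that pushedPredicates() reports predicates from BOTH calls, not just the last.
    static class MiniScanBuilder {
        private final List<Pred> pushed = new ArrayList<>();
        Pred[] pushPredicates(Pred[] preds) {
            for (Pred p : preds) pushed.add(p); // accumulate, don't replace
            return new Pred[0];                 // pretend everything was handled
        }
        Pred[] pushedPredicates() { return pushed.toArray(new Pred[0]); }
    }

    public static void main(String[] args) {
        MiniScanBuilder b = new MiniScanBuilder();
        b.pushPredicates(new Pred[]{ new DataPred("a > 1") });        // first call
        b.pushPredicates(new Pred[]{ new PartitionPred("p = 'x'") }); // second call
        assert b.pushedPredicates().length == 2;
        System.out.println(b.pushedPredicates().length); // prints 2
    }
}
```

A builder that reset its internal list on each `pushPredicates` call would drop the first call's predicates and produce incorrect scans.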
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]