cloud-fan commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2885040613


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -22,24 +22,66 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, NamedExpression, SchemaPruning}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.expressions.IdentityTransform
 import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate, Predicate}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, DataSourceUtils}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.internal.connector.{PartitionPredicateImpl, SupportsPushDownCatalystFilters}
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.Utils
 
 object PushDownUtils {
+
+  /**
+   * Returns partition schema as a StructType when the table partitioning.
+   * Currently only supported for identity transforms on simple (single-name) field references.
+   *
+   * @return Some(StructType) for partition transform types, if supported.
+   */
+  def getPartitionSchemaForPartitionPredicate(
+      relation: DataSourceV2Relation): Option[StructType] = {
+    val partitioning = relation.table.partitioning().toIndexedSeq
+    val partitionColNamesOpt: Seq[Option[String]] = partitioning.map {
+      case id: IdentityTransform =>
+        id.ref.fieldNames().toIndexedSeq match {
+          case Seq(name) => Some(name)
+          case _ => None // Not supported for multiple field names (e.g. nested field)
+        }
+      case _ => None
+    }
+    partitionColNamesOpt match {
+      // Only support identity transform on simple field reference
+      case seq if seq.isEmpty || seq.exists(_.isEmpty) => None
+      case _ =>
+        val partitionColNames = partitionColNamesOpt.map(_.get)
+        val attrs = partitionColNames.map(name => relation.output.find(_.name == name))

Review Comment:
   **Case-sensitive column lookup (same issue flagged by @gengliangwang in 
`accept()`, but at a different layer).**
   
   `relation.output.find(_.name == name)` uses a case-sensitive comparison. If 
the partition transform references a column with different casing than 
`relation.output` (e.g. the transform says `Region` but the output has 
`region`), the lookup returns `None` and enhanced partition filtering is 
silently disabled for the entire table.
   
   The lookup should use the `resolver` from `SQLConf`, which honors 
`spark.sql.caseSensitive`, or at least `equalsIgnoreCase`:
   ```scala
   val resolver = SQLConf.get.resolver
   val attrs = partitionColNames.map(name =>
     relation.output.find(a => resolver(a.name, name)))
   ```
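
To make the failure mode concrete, here is a minimal, self-contained sketch in plain Java. It does not use Spark's types: `ResolverLookupSketch`, `findColumn`, and the two `BiPredicate` constants are hypothetical stand-ins for the lookup and for the case-sensitive and case-insensitive resolver behaviors.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

final class ResolverLookupSketch {
  // Hypothetical stand-ins for the two resolver behaviors; not Spark's actual types.
  static final BiPredicate<String, String> CASE_SENSITIVE = String::equals;
  static final BiPredicate<String, String> CASE_INSENSITIVE = String::equalsIgnoreCase;

  // Returns the first output column that the resolver considers equal to `name`.
  static Optional<String> findColumn(
      List<String> output, String name, BiPredicate<String, String> resolver) {
    return output.stream().filter(col -> resolver.test(col, name)).findFirst();
  }

  public static void main(String[] args) {
    List<String> output = List.of("id", "region");
    // The transform references `Region`, the output column is `region`.
    System.out.println(findColumn(output, "Region", CASE_SENSITIVE));
    System.out.println(findColumn(output, "Region", CASE_INSENSITIVE));
  }
}
```

With the strict comparison the lookup comes back empty, which corresponds to the `None` that disables the feature. With the case-insensitive stand-in the column is found.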



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownV2Filters.java:
##########
@@ -60,4 +60,16 @@ public interface SupportsPushDownV2Filters extends ScanBuilder {
    * empty array should be returned for this case.
    */
   Predicate[] pushedPredicates();
+
+  /**
+   * Returns true if this data source supports enhanced partition filtering: a second call to
+   * {@link #pushPredicates(Predicate[])} with partition-only predicates (e.g.
+   * {@link org.apache.spark.sql.connector.expressions.filter.PartitionPredicate}) will
+   * be called.
+   *
+   * @since 4.2.0
+   */
+  default boolean supportsEnhancedPartitionFiltering() {

Review Comment:
   **`pushPredicates` two-call contract is not documented.**
   
   When `supportsEnhancedPartitionFiltering()` returns `true`, 
`pushPredicates()` will be called **twice** on the same `ScanBuilder` instance: 
first with translated V2 predicates, then with `PartitionPredicate` objects. 
The implementation must accumulate state across both calls (as the test 
implementation does).
   
   This two-call pattern is a significant behavioral contract that third-party 
connector authors need to know about. The javadoc should document:
   1. That `pushPredicates` will be called a second time with 
`PartitionPredicate` instances.
   2. That `pushedPredicates()` must return predicates from **both** calls.
   3. That the second call only occurs after the first call completes.
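
The three points above can be illustrated with a small sketch of the state a connector would need to keep. This is plain Java with hypothetical stand-ins, not the Spark API: `TwoCallPushdownSketch.Builder` plays the role of a `ScanBuilder`, and predicates are modeled as strings rather than `Predicate` or `PartitionPredicate` objects.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoCallPushdownSketch {
  // Hypothetical stand-in for a ScanBuilder; predicates are modeled as plain strings.
  static final class Builder {
    private final List<String> pushed = new ArrayList<>();

    // Accepts every predicate and appends it to the accumulated state.
    // Returns the predicates left for post-scan evaluation (none in this sketch).
    List<String> pushPredicates(List<String> predicates) {
      pushed.addAll(predicates); // append; overwriting would drop the first call's predicates
      return List.of();
    }

    // Reports the predicates accepted across every call so far.
    List<String> pushedPredicates() {
      return new ArrayList<>(pushed);
    }
  }

  public static void main(String[] args) {
    Builder builder = new Builder();
    builder.pushPredicates(List.of("value > 10"));    // first call: translated V2 predicates
    builder.pushPredicates(List.of("region = 'us'")); // second call: partition-only predicates
    System.out.println(builder.pushedPredicates());
  }
}
```

An implementation that assigned `pushed = predicates` instead of appending would silently lose the first call's predicates, which is the mistake the javadoc should help connector authors avoid.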



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

