szehon-ho commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2893239467


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -22,24 +22,66 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression, NamedExpression, SchemaPruning}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.expressions.IdentityTransform
 import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate, 
Predicate}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, 
SupportsPushDownRequiredColumns, SupportsPushDownTableSample, 
SupportsPushDownTopN, SupportsPushDownV2Filters}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
DataSourceUtils}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.internal.connector.{PartitionPredicateImpl, 
SupportsPushDownCatalystFilters}
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.Utils
 
 object PushDownUtils {
+
+  /**
+   * Returns partition schema as a StructType when the table partitioning.
+   * Currently only supported for identity transforms on simple (single-name) 
field references.
+   *
+   * @return Some(StructType) for partition transform types, if supported.
+   */
+  def getPartitionSchemaForPartitionPredicate(
+      relation: DataSourceV2Relation): Option[StructType] = {
+    val partitioning = relation.table.partitioning().toIndexedSeq
+    val partitionColNamesOpt: Seq[Option[String]] = partitioning.map {
+      case id: IdentityTransform =>
+        id.ref.fieldNames().toIndexedSeq match {
+          case Seq(name) => Some(name)
+          case _ => None // Not supported for multiple field names (e.g. 
nested field)
+        }
+      case _ => None
+    }
+    partitionColNamesOpt match {
+      // Only support identity transform on simple field reference
+      case seq if seq.isEmpty || seq.exists(_.isEmpty) => None

Review Comment:
   I am not sure about this, it  may cause more problem in the short term.  The 
contract is the DSV2 connector needs to pass an `InternalRow partitionKey` to 
PartitionPredicate.accept() that matches the Table.partitioning() schema.  Now 
we are saying, for the time being let them pass one that only matches a pruned 
partition schema that only has identity columns (so it doesnt match the actual 
Table.partitioning() schema) .  
   
   Once we fully support the PartitionPredicate for arbitrary partition 
transform, it will break because the contract will change back , saying the 
`InternalRow partitionKey` match the original Table.partitioning() schema.
   
   By the way, the only consumer I know yet is Iceberg, which use the partition 
transform year(), hour(), etc



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -22,24 +22,66 @@ import scala.collection.mutable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression, NamedExpression, SchemaPruning}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.expressions.IdentityTransform
 import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate, 
Predicate}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, 
SupportsPushDownRequiredColumns, SupportsPushDownTableSample, 
SupportsPushDownTopN, SupportsPushDownV2Filters}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
DataSourceUtils}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.internal.connector.{PartitionPredicateImpl, 
SupportsPushDownCatalystFilters}
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.Utils
 
 object PushDownUtils {
+
+  /**
+   * Returns partition schema as a StructType when the table partitioning.
+   * Currently only supported for identity transforms on simple (single-name) 
field references.
+   *
+   * @return Some(StructType) for partition transform types, if supported.
+   */
+  def getPartitionSchemaForPartitionPredicate(
+      relation: DataSourceV2Relation): Option[StructType] = {
+    val partitioning = relation.table.partitioning().toIndexedSeq
+    val partitionColNamesOpt: Seq[Option[String]] = partitioning.map {
+      case id: IdentityTransform =>
+        id.ref.fieldNames().toIndexedSeq match {
+          case Seq(name) => Some(name)
+          case _ => None // Not supported for multiple field names (e.g. 
nested field)
+        }
+      case _ => None
+    }
+    partitionColNamesOpt match {
+      // Only support identity transform on simple field reference
+      case seq if seq.isEmpty || seq.exists(_.isEmpty) => None

Review Comment:
   I am not sure about this, it  may cause more problem in the short term.
   
   The contract is the DSV2 connector needs to pass an `InternalRow 
partitionKey` to PartitionPredicate.accept() that matches the 
Table.partitioning() schema.  Now we are saying, for the time being let them 
pass one that only matches a pruned partition schema that only has identity 
columns (so it doesnt match the actual Table.partitioning() schema) .  
   
   Once we fully support the PartitionPredicate for arbitrary partition 
transform, it will break because the contract will change back , saying the 
`InternalRow partitionKey` match the original Table.partitioning() schema.
   
   By the way, the only consumer I know yet is Iceberg, which use the partition 
transform year(), hour(), etc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to