szehon-ho commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2893171462
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -22,24 +22,66 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Expression, NamedExpression, SchemaPruning}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.expressions.IdentityTransform
import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate,
Predicate}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset,
SupportsPushDownRequiredColumns, SupportsPushDownTableSample,
SupportsPushDownTopN, SupportsPushDownV2Filters}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy,
DataSourceUtils}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.internal.connector.{PartitionPredicateImpl,
SupportsPushDownCatalystFilters}
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.Utils
object PushDownUtils {
+
+ /**
+ * Returns the partition schema as a StructType derived from the table partitioning.
+ * Currently only supported for identity transforms on simple (single-name)
field references.
+ *
+ * @return Some(StructType) for partition transform types, if supported.
+ */
+ def getPartitionSchemaForPartitionPredicate(
+ relation: DataSourceV2Relation): Option[StructType] = {
+ val partitioning = relation.table.partitioning().toIndexedSeq
+ val partitionColNamesOpt: Seq[Option[String]] = partitioning.map {
+ case id: IdentityTransform =>
+ id.ref.fieldNames().toIndexedSeq match {
+ case Seq(name) => Some(name)
+ case _ => None // Not supported for multiple field names (e.g.
nested field)
+ }
+ case _ => None
+ }
+ partitionColNamesOpt match {
+ // Only support identity transform on simple field reference
+ case seq if seq.isEmpty || seq.exists(_.isEmpty) => None
+ case _ =>
+ val partitionColNames = partitionColNamesOpt.map(_.get)
+ val attrs = partitionColNames.map(name => relation.output.find(_.name
== name))
Review Comment:
I replied to @gengliangwang's comment earlier; it might also apply to this
comment:
The current DSv1 implementation does not support case-insensitive partition filters: see
https://github.com/apache/spark/blob/972897433082b1a7136b877b4fa37970961169d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L191
This sounds like a new feature — should we add it later behind another flag?
Otherwise it diverges from the current DSv1 behavior.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -22,24 +22,66 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Expression, NamedExpression, SchemaPruning}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.expressions.IdentityTransform
import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate,
Predicate}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset,
SupportsPushDownRequiredColumns, SupportsPushDownTableSample,
SupportsPushDownTopN, SupportsPushDownV2Filters}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy,
DataSourceUtils}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.internal.connector.{PartitionPredicateImpl,
SupportsPushDownCatalystFilters}
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.Utils
object PushDownUtils {
+
+ /**
+ * Returns the partition schema as a StructType derived from the table partitioning.
+ * Currently only supported for identity transforms on simple (single-name)
field references.
+ *
+ * @return Some(StructType) for partition transform types, if supported.
+ */
+ def getPartitionSchemaForPartitionPredicate(
+ relation: DataSourceV2Relation): Option[StructType] = {
+ val partitioning = relation.table.partitioning().toIndexedSeq
+ val partitionColNamesOpt: Seq[Option[String]] = partitioning.map {
+ case id: IdentityTransform =>
+ id.ref.fieldNames().toIndexedSeq match {
+ case Seq(name) => Some(name)
+ case _ => None // Not supported for multiple field names (e.g.
nested field)
+ }
+ case _ => None
+ }
+ partitionColNamesOpt match {
+ // Only support identity transform on simple field reference
+ case seq if seq.isEmpty || seq.exists(_.isEmpty) => None
+ case _ =>
+ val partitionColNames = partitionColNamesOpt.map(_.get)
+ val attrs = partitionColNames.map(name => relation.output.find(_.name
== name))
Review Comment:
I replied to @gengliangwang's comment earlier; it might also apply to this
comment:
The current DSv1 implementation does not support case-insensitive partition filters: see
https://github.com/apache/spark/blob/972897433082b1a7136b877b4fa37970961169d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala#L191
This sounds like a new feature — should we add it later behind another flag?
Otherwise it diverges from the current DSv1 behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]