This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d526528fa2 [VL] Add RewriteCreateTableAsSelect for spark34 (#10646)
d526528fa2 is described below

commit d526528fa273802f5d906f53a3511ed9a42af973
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Sep 15 16:34:33 2025 +0800

    [VL] Add RewriteCreateTableAsSelect for spark34 (#10646)
    
    Spark 3.5 already supports columnar v2 write for CTAS, after the 
introduction of [SPARK-43088][SQL] "Respect RequiresDistributionAndOrdering 
in CTAS/RTAS" (spark#40734); this PR is essentially a copy of it.
---
 .../execution/enhanced/VeloxIcebergSuite.scala     |  27 +++
 .../org/apache/spark/sql/gluten/TestUtils.scala    |  46 +++++
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   5 +-
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   2 +
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |   4 +
 .../sql/extension/RewriteCreateTableAsSelect.scala | 202 +++++++++++++++++++++
 6 files changed, 285 insertions(+), 1 deletion(-)

diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 44ee905d01..a7dfe4e79e 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -19,7 +19,9 @@ package org.apache.gluten.execution.enhanced
 import org.apache.gluten.execution.{IcebergSuite, VeloxIcebergAppendDataExec, 
VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec}
 import org.apache.gluten.tags.EnhancedFeaturesTest
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.gluten.TestUtils
 
 @EnhancedFeaturesTest
 class VeloxIcebergSuite extends IcebergSuite {
@@ -190,4 +192,29 @@ class VeloxIcebergSuite extends IcebergSuite {
       assert(result(0).get(0) == 2)
     }
   }
+
+  test("iceberg create table as select") {
+    withTable("iceberg_tb1", "iceberg_tb2") {
+      spark.sql("""
+                  |create table iceberg_tb1 (a int, pt int) using iceberg
+                  |partitioned by (pt)
+                  |""".stripMargin)
+
+      spark.sql("insert into table iceberg_tb1 values (1, 1), (2, 2)")
+
+      // CTAS
+      val sqlStr = """
+                     |create table iceberg_tb2 using iceberg
+                     |partitioned by (pt)
+                     |as select * from iceberg_tb1
+                     |""".stripMargin
+
+      TestUtils.checkExecutedPlanContains[VeloxIcebergAppendDataExec](spark, 
sqlStr)
+
+      checkAnswer(
+        spark.sql("select * from iceberg_tb2 order by a"),
+        Seq(Row(1, 1), Row(2, 2))
+      )
+    }
+  }
 }
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
new file mode 100644
index 0000000000..587c064b9c
--- /dev/null
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.gluten
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import scala.reflect.ClassTag
+
+object TestUtils {
+
+  def checkExecutedPlanContains[T: ClassTag](spark: SparkSession, sqlStr: 
String): Unit = {
+    var found = false
+    val queryListener = new QueryExecutionListener {
+      override def onFailure(f: String, qe: QueryExecution, e: Exception): 
Unit = {}
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
+        if (!found) {
+          found = 
qe.executedPlan.find(implicitly[ClassTag[T]].runtimeClass.isInstance(_)).isDefined
+        }
+      }
+    }
+    try {
+      spark.listenerManager.register(queryListener)
+      spark.sql(sqlStr)
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      assert(found)
+    } finally {
+      spark.listenerManager.unregister(queryListener)
+    }
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 45abbd5a55..50fced37ea 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.backendsapi.velox
 
-import org.apache.gluten.backendsapi.RuleApi
+import org.apache.gluten.backendsapi.{BackendsApiManager, RuleApi}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
@@ -66,6 +66,9 @@ object VeloxRuleApi {
     injector.injectOptimizerRule(RewriteCastFromArray.apply)
     injector.injectPostHocResolutionRule(ArrowConvertorRule.apply)
     injector.injectOptimizerRule(RewriteUnboundedWindow.apply)
+    if (BackendsApiManager.getSettings.supportAppendDataExec()) {
+      
injector.injectPlannerStrategy(SparkShimLoader.getSparkShims.getRewriteCreateTableAsSelect(_))
+    }
   }
 
   /**
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 3a59135c80..5f62c272f7 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -319,4 +319,6 @@ trait SparkShims {
   def unBase64FunctionFailsOnError(unBase64: UnBase64): Boolean = false
 
   def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType
+
+  def getRewriteCreateTableAsSelect(session: SparkSession): SparkStrategy = _ 
=> Seq.empty
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 16affb2ad8..c097833435 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -50,6 +50,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.extension.RewriteCreateTableAsSelect
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, 
StructField, StructType}
@@ -647,4 +648,7 @@ class Spark34Shims extends SparkShims {
     DecimalPrecision.widerDecimalType(d1, d2)
   }
 
+  override def getRewriteCreateTableAsSelect(session: SparkSession): 
SparkStrategy = {
+    RewriteCreateTableAsSelect(session)
+  }
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/extension/RewriteCreateTableAsSelect.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/extension/RewriteCreateTableAsSelect.scala
new file mode 100644
index 0000000000..c8970456a6
--- /dev/null
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/extension/RewriteCreateTableAsSelect.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.extension
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
FileSourceMetadataAttribute}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, 
METADATA_COL_ATTR_KEY}
+import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.isSessionCatalog
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
LeafV2CommandExec}
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+import org.apache.spark.util.Utils
+
+import scala.collection.JavaConverters._
+
+case class RewriteCreateTableAsSelect(session: SparkSession) extends 
SparkStrategy {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    plan match {
+      case CreateTableAsSelect(
+            ResolvedIdentifier(catalog, ident),
+            parts,
+            query,
+            tableSpec: TableSpec,
+            options,
+            ifNotExists,
+            analyzedQuery) if analyzedQuery.isDefined && 
!supportsV1Command(catalog) =>
+        catalog match {
+          case staging: StagingTableCatalog =>
+            AtomicCreateTableAsSelectExec(
+              staging,
+              ident,
+              parts,
+              analyzedQuery.get,
+              qualifyLocInTableSpec(tableSpec),
+              options,
+              ifNotExists) :: Nil
+          case _ =>
+            CreateTableAsSelectExec(
+              catalog.asTableCatalog,
+              ident,
+              parts,
+              analyzedQuery.get,
+              qualifyLocInTableSpec(tableSpec),
+              options,
+              ifNotExists) :: Nil
+        }
+
+      case _ => Nil
+    }
+  }
+
+  private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
+    isSessionCatalog(catalog) && catalog.isInstanceOf[CatalogExtension]
+  }
+
+  private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
+    tableSpec.copy(location = 
tableSpec.location.map(makeQualifiedDBObjectPath))
+  }
+
+  private def makeQualifiedDBObjectPath(location: String): String = {
+    CatalogUtils.makeQualifiedDBObjectPath(
+      session.sharedState.conf.get(WAREHOUSE_PATH),
+      location,
+      session.sharedState.hadoopConf)
+  }
+}
+
+/** Port from Spark 3.5. */
+case class AtomicCreateTableAsSelectExec(
+    catalog: StagingTableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: LogicalPlan,
+    tableSpec: TableSpec,
+    writeOptions: Map[String, String],
+    ifNotExists: Boolean)
+  extends V2CreateTableAsSelectBaseExec {
+
+  val properties: Map[String, String] = 
CatalogV2Util.convertTableProperties(tableSpec)
+
+  override protected def run(): Seq[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return Nil
+      }
+      throw QueryCompilationErrors.tableAlreadyExistsError(ident)
+    }
+
+    val stagedTable = catalog.stageCreate(
+      ident,
+      getV2Columns(query.schema),
+      partitioning.toArray,
+      properties.asJava)
+    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+  }
+}
+
+case class CreateTableAsSelectExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: LogicalPlan,
+    tableSpec: TableSpec,
+    writeOptions: Map[String, String],
+    ifNotExists: Boolean)
+  extends V2CreateTableAsSelectBaseExec {
+
+  val properties: Map[String, String] = 
CatalogV2Util.convertTableProperties(tableSpec)
+
+  override protected def run(): Seq[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return Nil
+      }
+      throw QueryCompilationErrors.tableAlreadyExistsError(ident)
+    }
+    val table = catalog.createTable(
+      ident,
+      getV2Columns(query.schema),
+      partitioning.toArray,
+      properties.asJava)
+    writeToTable(catalog, table, writeOptions, ident, query)
+  }
+}
+
+trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
+  override def output: Seq[Attribute] = Nil
+
+  protected def getV2Columns(schema: StructType): Array[Column] = {
+    val rawSchema = 
CharVarcharUtils.getRawSchema(removeInternalMetadata(schema), conf)
+    CatalogV2Util.structTypeToV2Columns(rawSchema.asNullable)
+  }
+
+  protected def writeToTable(
+      catalog: TableCatalog,
+      table: Table,
+      writeOptions: Map[String, String],
+      ident: Identifier,
+      query: LogicalPlan): Seq[InternalRow] = {
+    Utils.tryWithSafeFinallyAndFailureCallbacks({
+      val relation = DataSourceV2Relation.create(table, Some(catalog), 
Some(ident))
+      val append = AppendData.byPosition(relation, query, writeOptions)
+      val qe = session.sessionState.executePlan(append)
+      qe.assertCommandExecuted()
+
+      table match {
+        case st: StagedTable => st.commitStagedChanges()
+        case _ =>
+      }
+
+      Nil
+    })(catchBlock = {
+      table match {
+        // Failure rolls back the staged writes and metadata changes.
+        case st: StagedTable => st.abortStagedChanges()
+        case _ => catalog.dropTable(ident)
+      }
+    })
+  }
+
+  val INTERNAL_METADATA_KEYS = Seq(
+    "__autoGeneratedAlias",
+    METADATA_COL_ATTR_KEY,
+    "__qualified_access_only",
+    FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
+    "__file_source_constant_metadata_col",
+    "__file_source_generated_metadata_col"
+  )
+
+  private def removeInternalMetadata(schema: StructType): StructType = {
+    StructType(schema.map {
+      field =>
+        var builder = new MetadataBuilder().withMetadata(field.metadata)
+        INTERNAL_METADATA_KEYS.foreach(key => builder = builder.remove(key))
+        field.copy(metadata = builder.build())
+    })
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to