This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 35afdb4316 [HUDI-4178] Addressing performance regressions in Spark 
DataSourceV2 Integration (#5737)
35afdb4316 is described below

commit 35afdb4316d496bbb37ebb9e1598d84bd8a4000d
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Jun 7 16:30:46 2022 -0700

    [HUDI-4178] Addressing performance regressions in Spark DataSourceV2 
Integration (#5737)
    
    There are multiple issues with our current DataSource V2 integrations: because 
we advertise Hudi tables as V2, Spark expects them to implement certain APIs 
which are not implemented at the moment; instead, we're using a custom Resolution 
rule (in HoodieSpark3Analysis) to manually fall back to V1 APIs.  This 
commit fixes the issue by reverting DSv2 APIs and making Spark use V1, except 
for schema evaluation logic.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +--
 .../scala/org/apache/hudi/util/JFunction.scala     |  33 +++++++
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  18 ++--
 .../hudi/testutils/HoodieClientTestHarness.java    |  34 ++++++-
 .../hudi/common/config/HoodieCommonConfig.java     |   5 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   5 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  14 ++-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  49 ++++++----
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   4 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   7 +-
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |   7 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  10 +-
 .../hudi/command/CreateHoodieTableCommand.scala    |   4 +-
 .../sql/hudi/HoodieSparkSessionExtension.scala     |  10 +-
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  66 +++++++++----
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |  11 +++
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |   8 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   6 --
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   4 +-
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |  13 ---
 .../sql/hudi/ResolveHudiAlterTableCommand312.scala |   7 +-
 .../org/apache/hudi/Spark3DefaultSource.scala      |  17 ++--
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |  13 ---
 .../hudi/ResolveHudiAlterTableCommandSpark32.scala |  59 +++++------
 .../sql/hudi/analysis/HoodieSpark3Analysis.scala   | 103 +++++++++++---------
 .../spark/sql/hudi/catalog/HoodieCatalog.scala     | 108 ++++++++++++---------
 .../sql/hudi/catalog/HoodieInternalV2Table.scala   |   4 +-
 28 files changed, 375 insertions(+), 257 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 31ce05173a..d18238fa4b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -175,11 +175,6 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Schema string representing the latest schema of the 
table. Hudi passes this to "
           + "implementations of evolution of schema");
 
-  public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = 
ConfigProperty
-      .key("hoodie.schema.on.read.enable")
-      .defaultValue(false)
-      .withDocumentation("enable full schema evolution for hoodie");
-
   public static final ConfigProperty<Boolean> ENABLE_INTERNAL_SCHEMA_CACHE = 
ConfigProperty
       .key("hoodie.schema.cache.enable")
       .defaultValue(false)
@@ -929,11 +924,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean getSchemaEvolutionEnable() {
-    return getBoolean(SCHEMA_EVOLUTION_ENABLE);
+    return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE);
   }
 
   public void setSchemaEvolutionEnable(boolean enable) {
-    setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
+    setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, 
String.valueOf(enable));
   }
 
   /**
@@ -2175,7 +2170,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withSchemaEvolutionEnable(boolean enable) {
-      writeConfig.setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
+      writeConfig.setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, 
String.valueOf(enable));
       return this;
     }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
new file mode 100644
index 0000000000..4a7dca8408
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util
+
+/**
+ * Utility allowing for seamless conversion b/w Java/Scala functional 
primitives
+ */
+object JFunction {
+
+  def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
+    (t: T) => f.apply(t)
+
+  def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
+    new java.util.function.Consumer[T] {
+      override def accept(t: T): Unit = f.apply(t)
+    }
+
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index a97743e62f..1823e61b22 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, 
SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{FilePartition, 
LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 
 import java.util.Locale
 
@@ -141,8 +140,8 @@ trait SparkAdapter extends Serializable {
       maxSplitBytes: Long): Seq[FilePartition]
 
   def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
-    tripAlias(table) match {
-      case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+    unfoldSubqueryAliases(table) match {
+      case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
       case relation: UnresolvedRelation =>
         isHoodieTable(toTableIdentifier(relation), spark)
       case _=> false
@@ -162,20 +161,15 @@ trait SparkAdapter extends Serializable {
     isHoodieTable(table)
   }
 
-  def tripAlias(plan: LogicalPlan): LogicalPlan = {
+  protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case SubqueryAlias(_, relation: LogicalPlan) =>
-        tripAlias(relation)
+        unfoldSubqueryAliases(relation)
       case other =>
         other
     }
   }
 
-  /**
-    * Create customresolutionRule to deal with alter command for hudi.
-    */
-  def createResolveHudiAlterTableCommand(sparkSession: SparkSession): 
Rule[LogicalPlan]
-
   /**
     * Create instance of [[ParquetFileFormat]]
     */
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index d0365dced1..1b69d7db4e 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.HoodieConversionUtils;
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -73,12 +74,14 @@ import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.timeline.service.TimelineService;
+import org.apache.hudi.util.JFunction;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SparkSessionExtensions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -98,6 +101,7 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -145,6 +149,10 @@ public abstract class HoodieClientTestHarness extends 
HoodieCommonTestHarness im
     FileSystem.closeAll();
   }
 
+  protected Option<Consumer<SparkSessionExtensions>> 
getSparkSessionExtensionsInjector() {
+    return Option.empty();
+  }
+
   @BeforeEach
   public void setTestMethodName(TestInfo testInfo) {
     if (testInfo.getTestMethod().isPresent()) {
@@ -186,16 +194,32 @@ public abstract class HoodieClientTestHarness extends 
HoodieCommonTestHarness im
    * @param appName The specified application name.
    */
   protected void initSparkContexts(String appName) {
+    Option<Consumer<SparkSessionExtensions>> sparkSessionExtensionsInjector =
+        getSparkSessionExtensionsInjector();
+
+    if (sparkSessionExtensionsInjector.isPresent()) {
+      // In case we need to inject extensions into Spark Session, we have
+      // to stop any session that might still be active and since Spark will 
try
+      // to re-use it
+      HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession())
+          .ifPresent(SparkSession::stop);
+    }
+
     // Initialize a local spark env
     jsc = new 
JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + 
testMethodName));
     jsc.setLogLevel("ERROR");
-    hadoopConf = jsc.hadoopConfiguration();
 
-    // SQLContext stuff
-    sqlContext = new SQLContext(jsc);
+    hadoopConf = jsc.hadoopConfiguration();
     context = new HoodieSparkEngineContext(jsc);
-    hadoopConf = context.getHadoopConf().get();
-    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+
+    sparkSession = SparkSession.builder()
+        .withExtensions(JFunction.toScala(sparkSessionExtensions -> {
+          sparkSessionExtensionsInjector.ifPresent(injector -> 
injector.accept(sparkSessionExtensions));
+          return null;
+        }))
+        .config(jsc.getConf())
+        .getOrCreate();
+    sqlContext = new SQLContext(sparkSession);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 6be92af9ad..cc62bcc328 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -31,6 +31,11 @@ import java.util.Properties;
     description = "The following set of configurations are common across 
Hudi.")
 public class HoodieCommonConfig extends HoodieConfig {
 
+  public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = 
ConfigProperty
+      .key("hoodie.schema.on.read.enable")
+      .defaultValue(false)
+      .withDocumentation("Enables support for Schema Evolution feature");
+
   public static final ConfigProperty<ExternalSpillableMap.DiskMapType> 
SPILLABLE_DISK_MAP_TYPE = ConfigProperty
       .key("hoodie.common.spillable.diskmap.type")
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index a62a402b6a..0102870e92 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, 
HoodieConfig}
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -142,6 +142,9 @@ object DataSourceReadOptions {
     .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
     .defaultValue("false")
     .withDocumentation("When doing an incremental query whether we should fall 
back to full table scans if file does not exist.")
+
+  val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = 
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index eee5a4881c..71c38f7655 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -25,11 +25,10 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -100,9 +99,18 @@ class DefaultSource extends RelationProvider
     val isBootstrappedTable = 
metaClient.getTableConfig.getBootstrapBasePath.isPresent
     val tableType = metaClient.getTableType
     val queryType = parameters(QUERY_TYPE.key)
-    val userSchema = if (schema == null) Option.empty[StructType] else 
Some(schema)
+    // NOTE: In cases when Hive Metastore is used as catalog and the table is 
partitioned, schema in the HMS might contain
+    //       Hive-specific partitioning columns created specifically for HMS 
to handle partitioning appropriately. In that
+    //       case  we opt in to not be providing catalog's schema, and instead 
force Hudi relations to fetch the schema
+    //       from the table itself
+    val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
+      None
+    } else {
+      Option(schema)
+    }
 
     log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: 
$tableType, queryType is: $queryType")
+
     if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() 
== 0) {
       new EmptyRelation(sqlContext, metaClient)
     } else {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 4a12256c43..47e391a560 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -34,7 +34,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
@@ -74,7 +74,7 @@ case class HoodieTableState(tablePath: String,
 abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                   val metaClient: HoodieTableMetaClient,
                                   val optParams: Map[String, String],
-                                  userSchema: Option[StructType])
+                                  schemaSpec: Option[StructType])
   extends BaseRelation
     with FileRelation
     with PrunedFilteredScan
@@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    */
   protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) 
= {
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
-      case Success(schema) => schema
-      case Failure(e) =>
-        logWarning("Failed to fetch schema from the table", e)
-        // If there is no commit in the table, we can't get the schema
-        // t/h [[TableSchemaResolver]], fallback to the provided 
[[userSchema]] instead.
-        userSchema match {
-          case Some(s) => convertToAvroSchema(s)
-          case _ => throw new IllegalArgumentException("User-provided schema 
is required in case the table is empty")
-        }
+    val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          logError("Failed to fetch schema from the table", e)
+          throw new HoodieSchemaException("Failed to fetch schema from the 
table")
+      }
     }
-    // try to find internalSchema
-    val internalSchemaFromMeta = try {
-      
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
-    } catch {
-      case _: Exception => InternalSchema.getEmptyInternalSchema
+
+    val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
+      InternalSchema.getEmptyInternalSchema
+    } else {
+      Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
+        case Success(internalSchemaOpt) =>
+          
toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
+        case Failure(e) =>
+          logWarning("Failed to fetch internal-schema from the table", e)
+          InternalSchema.getEmptyInternalSchema
+      }
     }
-    (avroSchema, internalSchemaFromMeta)
+
+    (avroSchema, internalSchema)
   }
 
   protected lazy val tableStructSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
@@ -503,6 +507,15 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   private def prunePartitionColumns(dataStructSchema: StructType): StructType =
     StructType(dataStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+
+  private def isSchemaEvolutionEnabled = {
+    // NOTE: Schema evolution could be configured both t/h optional parameters 
vehicle as well as
+    //       t/h Spark Session configuration (for ex, for Spark SQL)
+    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean 
||
+    sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+  }
 }
 
 object HoodieBaseRelation extends SparkAdapterSupport {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index da2736e59b..84280559e9 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
-import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, 
TypedProperties}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, 
HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -338,7 +338,7 @@ object HoodieSparkSqlWriter {
   def addSchemaEvolutionParameters(parameters: Map[String, String], 
internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
     val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else 
"false"
     parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> 
SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
-      HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
+      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> 
schemaEvolutionEnable)
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 6042841586..63f1a7afc2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -18,11 +18,10 @@
 package org.apache.hudi
 
 import java.util.Properties
-
 import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
-import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieConfig, TypedProperties}
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
@@ -163,9 +162,9 @@ object HoodieWriterUtils {
     // Check schema evolution for bootstrap table.
     // now we do not support bootstrap table.
     if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
-      && params.getOrElse(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
"false").toBoolean) {
+      && params.getOrElse(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
"false").toBoolean) {
       throw new HoodieException(String
-        .format("now schema evolution cannot support bootstrap table, pls set 
%s to false", HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key()))
+        .format("now schema evolution cannot support bootstrap table, pls set 
%s to false", HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key()))
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index fa01ba37e9..e69d0d5293 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -253,8 +253,11 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
         .filterKeys(_.startsWith("hoodie."))
   }
 
-  def isEnableHive(sparkSession: SparkSession): Boolean =
-    "hive" == 
sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+  /**
+   * Checks whether Spark is using Hive as Session's Catalog
+   */
+  def isUsingHiveCatalog(sparkSession: SparkSession): Boolean =
+    
sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) == 
"hive"
 
   /**
    * Convert different query instant time format to the commit time format.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index eca73be0bb..3f67d5017f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, 
withSparkConf}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog, 
withSparkConf}
 import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -57,7 +57,7 @@ trait ProvidesHoodieConfig extends Logging {
 
     require(hoodieCatalogTable.primaryKeys.nonEmpty,
       s"There are no primary key in table 
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
-    val enableHive = isEnableHive(sparkSession)
+    val enableHive = isUsingHiveCatalog(sparkSession)
 
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf)
 
@@ -174,7 +174,7 @@ trait ProvidesHoodieConfig extends Logging {
 
     logInfo(s"Insert statement use write operation type: $operation, 
payloadClass: $payloadClassName")
 
-    val enableHive = isEnableHive(sparkSession)
+    val enableHive = isUsingHiveCatalog(sparkSession)
 
     withSparkConf(sparkSession, catalogProperties) {
       Map(
@@ -213,7 +213,7 @@ trait ProvidesHoodieConfig extends Logging {
                                  hoodieCatalogTable: HoodieCatalogTable,
                                  partitionsToDrop: String): Map[String, 
String] = {
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-    val enableHive = isEnableHive(sparkSession)
+    val enableHive = isUsingHiveCatalog(sparkSession)
     val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
 
@@ -259,7 +259,7 @@ trait ProvidesHoodieConfig extends Logging {
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
     val options = hoodieCatalogTable.catalogProperties
-    val enableHive = isEnableHive(sparkSession)
+    val enableHive = isUsingHiveCatalog(sparkSession)
 
     withSparkConf(sparkSession, options) {
       Map(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 9bf1d72152..75803fd779 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
 import 
org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
@@ -144,7 +144,7 @@ object CreateHoodieTableCommand {
     )
 
     // Create table in the catalog
-    val enableHive = isEnableHive(sparkSession)
+    val enableHive = isUsingHiveCatalog(sparkSession)
     if (enableHive) {
       createHiveDataSourceTable(sparkSession, newTable, ignoreIfExists)
     } else {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
index fff44bb7f5..783875296c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
@@ -33,17 +33,13 @@ class HoodieSparkSessionExtension extends 
(SparkSessionExtensions => Unit)
       new HoodieCommonSqlParser(session, parser)
     }
 
-    HoodieAnalysis.customResolutionRules().foreach { rule =>
+    HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
       extensions.injectResolutionRule { session =>
-        rule(session)
+        ruleBuilder(session)
       }
     }
 
-    extensions.injectResolutionRule { session =>
-      sparkAdapter.createResolveHudiAlterTableCommand(session)
-    }
-
-    HoodieAnalysis.customPostHocResolutionRules().foreach { rule =>
+    HoodieAnalysis.customPostHocResolutionRules.foreach { rule =>
       extensions.injectPostHocResolutionRule { session =>
         rule(session)
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index dcacbef3a2..97e453ff7e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -39,45 +39,69 @@ import org.apache.spark.sql.{AnalysisException, 
SparkSession}
 
 import java.util
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 
 object HoodieAnalysis {
-  def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
-    Seq(
+  type RuleBuilder = SparkSession => Rule[LogicalPlan]
+
+  def customResolutionRules: Seq[RuleBuilder] = {
+    val rules: ListBuffer[RuleBuilder] = ListBuffer(
+      // Default rules
       session => HoodieResolveReferences(session),
       session => HoodieAnalysis(session)
-    ) ++ extraResolutionRules()
-
-  def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
-    Seq(
-      session => HoodiePostAnalysisRule(session)
-    ) ++ extraPostHocResolutionRules()
+    )
 
-  def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
     if (HoodieSparkUtils.gteqSpark3_2) {
+      val dataSourceV2ToV1FallbackClass = 
"org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
+      val dataSourceV2ToV1Fallback: RuleBuilder =
+        session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, 
session).asInstanceOf[Rule[LogicalPlan]]
+
       val spark3AnalysisClass = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
-      val spark3Analysis: SparkSession => Rule[LogicalPlan] =
+      val spark3Analysis: RuleBuilder =
         session => ReflectionUtils.loadClass(spark3AnalysisClass, 
session).asInstanceOf[Rule[LogicalPlan]]
 
-      val spark3ResolveReferences = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
-      val spark3References: SparkSession => Rule[LogicalPlan] =
-        session => ReflectionUtils.loadClass(spark3ResolveReferences, 
session).asInstanceOf[Rule[LogicalPlan]]
+      val spark3ResolveReferencesClass = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
+      val spark3ResolveReferences: RuleBuilder =
+        session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, 
session).asInstanceOf[Rule[LogicalPlan]]
 
-      Seq(spark3Analysis, spark3References)
-    } else {
-      Seq.empty
+      val spark32ResolveAlterTableCommandsClass = 
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
+      val spark32ResolveAlterTableCommands: RuleBuilder =
+        session => 
ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, 
session).asInstanceOf[Rule[LogicalPlan]]
+
+      // NOTE: PLEASE READ CAREFULLY
+      //
+      // It's critical for these rules to follow in this order, so that 
DataSource V2 to V1 fallback
+      // is performed prior to other rules being evaluated
+      rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, 
spark3ResolveReferences, spark32ResolveAlterTableCommands)
+
+    } else if (HoodieSparkUtils.gteqSpark3_1) {
+      val spark31ResolveAlterTableCommandsClass = 
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
+      val spark31ResolveAlterTableCommands: RuleBuilder =
+        session => 
ReflectionUtils.loadClass(spark31ResolveAlterTableCommandsClass, 
session).asInstanceOf[Rule[LogicalPlan]]
+
+      rules ++= Seq(spark31ResolveAlterTableCommands)
     }
+
+    rules
   }
 
-  def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
+  def customPostHocResolutionRules: Seq[RuleBuilder] = {
+    val rules: ListBuffer[RuleBuilder] = ListBuffer(
+      // Default rules
+      session => HoodiePostAnalysisRule(session)
+    )
+
     if (HoodieSparkUtils.gteqSpark3_2) {
       val spark3PostHocResolutionClass = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
-      val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] =
+      val spark3PostHocResolution: RuleBuilder =
         session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, 
session).asInstanceOf[Rule[LogicalPlan]]
 
-      Seq(spark3PostHocResolution)
-    } else {
-      Seq.empty
+      rules += spark3PostHocResolution
     }
+
+    rules
+  }
+
 }
 
 /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f7c62adc65..636599ce0c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -453,7 +453,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
     // Enable the hive sync by default if Spark has enabled the hive metastore.
-    val enableHive = isEnableHive(sparkSession)
+    val enableHive = isUsingHiveCatalog(sparkSession)
     withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
       Map(
         "path" -> path,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 7c86da0c9e..fea1ec3571 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,20 +18,24 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import 
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, 
recordsToStrings}
+import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.types._
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
@@ -42,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import java.sql.{Date, Timestamp}
+import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -67,6 +72,12 @@ class TestCOWDataSource extends HoodieClientTestBase {
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
+  override def getSparkSessionExtensionsInjector: 
util.Option[Consumer[SparkSessionExtensions]] =
+    toJavaOption(
+      Some(
+        JFunction.toJava((receiver: SparkSessionExtensions) => new 
HoodieSparkSessionExtension().apply(receiver)))
+    )
+
   @BeforeEach override def setUp() {
     initPath()
     initSparkContexts()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 68fc6d7c41..6736f44799 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -25,6 +25,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.Utils
+import org.joda.time.DateTimeZone
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 
@@ -40,7 +41,10 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     dir
   }
 
-  TimeZone.setDefault(DateTimeUtils.getTimeZone("CTT"))
+  // NOTE: We have to fix the timezone to make sure all date-/timestamp-bound 
utilities output
+  //       is consistent with the fixtures
+  DateTimeZone.setDefault(DateTimeZone.UTC)
+  TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
   protected lazy val spark: SparkSession = SparkSession.builder()
     .master("local[1]")
     .appName("hoodie sql test")
@@ -50,7 +54,7 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     .config("hoodie.upsert.shuffle.parallelism", "4")
     .config("hoodie.delete.shuffle.parallelism", "4")
     .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
-    .config("spark.sql.session.timeZone", "CTT")
+    .config("spark.sql.session.timeZone", "UTC")
     .config(sparkConf())
     .getOrCreate()
 
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 0e74c997d7..27c7d0c445 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -159,12 +159,6 @@ class Spark2Adapter extends SparkAdapter {
     throw new IllegalStateException(s"Should not call getRelationTimeTravel 
for spark2")
   }
 
-  override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): 
Rule[LogicalPlan] = {
-    new Rule[LogicalPlan] {
-      override def apply(plan: LogicalPlan): LogicalPlan = plan
-    }
-  }
-
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
     Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index e5f4476cc5..c47fbdead2 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -112,8 +112,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
   }
 
   override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean 
= {
-    tripAlias(table) match {
-      case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+    unfoldSubqueryAliases(table) match {
+      case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
       case relation: UnresolvedRelation =>
         isHoodieTable(toTableIdentifier(relation), spark)
       case DataSourceV2Relation(table: Table, _, _, _, _) => 
isHoodieTable(table.properties())
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 22431cb257..9dcf530621 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -41,19 +41,6 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
   override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: 
DataType): HoodieAvroDeserializer =
     new HoodieSpark3_1AvroDeserializer(rootAvroType, rootCatalystType)
 
-  override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): 
Rule[LogicalPlan] = {
-    if (SPARK_VERSION.startsWith("3.1")) {
-      val loadClassName = 
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
-      val clazz = Class.forName(loadClassName, true, 
Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      new Rule[LogicalPlan] {
-        override def apply(plan: LogicalPlan): LogicalPlan = plan
-      }
-    }
-  }
-
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
     Some(new Spark31HoodieParquetFileFormat(appendPartitionValues))
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala
index 522cecdaaf..11dff7eb86 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.common.config.HoodieCommonConfig
+
 import java.util.Locale
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
@@ -114,8 +116,9 @@ case class ResolveHudiAlterTableCommand312(sparkSession: 
SparkSession) extends R
       }
   }
 
-  private def schemaEvolutionEnabled(): Boolean = sparkSession
-    
.sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(),
 "false").toBoolean
+  private def schemaEvolutionEnabled(): Boolean =
+    
sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
+      
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
 
   private def isHoodieTable(table: CatalogTable): Boolean = 
table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
 
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
index d94fee1f41..3bc3446d1f 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala
@@ -17,19 +17,19 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
-import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class Spark3DefaultSource extends DefaultSource with DataSourceRegister with 
TableProvider {
+/**
+ * NOTE: PLEASE READ CAREFULLY
+ *       All of Spark DataSourceV2 APIs are deliberately disabled to make sure
+ *       there are no regressions in performance
+ *       Please check out HUDI-4178 for more details
+ */
+class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* 
with TableProvider */ {
 
   override def shortName(): String = "hudi"
 
+  /*
   def inferSchema: StructType = new StructType()
 
   override def inferSchema(options: CaseInsensitiveStringMap): StructType = 
inferSchema
@@ -43,4 +43,5 @@ class Spark3DefaultSource extends DefaultSource with 
DataSourceRegister with Tab
 
     HoodieInternalV2Table(SparkSession.active, path)
   }
+  */
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index 15624c7411..1b045f6654 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -67,19 +67,6 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
     )
   }
 
-  override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): 
Rule[LogicalPlan] = {
-    if (SPARK_VERSION.startsWith("3.2")) {
-      val loadClassName = 
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
-      val clazz = Class.forName(loadClassName, true, 
Thread.currentThread().getContextClassLoader)
-      val ctor = clazz.getConstructors.head
-      ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]]
-    } else {
-      new Rule[LogicalPlan] {
-        override def apply(plan: LogicalPlan): LogicalPlan = plan
-      }
-    }
-  }
-
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): 
Option[ParquetFileFormat] = {
     Some(new Spark32HoodieParquetFileFormat(appendPartitionValues))
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala
index 96d919cf0a..f6f1826156 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.common.config.HoodieCommonConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID
-import org.apache.spark.sql.catalyst.analysis.ResolvedTable
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, 
DropColumns, LogicalPlan, RenameColumn, ReplaceColumns, SetTableProperties, 
UnsetTableProperties}
+import org.apache.spark.sql.catalyst.analysis.ResolvedTable
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
 import org.apache.spark.sql.hudi.command.{AlterTableCommand => 
HudiAlterTableCommand}
@@ -33,33 +33,38 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand 
=> HudiAlterTableCom
   */
 class ResolveHudiAlterTableCommandSpark32(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-    case set @ SetTableProperties(asTable(table), _) if schemaEvolutionEnabled 
&& set.resolved =>
-      HudiAlterTableCommand(table, set.changes, ColumnChangeID.PROPERTY_CHANGE)
-    case unSet @ UnsetTableProperties(asTable(table), _, _) if 
schemaEvolutionEnabled && unSet.resolved =>
-      HudiAlterTableCommand(table, unSet.changes, 
ColumnChangeID.PROPERTY_CHANGE)
-    case drop @ DropColumns(asTable(table), _) if schemaEvolutionEnabled && 
drop.resolved =>
-      HudiAlterTableCommand(table, drop.changes, ColumnChangeID.DELETE)
-    case add @ AddColumns(asTable(table), _) if schemaEvolutionEnabled  && 
add.resolved =>
-      HudiAlterTableCommand(table, add.changes, ColumnChangeID.ADD)
-    case renameColumn @ RenameColumn(asTable(table), _, _) if 
schemaEvolutionEnabled && renameColumn.resolved=>
-      HudiAlterTableCommand(table, renameColumn.changes, ColumnChangeID.UPDATE)
-    case alter @ AlterColumn(asTable(table), _, _, _, _, _) if 
schemaEvolutionEnabled && alter.resolved =>
-      HudiAlterTableCommand(table, alter.changes, ColumnChangeID.UPDATE)
-    case replace @ ReplaceColumns(asTable(table), _) if schemaEvolutionEnabled 
&& replace.resolved =>
-      HudiAlterTableCommand(table, replace.changes, ColumnChangeID.REPLACE)
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (schemaEvolutionEnabled) {
+      plan.resolveOperatorsUp {
+        case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if 
set.resolved =>
+          HudiAlterTableCommand(t.v1Table, set.changes, 
ColumnChangeID.PROPERTY_CHANGE)
+        case unSet@UnsetTableProperties(ResolvedHoodieV2TablePlan(t), _, _) if 
unSet.resolved =>
+          HudiAlterTableCommand(t.v1Table, unSet.changes, 
ColumnChangeID.PROPERTY_CHANGE)
+        case drop@DropColumns(ResolvedHoodieV2TablePlan(t), _) if 
drop.resolved =>
+          HudiAlterTableCommand(t.v1Table, drop.changes, ColumnChangeID.DELETE)
+        case add@AddColumns(ResolvedHoodieV2TablePlan(t), _) if add.resolved =>
+          HudiAlterTableCommand(t.v1Table, add.changes, ColumnChangeID.ADD)
+        case renameColumn@RenameColumn(ResolvedHoodieV2TablePlan(t), _, _) if 
renameColumn.resolved =>
+          HudiAlterTableCommand(t.v1Table, renameColumn.changes, 
ColumnChangeID.UPDATE)
+        case alter@AlterColumn(ResolvedHoodieV2TablePlan(t), _, _, _, _, _) if 
alter.resolved =>
+          HudiAlterTableCommand(t.v1Table, alter.changes, 
ColumnChangeID.UPDATE)
+        case replace@ReplaceColumns(ResolvedHoodieV2TablePlan(t), _) if 
replace.resolved =>
+          HudiAlterTableCommand(t.v1Table, replace.changes, 
ColumnChangeID.REPLACE)
+      }
+    } else {
+      plan
+    }
   }
 
-  private def schemaEvolutionEnabled(): Boolean = sparkSession
-    
.sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(),
 "false").toBoolean
+  private def schemaEvolutionEnabled: Boolean =
+    
sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key,
+      
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean
 
-  object asTable {
-    def unapply(a: LogicalPlan): Option[CatalogTable] = {
-      a match {
-        case ResolvedTable(_, _, table: HoodieInternalV2Table, _) =>
-          table.catalogTable
-        case _ =>
-          None
+  object ResolvedHoodieV2TablePlan {
+    def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = {
+      plan match {
+        case ResolvedTable(_, _, v2Table: HoodieInternalV2Table, _) => 
Some(v2Table)
+        case _ => None
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
index 4c77733b14..e351174587 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -17,72 +17,77 @@
 
 package org.apache.spark.sql.hudi.analysis
 
-import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.{DefaultSource, SparkAdapterSupport}
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, 
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.{Table, V1Table}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
V2SessionCatalog}
-import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, 
getTableLocation, removeMetaFields, tableExistsInPath}
 import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table}
 import 
org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, 
ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 /**
- * Rule for convert the logical plan to command.
- * @param sparkSession
+ * NOTE: PLEASE READ CAREFULLY
+ *
+ * Since Hudi relations don't currently implement DS V2 Read API, we have to 
fall back to V1 here.
+ * Such fallback will have considerable performance impact, therefore it's 
only performed in cases
+ * where V2 API has to be used. Currently the only such use-case is the 
Schema Evolution feature
+ *
+ * Check out HUDI-4178 for more details
  */
-case class HoodieSpark3Analysis(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
-  with SparkAdapterSupport with ProvidesHoodieConfig {
+class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
+  with ProvidesHoodieConfig {
 
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
-    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
-      val output = dsv2.output
-      val catalogTable = if (d.catalogTable.isDefined) {
-        Some(d.v1Table)
-      } else {
-        None
-      }
+    case v2r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, 
_) =>
+      val output = v2r.output
+      val catalogTable = v2Table.catalogTable.map(_ => v2Table.v1Table)
       val relation = new DefaultSource().createRelation(new 
SQLContext(sparkSession),
-        buildHoodieConfig(d.hoodieCatalogTable))
+        buildHoodieConfig(v2Table.hoodieCatalogTable), 
v2Table.hoodieCatalogTable.tableSchema)
+
       LogicalRelation(relation, output, catalogTable, isStreaming = false)
-    case a @ InsertIntoStatement(r: DataSourceV2Relation, partitionSpec, _, _, 
_, _) if a.query.resolved &&
-      r.table.isInstanceOf[HoodieInternalV2Table] &&
-      needsSchemaAdjustment(a.query, 
r.table.asInstanceOf[HoodieInternalV2Table], partitionSpec, r.schema) =>
-      val projection = resolveQueryColumnsByOrdinal(a.query, r.output)
-      if (projection != a.query) {
-        a.copy(query = projection)
-      } else {
-        a
-      }
+  }
+}
+
+class HoodieSpark3Analysis(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
+    case s @ InsertIntoStatement(r @ DataSourceV2Relation(v2Table: 
HoodieInternalV2Table, _, _, _, _), partitionSpec, _, _, _, _)
+      if s.query.resolved && needsSchemaAdjustment(s.query, 
v2Table.hoodieCatalogTable.table, partitionSpec, r.schema) =>
+        val projection = resolveQueryColumnsByOrdinal(s.query, r.output)
+        if (projection != s.query) {
+          s.copy(query = projection)
+        } else {
+          s
+        }
   }
 
   /**
    * Need to adjust schema based on the query and relation schema, for example,
    * if using insert into xx select 1, 2 here need to map to column names
-   * @param query
-   * @param hoodieTable
-   * @param partitionSpec
-   * @param schema
-   * @return
    */
   private def needsSchemaAdjustment(query: LogicalPlan,
-                                    hoodieTable: HoodieInternalV2Table,
+                                    table: CatalogTable,
                                     partitionSpec: Map[String, Option[String]],
                                     schema: StructType): Boolean = {
     val output = query.output
     val queryOutputWithoutMetaFields = removeMetaFields(output)
-    val partitionFields = hoodieTable.hoodieCatalogTable.partitionFields
-    val partitionSchema = hoodieTable.hoodieCatalogTable.partitionSchema
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
+
+    val partitionFields = hoodieCatalogTable.partitionFields
+    val partitionSchema = hoodieCatalogTable.partitionSchema
     val staticPartitionValues = partitionSpec.filter(p => 
p._2.isDefined).mapValues(_.get)
 
     assert(staticPartitionValues.isEmpty ||
@@ -91,8 +96,8 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) 
extends Rule[Logical
         s"is: ${staticPartitionValues.mkString("," + "")}")
 
     assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: 
${hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
+      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
+      s"Required select columns count: 
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
         s"Current select columns(including static partition column) count: " +
         s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
         s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
@@ -126,7 +131,6 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) 
extends Rule[Logical
 
 /**
  * Rule for resolve hoodie's extended syntax or rewrite some logical plan.
- * @param sparkSession
  */
 case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
   with SparkAdapterSupport with ProvidesHoodieConfig {
@@ -173,28 +177,26 @@ case class HoodieSpark3ResolveReferences(sparkSession: 
SparkSession) extends Rul
 }
 
 /**
- * Rule for rewrite some spark commands to hudi's implementation.
- * @param sparkSession
+ * Rule replacing Spark's resolved commands (which do not work for Hudi tables 
out-of-the-box) with
+ * the corresponding Hudi implementations
  */
 case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case ShowPartitions(ResolvedTable(_, idt, _: HoodieInternalV2Table, _), 
specOpt, _) =>
+      case ShowPartitions(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), 
specOpt, _) =>
         ShowHoodieTablePartitionsCommand(
-          idt.asTableIdentifier, specOpt.map(s => 
s.asInstanceOf[UnresolvedPartitionSpec].spec))
+          id.asTableIdentifier, specOpt.map(s => 
s.asInstanceOf[UnresolvedPartitionSpec].spec))
 
       // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
-      case TruncateTable(ResolvedTable(_, idt, _: HoodieInternalV2Table, _)) =>
-        TruncateHoodieTableCommand(idt.asTableIdentifier, None)
+      case TruncateTable(ResolvedTable(_, id, HoodieV1OrV2Table(_), _)) =>
+        TruncateHoodieTableCommand(id.asTableIdentifier, None)
 
-      case TruncatePartition(
-          ResolvedTable(_, idt, _: HoodieInternalV2Table, _),
-          partitionSpec: UnresolvedPartitionSpec) =>
-        TruncateHoodieTableCommand(idt.asTableIdentifier, 
Some(partitionSpec.spec))
+      case TruncatePartition(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), 
partitionSpec: UnresolvedPartitionSpec) =>
+        TruncateHoodieTableCommand(id.asTableIdentifier, 
Some(partitionSpec.spec))
 
-      case DropPartitions(ResolvedTable(_, idt, _: HoodieInternalV2Table, _), 
specs, ifExists, purge) =>
+      case DropPartitions(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), 
specs, ifExists, purge) =>
         AlterHoodieTableDropPartitionCommand(
-          idt.asTableIdentifier,
+          id.asTableIdentifier,
           specs.seq.map(f => f.asInstanceOf[UnresolvedPartitionSpec]).map(s => 
s.spec),
           ifExists,
           purge,
@@ -205,3 +207,12 @@ case class HoodieSpark3PostAnalysisRule(sparkSession: 
SparkSession) extends Rule
     }
   }
 }
+
+private[sql] object HoodieV1OrV2Table extends SparkAdapterSupport {
+  def unapply(table: Table): Option[CatalogTable] = table match {
+    case V1Table(catalogTable) if sparkAdapter.isHoodieTable(catalogTable) => 
Some(catalogTable)
+    case v2: HoodieInternalV2Table => v2.catalogTable
+    case _ => None
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index e1c2f228fa..2b3b7a0782 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.util.ConfigUtils
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
TableAlreadyExistsException, UnresolvedAttribute}
@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChan
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.hudi.analysis.HoodieV1OrV2Table
 import org.apache.spark.sql.hudi.command._
 import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -105,12 +106,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
           case _ =>
             catalogTable0
         }
-        HoodieInternalV2Table(
+
+        val v2Table = HoodieInternalV2Table(
           spark = spark,
           path = catalogTable.location.toString,
           catalogTable = Some(catalogTable),
           tableIdentifier = Some(ident.toString))
-      case o => o
+
+        val schemaEvolutionEnabled: Boolean = 
spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+          
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+
+        // NOTE: PLEASE READ CAREFULLY
+        //
+        // Since Hudi relations don't currently implement the DS V2 Read API, we 
by default fall back to V1 here.
+        // Such a fallback will have considerable performance impact, therefore 
it's only performed in cases
+        // where the V2 API has to be used. Currently the only such use-case is 
the Schema Evolution feature
+        //
+        // Check out HUDI-4178 for more details
+        if (schemaEvolutionEnabled) {
+          v2Table
+        } else {
+          v2Table.v1TableWrapper
+        }
+
+      case t => t
     }
   }
 
@@ -132,7 +151,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
   override def dropTable(ident: Identifier): Boolean = {
     val table = loadTable(ident)
     table match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, 
isView = false, purge = false).run(spark)
         true
       case _ => super.dropTable(ident)
@@ -142,7 +161,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
   override def purgeTable(ident: Identifier): Boolean = {
     val table = loadTable(ident)
     table match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, 
isView = false, purge = true).run(spark)
         true
       case _ => super.purgeTable(ident)
@@ -153,56 +172,53 @@ class HoodieCatalog extends DelegatingCatalogExtension
   @throws[TableAlreadyExistsException]
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = 
{
     loadTable(oldIdent) match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, 
newIdent.asTableIdentifier, false).run(spark)
       case _ => super.renameTable(oldIdent, newIdent)
     }
   }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
-    val tableIdent = TableIdentifier(ident.name(), 
ident.namespace().lastOption)
-    // scalastyle:off
-    val table = loadTable(ident) match {
-      case hoodieTable: HoodieInternalV2Table => hoodieTable
-      case _ => return super.alterTable(ident, changes: _*)
-    }
-    // scalastyle:on
-
-    val grouped = changes.groupBy(c => c.getClass)
-
-    grouped.foreach {
-      case (t, newColumns) if t == classOf[AddColumn] =>
-        AlterHoodieTableAddColumnsCommand(
-          tableIdent,
-          newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
-            StructField(
-              col.fieldNames()(0),
-              col.dataType(),
-              col.isNullable)
-          }).run(spark)
-      case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
-        columnChanges.foreach {
-          case dataType: UpdateColumnType =>
-            val colName = UnresolvedAttribute(dataType.fieldNames()).name
-            val newDataType = dataType.newDataType()
-            val structField = StructField(colName, newDataType)
-            AlterHoodieTableChangeColumnCommand(tableIdent, colName, 
structField).run(spark)
-          case dataType: UpdateColumnComment =>
-            val newComment = dataType.newComment()
-            val colName = UnresolvedAttribute(dataType.fieldNames()).name
-            val fieldOpt = 
table.schema().findNestedField(dataType.fieldNames(), includeCollections = true,
-              spark.sessionState.conf.resolver).map(_._2)
-            val field = fieldOpt.getOrElse {
-              throw new AnalysisException(
-                s"Couldn't find column $colName 
in:\n${table.schema().treeString}")
+    loadTable(ident) match {
+      case HoodieV1OrV2Table(table) => {
+        val tableIdent = TableIdentifier(ident.name(), 
ident.namespace().lastOption)
+        changes.groupBy(c => c.getClass).foreach {
+          case (t, newColumns) if t == classOf[AddColumn] =>
+            AlterHoodieTableAddColumnsCommand(
+              tableIdent,
+              newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
+                StructField(
+                  col.fieldNames()(0),
+                  col.dataType(),
+                  col.isNullable)
+              }).run(spark)
+
+          case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) 
=>
+            columnChanges.foreach {
+              case dataType: UpdateColumnType =>
+                val colName = UnresolvedAttribute(dataType.fieldNames()).name
+                val newDataType = dataType.newDataType()
+                val structField = StructField(colName, newDataType)
+                AlterHoodieTableChangeColumnCommand(tableIdent, colName, 
structField).run(spark)
+              case dataType: UpdateColumnComment =>
+                val newComment = dataType.newComment()
+                val colName = UnresolvedAttribute(dataType.fieldNames()).name
+                val fieldOpt = 
table.schema.findNestedField(dataType.fieldNames(), includeCollections = true,
+                  spark.sessionState.conf.resolver).map(_._2)
+                val field = fieldOpt.getOrElse {
+                  throw new AnalysisException(
+                    s"Couldn't find column $colName 
in:\n${table.schema.treeString}")
+                }
+                AlterHoodieTableChangeColumnCommand(tableIdent, colName, 
field.withComment(newComment)).run(spark)
             }
-            AlterHoodieTableChangeColumnCommand(tableIdent, colName, 
field.withComment(newComment)).run(spark)
+          case (t, _) =>
+            throw new UnsupportedOperationException(s"not supported table 
change: ${t.getClass}")
         }
-      case (t, _) =>
-        throw new UnsupportedOperationException(s"not supported table change: 
${t.getClass}")
-    }
 
-    loadTable(ident)
+        loadTable(ident)
+      }
+      case _ => super.alterTable(ident, changes: _*)
+    }
   }
 
   private def deduceTableLocationURIAndTableType(
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
index 848925aafe..9eb4a773f8 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, 
TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, 
TableCapability, V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -74,6 +74,8 @@ case class HoodieInternalV2Table(spark: SparkSession,
 
   override def v1Table: CatalogTable = hoodieCatalogTable.table
 
+  def v1TableWrapper: V1Table = V1Table(v1Table)
+
   override def partitioning(): Array[Transform] = {
     hoodieCatalogTable.partitionFields.map { col =>
       new IdentityTransform(new FieldReference(Seq(col)))

Reply via email to