This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 784ea417aec0 feat: introduce pk filter push-down to base file (#14183)
784ea417aec0 is described below

commit 784ea417aec0bcd79c7da9a98f73f9417abe9194
Author: TheR1sing3un <[email protected]>
AuthorDate: Tue Nov 4 09:46:50 2025 +0800

    feat: introduce pk filter push-down to base file (#14183)
    
    * feat: introduce pk filter push-down to base file
    
    ---------
    
    Signed-off-by: TheR1sing3un <[email protected]>
---
 .../SparkFileFormatInternalRowReaderContext.scala  |  16 ++-
 ...stSparkFileFormatInternalRowReaderContext.scala |  28 ++++-
 .../sql/hudi/dml/insert/TestInsertTable.scala      |  99 ++++++++++++++++
 .../hudi/procedure/TestBootstrapProcedure.scala    | 130 +++++++++++----------
 4 files changed, 207 insertions(+), 66 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 537189909dbe..950f5d1daf94 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -21,11 +21,11 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
-import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
 getAppliedRequiredSchema}
+import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
 filterIsSafeForPrimaryKey, getAppliedRequiredSchema}
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableConfig
 import 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -62,7 +62,9 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: 
SparkColumnarFileR
                                               tableConfig: HoodieTableConfig)
   extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig, 
SparkFileFormatInternalRecordContext.apply(tableConfig)) {
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
+  private lazy val recordKeyFields = 
Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty)
   private lazy val bootstrapSafeFilters: Seq[Filter] = 
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
+  private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_, 
recordKeyFields)) ++ requiredFilters
   private lazy val allFilters = filters ++ requiredFilters
 
   override def getFileRecordIterator(filePath: StoragePath,
@@ -77,6 +79,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: 
SparkColumnarFileR
     }
     val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
     if (FSUtils.isLogFile(filePath)) {
+      // TODO: introduce pk filter in log file reader
       new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
         
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
     } else {
@@ -97,6 +100,8 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
       (schemaForRead, allFilters)
     } else if (!getHasLogFiles && hasRowIndexField) {
       (schemaForRead, bootstrapSafeFilters)
+    } else if (!getNeedsBootstrapMerge) {
+      (schemaForRead, morFilters)
     } else {
       (schemaForRead, requiredFilters)
     }
@@ -262,6 +267,13 @@ object SparkFileFormatInternalRowReaderContext {
     metaRefCount == filter.references.length || metaRefCount == 0
   }
 
+  /**
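+   * A filter is only valid for push-down if every column it references is a primary key column or 
{@link HoodieRecord.RECORD_KEY_METADATA_FIELD}; references are lower-cased before the lookup, so recordKeyFields must hold lower-cased names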
+   */
+  def filterIsSafeForPrimaryKey(filter: Filter, recordKeyFields: Set[String]): 
Boolean = {
+    filter.references.forall(c => recordKeyFields.contains(c.toLowerCase) || 
c.equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD))
+  }
+
   private def isIndexTempColumn(field: StructField): Boolean = {
     field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
index 738fc1cad6c9..6aaec1ca7427 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -20,7 +20,7 @@
 package org.apache.spark.execution.datasources.parquet
 
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext
-import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
+import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
 filterIsSafeForPrimaryKey}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableConfig
 import 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
@@ -63,6 +63,32 @@ class TestSparkFileFormatInternalRowReaderContext extends 
SparkClientFunctionalT
     assertTrue(filterIsSafeForBootstrap(legalNestedFilter))
   }
 
+  @Test
+  def testPKFilter(): Unit = {
+    val recordKeyField = 
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName
+    val pk1 = "pk1"
+    val pk2 = "pk2"
+    val dataField = "data_col"
+    val pkFieldsSet = Set(pk1, pk2)
+
+    // case1: only record key or pk fields
+    val recordKeyFilter = IsNotNull(recordKeyField)
+    assertTrue(filterIsSafeForPrimaryKey(recordKeyFilter, pkFieldsSet))
+    val pkFieldFilter = IsNotNull(pk1)
+    assertTrue(filterIsSafeForPrimaryKey(pkFieldFilter, pkFieldsSet))
+    val pkFieldsFilter = And(IsNotNull(pk1), IsNotNull(pk2))
+    assertTrue(filterIsSafeForPrimaryKey(pkFieldsFilter, pkFieldsSet))
+    val pkAndRecordKeyFilter = And(And(IsNotNull(recordKeyField), 
IsNotNull(pk1)), IsNotNull(pk2))
+    assertTrue(filterIsSafeForPrimaryKey(pkAndRecordKeyFilter, pkFieldsSet))
+    // case2: with data field
+    val dataFieldFilter = IsNotNull(dataField)
+    assertFalse(filterIsSafeForPrimaryKey(dataFieldFilter, pkFieldsSet))
+    val illegalComplexFilter = Or(recordKeyFilter, dataFieldFilter)
+    assertFalse(filterIsSafeForPrimaryKey(illegalComplexFilter, pkFieldsSet))
+    val illegalNestedFilter = And(illegalComplexFilter, pkFieldFilter)
+    assertFalse(filterIsSafeForPrimaryKey(illegalNestedFilter, pkFieldsSet))
+  }
+
   @Test
   def testGetAppliedRequiredSchema(): Unit = {
     val fields = Array(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index e258d0057a2c..e7ea760de95d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -20,12 +20,14 @@
 package org.apache.spark.sql.hudi.dml.insert
 
 import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.internal.SQLConf
 
 class TestInsertTable extends HoodieSparkSqlTestBase {
 
@@ -777,5 +779,102 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     })
   }
+
+  test("Test Query With PK Filter") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id,name',
+           |  preCombineField = 'ts',
+           |  'hoodie.index.type' = 'BUCKET',
+           |  'hoodie.bucket.index.num.buckets' = '1',
+           |  '${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key()}' = 
'parquet'
+           | )
+           | partitioned by (dt)
+          """.stripMargin
+      )
+      spark.conf.unset("hoodie.datasource.insert.dup.policy")
+
+      withSQLConf("hoodie.datasource.overwrite.mode" -> "dynamic") {
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt) values
+             | (0, 'a0', 10, 1000, '2023-12-06'),
+             | (1, 'a1', 10, 1000, '2023-12-06'),
+             | (2, 'a2', 11, 1000, '2023-12-06'),
+             | (3, 'a3', 10, 1000, '2023-12-06')
+          """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+      withSQLConf("hoodie.datasource.write.operation" -> "upsert") {
+        spark.sql(
+          s"""
+             | insert into table $tableName partition (dt='2023-12-06') values
+             | (1, 'a1', 11, 2000),
+             | (4, 'a4', 10, 1000)
+            """.stripMargin)
+      }
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+        Seq(1, "a1", 11.0, 2000, "2023-12-06"),
+        Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+        Seq(3, "a3", 10.0, 1000, "2023-12-06"),
+        Seq(4, "a4", 10.0, 1000, "2023-12-06")
+      )
+
+      withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "true") {
+        checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or 
name = 'a3') and price <> 10")(
+          Seq(11.0, 2000, "2023-12-06")
+        )
+        // Filter(id = 1) and Filter(name = 'a3') can be pushed down; 
Filter(price <> 10) can't be pushed down since price is not a primary key column
+        var df = spark.sql(s"select price, ts, dt from $tableName where (id = 
1 or name = 'a3') and price <> 10")
+        // execute only the file scan physical plan
+        // the file scan is expected to return only (id: 1), (id: 3) and (id: 4, from log 
file); (id: 3) and (id: 4, from log file) pass the scan but will be filtered out 
later
+        
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+
+        checkAnswer(s"select price, ts, dt from $tableName where (id > 1 or 
name = 'a3') and price <> 10")(
+          Seq(11.0, 1000, "2023-12-06")
+        )
+        // Filter(id > 1) and Filter(name = 'a3') can be pushed down; 
Filter(price <> 10) can't be pushed down since price is not a primary key column
+        df = spark.sql(s"select price, ts, dt from $tableName where (id > 1 or 
name = 'a3') and price <> 10")
+        // the file scan is expected to return only (id: 1, from log file), (id: 2), (id: 3) 
and (id: 4, from log file); (id: 1, from log file), (id: 3) and (id: 4) pass the scan 
but will be filtered out later
+        
assertResult(4)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+      }
+
+      withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "false") {
+        spark.sql(s"set ${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}=false")
+        checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or 
name = 'a3') and price <> 10")(
+          Seq(11.0, 2000, "2023-12-06")
+        )
+        var df = spark.sql(s"select price, ts, dt from $tableName where (id = 
1 or name = 'a3') and price <> 10")
+        // execute only the file scan physical plan
+        // all ids in the table are expected to be scanned, and filtered later
+        
assertResult(5)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+
+        checkAnswer(s"select price, ts, dt from $tableName where (id > 1 or 
name = 'a3') and price <> 10")(
+          Seq(11.0, 1000, "2023-12-06")
+        )
+        df = spark.sql(s"select price, ts, dt from $tableName where (id > 1 or 
name = 'a3') and price <> 10")
+        
assertResult(5)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+      }
+
+    }
+
+  }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index bc69b434bf77..1eebeb9e5e91 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -53,18 +53,20 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
         df.write.parquet(sourcePath + StoragePath.SEPARATOR + PARTITION_FIELD 
+ "=" + partitions.get(i))
       }
 
-      spark.sql("set hoodie.bootstrap.parallelism = 20")
-      spark.sql("set hoodie.metadata.index.column.stats.enable = false")
-      checkAnswer(
-        s"""call run_bootstrap(
-           |table => '$tableName',
-           |base_path => '$tablePath',
-           |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
-           |bootstrap_path => '$sourcePath',
-           |rowKey_field => '$RECORD_KEY_FIELD',
-           |partition_path_field => '$PARTITION_FIELD',
-           |bootstrap_overwrite => true)""".stripMargin) {
-        Seq(0)
+      withSQLConf(
+        "hoodie.bootstrap.parallelism" -> "20",
+        "hoodie.metadata.index.column.stats.enable" -> "false") {
+        checkAnswer(
+          s"""call run_bootstrap(
+             |table => '$tableName',
+             |base_path => '$tablePath',
+             |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+             |bootstrap_path => '$sourcePath',
+             |rowKey_field => '$RECORD_KEY_FIELD',
+             |partition_path_field => '$PARTITION_FIELD',
+             |bootstrap_overwrite => true)""".stripMargin) {
+          Seq(0)
+        }
       }
 
       // create table
@@ -91,11 +93,10 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
       // cluster with row writer disabled and assert that records match with 
that before clustering
       // NOTE: the row writer path is already tested in 
TestDataSourceForBootstrap
       val beforeClusterDf = spark.sql(s"select * from $tableName")
-      spark.sql("set hoodie.datasource.write.row.writer.enable = false")
-      spark.sql(s"""call run_clustering(table => '$tableName')""".stripMargin)
-      assertResult(0)(spark.sql(s"select * from 
$tableName").except(beforeClusterDf).count())
-
-      spark.sessionState.conf.unsetConf("unset 
hoodie.metadata.index.column.stats.enable") // HUDI-8774
+      withSQLConf("hoodie.datasource.write.row.writer.enable" -> "false") {
+        spark.sql(s"""call run_clustering(table => 
'$tableName')""".stripMargin)
+        assertResult(0)(spark.sql(s"select * from 
$tableName").except(beforeClusterDf).count())
+      }
     }
   }
 
@@ -121,19 +122,21 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
         df.write.parquet(sourcePath + StoragePath.SEPARATOR + PARTITION_FIELD 
+ "=" + partitions.get(i))
       }
 
-      spark.sql("set hoodie.bootstrap.parallelism = 20")
-      spark.sql("set hoodie.metadata.index.column.stats.enable = false")
-      checkAnswer(
-        s"""call run_bootstrap(
-           |table => '$tableName',
-           |base_path => '$tablePath',
-           |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
-           |bootstrap_path => '$sourcePath',
-           |rowKey_field => '$RECORD_KEY_FIELD',
-           |partition_path_field => '$PARTITION_FIELD',
-           |options => 'hoodie.datasource.write.hive_style_partitioning=true',
-           |bootstrap_overwrite => true)""".stripMargin) {
-        Seq(0)
+      withSQLConf(
+        "hoodie.bootstrap.parallelism" -> "20",
+        "hoodie.metadata.index.column.stats.enable" -> "false") {
+        checkAnswer(
+          s"""call run_bootstrap(
+             |table => '$tableName',
+             |base_path => '$tablePath',
+             |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+             |bootstrap_path => '$sourcePath',
+             |rowKey_field => '$RECORD_KEY_FIELD',
+             |partition_path_field => '$PARTITION_FIELD',
+             |options => 
'hoodie.datasource.write.hive_style_partitioning=true',
+             |bootstrap_overwrite => true)""".stripMargin) {
+          Seq(0)
+        }
       }
 
       // create table
@@ -162,7 +165,6 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
       assertResult("true") {
         
metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
       };
-      spark.sessionState.conf.unsetConf("unset 
hoodie.metadata.index.column.stats.enable")
     }
   }
 
@@ -184,19 +186,21 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
       val df: Dataset[Row] = 
TestBootstrap.generateTestRawTripDataset(timestamp, 0, NUM_OF_RECORDS, null, 
jsc, spark.sqlContext)
       df.write.parquet(sourcePath)
 
-      spark.sql("set hoodie.bootstrap.parallelism = 20")
-      spark.sql("set hoodie.metadata.index.column.stats.enable = false")
-      // run bootstrap
-      checkAnswer(
-        s"""call run_bootstrap(
-           |table => '$tableName',
-           |base_path => '$tablePath',
-           |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
-           |bootstrap_path => '$sourcePath',
-           |rowKey_field => '$RECORD_KEY_FIELD',
-           |key_generator_class => 'NON_PARTITION',
-           |bootstrap_overwrite => true)""".stripMargin) {
-        Seq(0)
+      withSQLConf(
+        "hoodie.bootstrap.parallelism" -> "20",
+        "hoodie.metadata.index.column.stats.enable" -> "false") {
+        // run bootstrap
+        checkAnswer(
+          s"""call run_bootstrap(
+             |table => '$tableName',
+             |base_path => '$tablePath',
+             |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+             |bootstrap_path => '$sourcePath',
+             |rowKey_field => '$RECORD_KEY_FIELD',
+             |key_generator_class => 'NON_PARTITION',
+             |bootstrap_overwrite => true)""".stripMargin) {
+          Seq(0)
+        }
       }
 
       // create table
@@ -216,10 +220,10 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
       // cluster with row writer disabled and assert that records match with 
that before clustering
       // NOTE: the row writer path is already tested in 
TestDataSourceForBootstrap
       val beforeClusterDf = spark.sql(s"select * from $tableName")
-      spark.sql("set hoodie.datasource.write.row.writer.enable = false")
-      spark.sql(s"""call run_clustering(table => '$tableName')""".stripMargin)
+      withSQLConf("hoodie.datasource.write.row.writer.enable" -> "false") {
+        spark.sql(s"""call run_clustering(table => 
'$tableName')""".stripMargin)
+      }
       assertResult(0)(spark.sql(s"select * from 
$tableName").except(beforeClusterDf).count())
-      spark.sessionState.conf.unsetConf("unset 
hoodie.metadata.index.column.stats.enable")
     }
   }
 
@@ -245,23 +249,23 @@ class TestBootstrapProcedure extends 
HoodieSparkProcedureTestBase {
         df.write.parquet(sourcePath + StoragePath.SEPARATOR + PARTITION_FIELD 
+ "=" + partitions.get(i))
       }
 
-      spark.sql("set hoodie.bootstrap.parallelism = 20")
-      spark.sql("set hoodie.table.ordering.fields=timestamp")
-      spark.sql("set hoodie.metadata.index.column.stats.enable = false")
-
-      checkAnswer(
-        s"""call run_bootstrap(
-           |table => '$tableName',
-           |base_path => '$tablePath',
-           |table_type => '${HoodieTableType.MERGE_ON_READ.name}',
-           |bootstrap_path => '$sourcePath',
-           |rowKey_field => '$RECORD_KEY_FIELD',
-           |selector_class => 
'org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector',
-           |partition_path_field => '$PARTITION_FIELD',
-           |bootstrap_overwrite => true)""".stripMargin) {
-        Seq(0)
+      withSQLConf(
+        "hoodie.bootstrap.parallelism" -> "20",
+        "hoodie.table.ordering.fields" -> "timestamp",
+        "hoodie.metadata.index.column.stats.enable" -> "false") {
+        checkAnswer(
+          s"""call run_bootstrap(
+             |table => '$tableName',
+             |base_path => '$tablePath',
+             |table_type => '${HoodieTableType.MERGE_ON_READ.name}',
+             |bootstrap_path => '$sourcePath',
+             |rowKey_field => '$RECORD_KEY_FIELD',
+             |selector_class => 
'org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector',
+             |partition_path_field => '$PARTITION_FIELD',
+             |bootstrap_overwrite => true)""".stripMargin) {
+          Seq(0)
+        }
       }
-      spark.sessionState.conf.unsetConf("unset 
hoodie.metadata.index.column.stats.enable")
     }
   }
 }

Reply via email to