This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 784ea417aec0 feat: introduce pk filter push-down to base file (#14183)
784ea417aec0 is described below
commit 784ea417aec0bcd79c7da9a98f73f9417abe9194
Author: TheR1sing3un <[email protected]>
AuthorDate: Tue Nov 4 09:46:50 2025 +0800
feat: introduce pk filter push-down to base file (#14183)
* feat: introduce pk filter push-down to base file
---------
Signed-off-by: TheR1sing3un <[email protected]>
---
.../SparkFileFormatInternalRowReaderContext.scala | 16 ++-
...stSparkFileFormatInternalRowReaderContext.scala | 28 ++++-
.../sql/hudi/dml/insert/TestInsertTable.scala | 99 ++++++++++++++++
.../hudi/procedure/TestBootstrapProcedure.scala | 130 +++++++++++----------
4 files changed, 207 insertions(+), 66 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 537189909dbe..950f5d1daf94 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -21,11 +21,11 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
-import
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
getAppliedRequiredSchema}
+import
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
filterIsSafeForPrimaryKey, getAppliedRequiredSchema}
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -62,7 +62,9 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader:
SparkColumnarFileR
tableConfig: HoodieTableConfig)
extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig,
SparkFileFormatInternalRecordContext.apply(tableConfig)) {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
+ private lazy val recordKeyFields =
Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty)
private lazy val bootstrapSafeFilters: Seq[Filter] =
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
+ private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_,
recordKeyFields)) ++ requiredFilters
private lazy val allFilters = filters ++ requiredFilters
override def getFileRecordIterator(filePath: StoragePath,
@@ -77,6 +79,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader:
SparkColumnarFileR
}
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
+ // TODO: introduce pk filter in log file reader
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
} else {
@@ -97,6 +100,8 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
(schemaForRead, allFilters)
} else if (!getHasLogFiles && hasRowIndexField) {
(schemaForRead, bootstrapSafeFilters)
+ } else if (!getNeedsBootstrapMerge) {
+ (schemaForRead, morFilters)
} else {
(schemaForRead, requiredFilters)
}
@@ -262,6 +267,13 @@ object SparkFileFormatInternalRowReaderContext {
metaRefCount == filter.references.length || metaRefCount == 0
}
+ /**
+ * Returns true only if every column referenced by the filter is a primary key column or
{@link HoodieRecord.RECORD_KEY_METADATA_FIELD}
+ */
+ def filterIsSafeForPrimaryKey(filter: Filter, recordKeyFields: Set[String]):
Boolean = {
+ filter.references.forall(c => recordKeyFields.contains(c.toLowerCase) ||
c.equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD))
+ }
+
private def isIndexTempColumn(field: StructField): Boolean = {
field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
index 738fc1cad6c9..6aaec1ca7427 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -20,7 +20,7 @@
package org.apache.spark.execution.datasources.parquet
import org.apache.hudi.SparkFileFormatInternalRowReaderContext
-import
org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
+import
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
filterIsSafeForPrimaryKey}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
@@ -63,6 +63,32 @@ class TestSparkFileFormatInternalRowReaderContext extends
SparkClientFunctionalT
assertTrue(filterIsSafeForBootstrap(legalNestedFilter))
}
+ @Test
+ def testPKFilter(): Unit = {
+ val recordKeyField =
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName
+ val pk1 = "pk1"
+ val pk2 = "pk2"
+ val dataField = "data_col"
+ val pkFieldsSet = Set(pk1, pk2)
+
+ // case1: only record key or pk fields
+ val recordKeyFilter = IsNotNull(recordKeyField)
+ assertTrue(filterIsSafeForPrimaryKey(recordKeyFilter, pkFieldsSet))
+ val pkFieldFilter = IsNotNull(pk1)
+ assertTrue(filterIsSafeForPrimaryKey(pkFieldFilter, pkFieldsSet))
+ val pkFieldsFilter = And(IsNotNull(pk1), IsNotNull(pk2))
+ assertTrue(filterIsSafeForPrimaryKey(pkFieldsFilter, pkFieldsSet))
+ val pkAndRecordKeyFilter = And(And(IsNotNull(recordKeyField),
IsNotNull(pk1)), IsNotNull(pk2))
+ assertTrue(filterIsSafeForPrimaryKey(pkAndRecordKeyFilter, pkFieldsSet))
+ // case2: with data field
+ val dataFieldFilter = IsNotNull(dataField)
+ assertFalse(filterIsSafeForPrimaryKey(dataFieldFilter, pkFieldsSet))
+ val illegalComplexFilter = Or(recordKeyFilter, dataFieldFilter)
+ assertFalse(filterIsSafeForPrimaryKey(illegalComplexFilter, pkFieldsSet))
+ val illegalNestedFilter = And(illegalComplexFilter, pkFieldFilter)
+ assertFalse(filterIsSafeForPrimaryKey(illegalNestedFilter, pkFieldsSet))
+ }
+
@Test
def testGetAppliedRequiredSchema(): Unit = {
val fields = Array(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index e258d0057a2c..e7ea760de95d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -20,12 +20,14 @@
package org.apache.spark.sql.hudi.dml.insert
import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.apache.spark.sql.internal.SQLConf
class TestInsertTable extends HoodieSparkSqlTestBase {
@@ -777,5 +779,102 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
})
}
+
+ test("Test Query With PK Filter") {
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ |) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id,name',
+ | preCombineField = 'ts',
+ | 'hoodie.index.type' = 'BUCKET',
+ | 'hoodie.bucket.index.num.buckets' = '1',
+ | '${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key()}' =
'parquet'
+ | )
+ | partitioned by (dt)
+ """.stripMargin
+ )
+ spark.conf.unset("hoodie.datasource.insert.dup.policy")
+
+ withSQLConf("hoodie.datasource.overwrite.mode" -> "dynamic") {
+ spark.sql(
+ s"""
+ | insert overwrite table $tableName partition(dt) values
+ | (0, 'a0', 10, 1000, '2023-12-06'),
+ | (1, 'a1', 10, 1000, '2023-12-06'),
+ | (2, 'a2', 11, 1000, '2023-12-06'),
+ | (3, 'a3', 10, 1000, '2023-12-06')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+ Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+ Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+ Seq(3, "a3", 10.0, 1000, "2023-12-06")
+ )
+ }
+ withSQLConf("hoodie.datasource.write.operation" -> "upsert") {
+ spark.sql(
+ s"""
+ | insert into table $tableName partition (dt='2023-12-06') values
+ | (1, 'a1', 11, 2000),
+ | (4, 'a4', 10, 1000)
+ """.stripMargin)
+ }
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+ Seq(1, "a1", 11.0, 2000, "2023-12-06"),
+ Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+ Seq(3, "a3", 10.0, 1000, "2023-12-06"),
+ Seq(4, "a4", 10.0, 1000, "2023-12-06")
+ )
+
+ withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "true") {
+ checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or
name = 'a3') and price <> 10")(
+ Seq(11.0, 2000, "2023-12-06")
+ )
+ // Filter(id = 1) and Filter(name = 'a3') can be pushed down,
Filter(price <> 10) can't be pushed down since it's not a primary key
+ var df = spark.sql(s"select price, ts, dt from $tableName where (id =
1 or name = 'a3') and price <> 10")
+ // only execute the file scan physical plan
+ // expected: the file scan returns only (id: 1), (id: 3) and (id: 4, from log
file); (id: 3) and (id: 4, from log file) are matched here but will be filtered
later
+
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+
+ checkAnswer(s"select price, ts, dt from $tableName where (id > 1 or
name = 'a3') and price <> 10")(
+ Seq(11.0, 1000, "2023-12-06")
+ )
+ // Filter(id > 1) and Filter(name = 'a3') can be pushed down,
Filter(price <> 10) can't be pushed down since it's not a primary key
+ df = spark.sql(s"select price, ts, dt from $tableName where (id > 1 or
name = 'a3') and price <> 10")
+ // expected: the file scan returns only (id: 1, from log file), (id: 2), (id: 3)
and (id: 4, from log file); (id: 1, from log file), (id: 3) and (id: 4) are
matched here but will be filtered later
+
assertResult(4)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+ }
+
+ withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "false") {
+ spark.sql(s"set ${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}=false")
+ checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or
name = 'a3') and price <> 10")(
+ Seq(11.0, 2000, "2023-12-06")
+ )
+ var df = spark.sql(s"select price, ts, dt from $tableName where (id =
1 or name = 'a3') and price <> 10")
+ // only execute the file scan physical plan
+ // expect all ids in the table to be scanned and filtered later
+
assertResult(5)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+
+ checkAnswer(s"select price, ts, dt from $tableName where (id > 1 or
name = 'a3') and price <> 10")(
+ Seq(11.0, 1000, "2023-12-06")
+ )
+ df = spark.sql(s"select price, ts, dt from $tableName where (id > 1 or
name = 'a3') and price <> 10")
+
assertResult(5)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+ }
+
+ }
+
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
index bc69b434bf77..1eebeb9e5e91 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -53,18 +53,20 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
df.write.parquet(sourcePath + StoragePath.SEPARATOR + PARTITION_FIELD
+ "=" + partitions.get(i))
}
- spark.sql("set hoodie.bootstrap.parallelism = 20")
- spark.sql("set hoodie.metadata.index.column.stats.enable = false")
- checkAnswer(
- s"""call run_bootstrap(
- |table => '$tableName',
- |base_path => '$tablePath',
- |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
- |bootstrap_path => '$sourcePath',
- |rowKey_field => '$RECORD_KEY_FIELD',
- |partition_path_field => '$PARTITION_FIELD',
- |bootstrap_overwrite => true)""".stripMargin) {
- Seq(0)
+ withSQLConf(
+ "hoodie.bootstrap.parallelism" -> "20",
+ "hoodie.metadata.index.column.stats.enable" -> "false") {
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |base_path => '$tablePath',
+ |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+ |bootstrap_path => '$sourcePath',
+ |rowKey_field => '$RECORD_KEY_FIELD',
+ |partition_path_field => '$PARTITION_FIELD',
+ |bootstrap_overwrite => true)""".stripMargin) {
+ Seq(0)
+ }
}
// create table
@@ -91,11 +93,10 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
// cluster with row writer disabled and assert that records match with
that before clustering
// NOTE: the row writer path is already tested in
TestDataSourceForBootstrap
val beforeClusterDf = spark.sql(s"select * from $tableName")
- spark.sql("set hoodie.datasource.write.row.writer.enable = false")
- spark.sql(s"""call run_clustering(table => '$tableName')""".stripMargin)
- assertResult(0)(spark.sql(s"select * from
$tableName").except(beforeClusterDf).count())
-
- spark.sessionState.conf.unsetConf("unset
hoodie.metadata.index.column.stats.enable") // HUDI-8774
+ withSQLConf("hoodie.datasource.write.row.writer.enable" -> "false") {
+ spark.sql(s"""call run_clustering(table =>
'$tableName')""".stripMargin)
+ assertResult(0)(spark.sql(s"select * from
$tableName").except(beforeClusterDf).count())
+ }
}
}
@@ -121,19 +122,21 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
df.write.parquet(sourcePath + StoragePath.SEPARATOR + PARTITION_FIELD
+ "=" + partitions.get(i))
}
- spark.sql("set hoodie.bootstrap.parallelism = 20")
- spark.sql("set hoodie.metadata.index.column.stats.enable = false")
- checkAnswer(
- s"""call run_bootstrap(
- |table => '$tableName',
- |base_path => '$tablePath',
- |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
- |bootstrap_path => '$sourcePath',
- |rowKey_field => '$RECORD_KEY_FIELD',
- |partition_path_field => '$PARTITION_FIELD',
- |options => 'hoodie.datasource.write.hive_style_partitioning=true',
- |bootstrap_overwrite => true)""".stripMargin) {
- Seq(0)
+ withSQLConf(
+ "hoodie.bootstrap.parallelism" -> "20",
+ "hoodie.metadata.index.column.stats.enable" -> "false") {
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |base_path => '$tablePath',
+ |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+ |bootstrap_path => '$sourcePath',
+ |rowKey_field => '$RECORD_KEY_FIELD',
+ |partition_path_field => '$PARTITION_FIELD',
+ |options =>
'hoodie.datasource.write.hive_style_partitioning=true',
+ |bootstrap_overwrite => true)""".stripMargin) {
+ Seq(0)
+ }
}
// create table
@@ -162,7 +165,6 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
assertResult("true") {
metaClient.getTableConfig.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE)
};
- spark.sessionState.conf.unsetConf("unset
hoodie.metadata.index.column.stats.enable")
}
}
@@ -184,19 +186,21 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
val df: Dataset[Row] =
TestBootstrap.generateTestRawTripDataset(timestamp, 0, NUM_OF_RECORDS, null,
jsc, spark.sqlContext)
df.write.parquet(sourcePath)
- spark.sql("set hoodie.bootstrap.parallelism = 20")
- spark.sql("set hoodie.metadata.index.column.stats.enable = false")
- // run bootstrap
- checkAnswer(
- s"""call run_bootstrap(
- |table => '$tableName',
- |base_path => '$tablePath',
- |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
- |bootstrap_path => '$sourcePath',
- |rowKey_field => '$RECORD_KEY_FIELD',
- |key_generator_class => 'NON_PARTITION',
- |bootstrap_overwrite => true)""".stripMargin) {
- Seq(0)
+ withSQLConf(
+ "hoodie.bootstrap.parallelism" -> "20",
+ "hoodie.metadata.index.column.stats.enable" -> "false") {
+ // run bootstrap
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |base_path => '$tablePath',
+ |table_type => '${HoodieTableType.COPY_ON_WRITE.name}',
+ |bootstrap_path => '$sourcePath',
+ |rowKey_field => '$RECORD_KEY_FIELD',
+ |key_generator_class => 'NON_PARTITION',
+ |bootstrap_overwrite => true)""".stripMargin) {
+ Seq(0)
+ }
}
// create table
@@ -216,10 +220,10 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
// cluster with row writer disabled and assert that records match with
that before clustering
// NOTE: the row writer path is already tested in
TestDataSourceForBootstrap
val beforeClusterDf = spark.sql(s"select * from $tableName")
- spark.sql("set hoodie.datasource.write.row.writer.enable = false")
- spark.sql(s"""call run_clustering(table => '$tableName')""".stripMargin)
+ withSQLConf("hoodie.datasource.write.row.writer.enable" -> "false") {
+ spark.sql(s"""call run_clustering(table =>
'$tableName')""".stripMargin)
+ }
assertResult(0)(spark.sql(s"select * from
$tableName").except(beforeClusterDf).count())
- spark.sessionState.conf.unsetConf("unset
hoodie.metadata.index.column.stats.enable")
}
}
@@ -245,23 +249,23 @@ class TestBootstrapProcedure extends
HoodieSparkProcedureTestBase {
df.write.parquet(sourcePath + StoragePath.SEPARATOR + PARTITION_FIELD
+ "=" + partitions.get(i))
}
- spark.sql("set hoodie.bootstrap.parallelism = 20")
- spark.sql("set hoodie.table.ordering.fields=timestamp")
- spark.sql("set hoodie.metadata.index.column.stats.enable = false")
-
- checkAnswer(
- s"""call run_bootstrap(
- |table => '$tableName',
- |base_path => '$tablePath',
- |table_type => '${HoodieTableType.MERGE_ON_READ.name}',
- |bootstrap_path => '$sourcePath',
- |rowKey_field => '$RECORD_KEY_FIELD',
- |selector_class =>
'org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector',
- |partition_path_field => '$PARTITION_FIELD',
- |bootstrap_overwrite => true)""".stripMargin) {
- Seq(0)
+ withSQLConf(
+ "hoodie.bootstrap.parallelism" -> "20",
+ "hoodie.table.ordering.fields" -> "timestamp",
+ "hoodie.metadata.index.column.stats.enable" -> "false") {
+ checkAnswer(
+ s"""call run_bootstrap(
+ |table => '$tableName',
+ |base_path => '$tablePath',
+ |table_type => '${HoodieTableType.MERGE_ON_READ.name}',
+ |bootstrap_path => '$sourcePath',
+ |rowKey_field => '$RECORD_KEY_FIELD',
+ |selector_class =>
'org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector',
+ |partition_path_field => '$PARTITION_FIELD',
+ |bootstrap_overwrite => true)""".stripMargin) {
+ Seq(0)
+ }
}
- spark.sessionState.conf.unsetConf("unset
hoodie.metadata.index.column.stats.enable")
}
}
}