yihua commented on code in PR #13694:
URL: https://github.com/apache/hudi/pull/13694#discussion_r2268310108
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala:
##########
@@ -304,4 +307,86 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Partial Updates With Spark CDC") {
+ val databaseName = "hudi_database"
+ spark.sql(s"create database if not exists $databaseName")
+ spark.sql(s"use $databaseName")
+ withSQLConf(HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key
->"0",
+ DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true",
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
Review Comment:
nit: setting `LOGFILE_DATA_BLOCK_FORMAT` is not necessary.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala:
##########
@@ -304,4 +307,86 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Partial Updates With Spark CDC") {
+ val databaseName = "hudi_database"
+ spark.sql(s"create database if not exists $databaseName")
+ spark.sql(s"use $databaseName")
+ withSQLConf(HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key
->"0",
+ DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> "true",
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> "true") {
+ Seq(OP_KEY_ONLY, DATA_BEFORE).foreach { loggingMode =>
Review Comment:
Should this also test `DATA_BEFORE_AFTER`?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala:
##########
@@ -304,4 +307,86 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Partial Updates With Spark CDC") {
+ val databaseName = "hudi_database"
+ spark.sql(s"create database if not exists $databaseName")
+ spark.sql(s"use $databaseName")
+ withSQLConf(HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key
->"0",
Review Comment:
```suggestion
withSQLConf(HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key ->
"0",
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -75,6 +76,27 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
extends Iterator[InternalRow]
with SparkAdapterSupport with AvroDeserializerSupport with Closeable {
+ private lazy val readerContext = {
+ val bufferedReaderContext = new SparkFileFormatInternalRowReaderContext(baseFileReader,
+ Seq.empty, Seq.empty, conf, metaClient.getTableConfig)
+ bufferedReaderContext.initRecordMerger(readerProperties)
+ bufferedReaderContext
+ }
+
+ private lazy val orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode, props, metaClient)
+ private lazy val payloadClass: Option[String] = if (recordMerger.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID) {
+ Option.of(metaClient.getTableConfig.getPayloadClass)
+ } else {
+ Option.empty.asInstanceOf[Option[String]]
+ }
+ private lazy val partialUpdateMode: PartialUpdateMode = metaClient.getTableConfig.getPartialUpdateMode
+ private var isPartialMergeEnabled = false
+ private var bufferedRecordMerger = getBufferedRecordMerger
+ private def getBufferedRecordMerger: BufferedRecordMerger[InternalRow] = BufferedRecordMergerFactory.create(readerContext,
+ readerContext.getMergeMode, isPartialMergeEnabled, Option.of(recordMerger), orderingFieldNames,
+ payloadClass, avroSchema, props, partialUpdateMode)
+
+
Review Comment:
nit: Redundant empty line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]