This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f5cfec43270a98d9a10512e9a651fb35d4e63e76 Author: Yann Byron <[email protected]> AuthorDate: Sun Feb 5 08:05:13 2023 +0800 [HUDI-5701] Remove meta fields from cdc new record in CDCLogger (#7852) --- .../java/org/apache/hudi/io/HoodieCDCLogger.java | 2 +- .../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index fd2dc60b58b..096bf475667 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -245,7 +245,7 @@ public class HoodieCDCLogger implements Closeable { private CDCTransformer getTransformer() { if (cdcSupplementalLoggingMode == data_before_after) { return (operation, recordKey, oldRecord, newRecord) -> - HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord); + HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), removeCommitMetadata(newRecord)); } else if (cdcSupplementalLoggingMode == data_before) { return (operation, recordKey, oldRecord, newRecord) -> HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord)); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala index bec2230e5ab..a27da08866a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi 
import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only} +import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, data_before_after, op_key_only} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.junit.jupiter.api.Assertions.assertEquals @@ -53,10 +53,15 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"use $databaseName") Seq("cow", "mor").foreach { tableType => - Seq(op_key_only, data_before).foreach { loggingMode => + Seq(op_key_only, data_before, data_before_after).foreach { loggingMode => withTempDir { tmp => val tableName = generateTableName val basePath = s"${tmp.getCanonicalPath}/$tableName" + val otherTableProperties = if (tableType == "mor") { + "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2'," + } else { + "" + } spark.sql( s""" | create table $tableName ( @@ -70,6 +75,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { | 'preCombineField' = 'ts', | 'hoodie.table.cdc.enabled' = 'true', | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', + | $otherTableProperties | type = '$tableType' | ) | location '$basePath' @@ -88,7 +94,9 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)") val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp - val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1) + // here we use `commitTime1` to query the change data in commit 2. + // because `commitTime2` is maybe the ts of the compaction operation, not the write operation. 
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) cdcDataOnly2.show(false) assertCDCOpCnt(cdcDataOnly2, 0, 1, 0) @@ -110,13 +118,13 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2") val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp - val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1) + val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong) cdcDataOnly3.show(false) assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) spark.sql(s"delete from $tableName where id = 3") val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp - val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1) + val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong) cdcDataOnly4.show(false) assertCDCOpCnt(cdcDataOnly4, 0, 0, 1) @@ -135,7 +143,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { | when not matched then insert * """.stripMargin) val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp - val cdcDataOnly5 = cdcDataFrame(basePath, commitTime5.toLong - 1) + val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong) cdcDataOnly5.show(false) assertCDCOpCnt(cdcDataOnly5, 1, 1, 0)
