This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 641cc72f2f [Improve][CDC] Filter ddl for snapshot phase (#8911) 641cc72f2f is described below commit 641cc72f2fdb7d35b818c08c0bc252e7f1f2af7f Author: hailin0 <wanghai...@apache.org> AuthorDate: Mon Mar 10 14:10:23 2025 +0800 [Improve][CDC] Filter ddl for snapshot phase (#8911) --- .../cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java | 2 ++ .../cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java index 6fd98e5358..a9f0339e12 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask; @@ -152,6 +153,7 @@ public class 
MySqlSnapshotFetchTask implements FetchTask<SourceSplitBase> { context.getSourceConfig() .getDbzConfiguration() .edit() + .with(MySqlSourceConfigFactory.SCHEMA_CHANGE_KEY, "false") .with("table.include.list", split.getTableId().toString()) // Disable heartbeat event in snapshot split fetcher .with(Heartbeat.HEARTBEAT_INTERVAL, 0) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java index 8b6131eeb6..fa0e0dee5e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/scan/OracleSnapshotFetchTask.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind; +import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.OracleSourceFetchTaskContext; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.reader.fetch.logminer.OracleRedoLogFetchTask; @@ -159,6 +160,7 @@ public class OracleSnapshotFetchTask implements FetchTask<SourceSplitBase> { context.getSourceConfig() .getDbzConfiguration() .edit() + .with(OracleSourceConfigFactory.SCHEMA_CHANGE_KEY, "false") .with( 
"table.include.list", split.getTableId()