This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d783f9447c [Feature][CDC] Add 'schema-changes.enabled' options (#8252)
d783f9447c is described below

commit d783f9447c4f541f01b3487496dbb844451964ae
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Tue Dec 10 15:48:39 2024 +0800

    [Feature][CDC] Add 'schema-changes.enabled' options (#8252)
---
 docs/en/concept/schema-evolution.md                | 35 +++++++++++----------
 .../formats/cdc-compatible-debezium-json.md        |  2 --
 docs/en/connector-v2/sink/Paimon.md                |  5 ++-
 docs/en/connector-v2/source/MySQL-CDC.md           |  8 ++---
 docs/en/connector-v2/source/Oracle-CDC.md          |  1 +
 docs/zh/concept/schema-evolution.md                | 36 ++++++++++++----------
 .../formats/cdc-compatible-debezium-json.md        |  2 --
 docs/zh/connector-v2/sink/Paimon.md                |  5 ++-
 .../cdc/base/config/JdbcSourceConfigFactory.java   | 10 ++++--
 .../connectors/cdc/base/option/SourceOptions.java  |  7 +++++
 .../cdc/mysql/config/MySqlSourceConfigFactory.java |  6 ++--
 .../source/MySqlIncrementalSourceFactory.java      | 31 +++++++++++++------
 .../oracle/config/OracleSourceConfigFactory.java   | 24 ++++++++++-----
 .../cdc/oracle/source/OracleIncrementalSource.java | 12 +-------
 .../source/OracleIncrementalSourceFactory.java     | 31 +++++++++++++------
 .../mysqlcdc_to_mysql_with_schema_change.conf      |  5 ++-
 ...c_to_mysql_with_schema_change_exactly_once.conf |  5 ++-
 .../oraclecdc_to_mysql_with_schema_change.conf     |  5 +--
 .../oraclecdc_to_oracle_with_schema_change.conf    |  5 +--
 ..._to_oracle_with_schema_change_exactly_once.conf |  4 ++-
 .../mysql_cdc_to_iceberg_for_schema_change.conf    | 10 ++----
 .../mysql_cdc_to_paimon_with_schema_change.conf    |  5 ++-
 .../mysqlcdc_to_starrocks_with_schema_change.conf  |  5 ++-
 23 files changed, 144 insertions(+), 115 deletions(-)

diff --git a/docs/en/concept/schema-evolution.md 
b/docs/en/concept/schema-evolution.md
index b1db057387..5de26ea94e 100644
--- a/docs/en/concept/schema-evolution.md
+++ b/docs/en/concept/schema-evolution.md
@@ -1,7 +1,16 @@
 # Schema evolution
 Schema Evolution means that the schema of a data table can be changed and the 
data synchronization task can automatically adapt to the changes of the new 
table structure without any other operations.
-Now we only support the operation about `add column`、`drop column`、`rename 
column` and `modify column` of the table in CDC source. This feature is only 
support zeta engine at now. 
 
+## Supported engines
+
+- Zeta
+
+## Supported schema change event types
+
+- `ADD COLUMN`
+- `DROP COLUMN`
+- `RENAME COLUMN`
+- `MODIFY COLUMN`
 
 ## Supported connectors
 
@@ -21,7 +30,7 @@ When you use the Oracle-CDC,you can not use the username 
named `SYS` or `SYSTE
 Otherwise, If your table name start with `ORA_TEMP_` will also has the same 
problem.
 
 ## Enable schema evolution
-Schema evolution is disabled by default in CDC source. You need configure 
`debezium.include.schema.changes = true` which is only supported in CDC to 
enable it. When you use Oracle-CDC with schema-evolution enabled, you must 
specify `redo_log_catalog` as `log.mining.strategy` in the `debezium` attribute.
+Schema evolution is disabled by default in CDC source. You need configure 
`schema-changes.enabled = true` which is only supported in CDC to enable it.
 
 ## Examples
 
@@ -43,9 +52,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
@@ -86,10 +94,8 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
-    debezium {
-        include.schema.changes = true
-        log.mining.strategy = redo_log_catalog
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
@@ -131,10 +137,8 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
-    debezium {
-        include.schema.changes = true
-        log.mining.strategy = redo_log_catalog
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
@@ -169,9 +173,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md 
b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
index 564eb2356c..59f9981d71 100644
--- a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -33,8 +33,6 @@ source {
         # include schema into kafka message
         key.converter.schemas.enable = false
         value.converter.schemas.enable = false
-        # include ddl
-        include.schema.changes = true
         # topic prefix
         database.server.name =  "mysql_cdc_1"
     }
diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index 2959855120..f2a68ae3b8 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -107,9 +107,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md 
b/docs/en/connector-v2/source/MySQL-CDC.md
index cc58ec4459..0114d5c1d5 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -196,7 +196,8 @@ When an initial consistent snapshot is made for large 
databases, your establishe
 | inverse-sampling.rate                          | Integer  | No       | 1000  
  | The inverse of the sampling rate used in the sample sharding strategy. For 
example, if this value is set to 1000, it means a 1/1000 sampling rate is 
applied during the sampling process. This option provides flexibility in 
controlling the granularity of the sampling, thus affecting the final number of 
shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is preferr [...]
 | exactly_once                                   | Boolean  | No       | false 
  | Enable exactly once semantic.                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | format                                         | Enum     | No       | 
DEFAULT | Optional output format for MySQL CDC, valid enumerations are 
`DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`.                                           
                                                                                
                                                                                
                                                                                
                             [...]
-| debezium                                       | Config   | No       | -     
  | Pass-through [Debezium's 
properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties)
 to Debezium Embedded Engine which is used to capture data changes from MySQL 
server.  Schema evolution is disabled by default.  You need configure 
`debezium.include.schema.changes = true` to enable it. Now we only support `add 
column`、`drop [...]
+| schema-changes.enabled                         | Boolean  | No       | false 
  | Schema evolution is disabled by default. Now we only support `add 
column`、`drop column`、`rename column` and `modify column`.                      
                                                                                
                                                                                
                                                                                
                        [...]
+| debezium                                       | Config   | No       | -     
  | Pass-through [Debezium's 
properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/mysql.adoc#connector-properties)
 to Debezium Embedded Engine which is used to capture data changes from MySQL 
server.                                                                         
                                                                                
    [...]
 | common-options                                 |          | no       | -     
  | Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                                                                
                                                                                
                                                                                
                          [...]
 
 ## Task Example
@@ -281,9 +282,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
diff --git a/docs/en/connector-v2/source/Oracle-CDC.md 
b/docs/en/connector-v2/source/Oracle-CDC.md
index 8e5c332bef..28aeef2a50 100644
--- a/docs/en/connector-v2/source/Oracle-CDC.md
+++ b/docs/en/connector-v2/source/Oracle-CDC.md
@@ -249,6 +249,7 @@ exit;
 | use_select_count                               | Boolean  | No       | false 
  | Use select count for table count rather then other methods in full stage.In 
this scenario, select count directly is used when it is faster to update 
statistics using sql from analysis table                                        
                                                                                
                                                                                
                     [...]
 | skip_analyze                                   | Boolean  | No       | false 
  | Skip the analysis of table count in full stage.In this scenario, you 
schedule analysis table sql to update related table statistics periodically or 
your table data does not change frequently                                      
                                                                                
                                                                                
                      [...]
 | format                                         | Enum     | No       | 
DEFAULT | Optional output format for Oracle CDC, valid enumerations are 
`DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`.                                           
                                                                                
                                                                                
                                                                                
                            [...]
+| schema-changes.enabled                         | Boolean  | No       | false 
  | Schema evolution is disabled by default. Now we only support `add 
column`、`drop column`、`rename column` and `modify column`.                      
                                                                                
                                                                                
                                                                                
                        [...]
 | debezium                                       | Config   | No       | -     
  | Pass-through [Debezium's 
properties](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties)
 to Debezium Embedded Engine which is used to capture data changes from Oracle 
server.                                                                         
                                                                                
  [...]
 | common-options                                 |          | no       | -     
  | Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                                                                
                                                                                
                                                                                
                          [...]
 
diff --git a/docs/zh/concept/schema-evolution.md 
b/docs/zh/concept/schema-evolution.md
index f8770abed5..200259f518 100644
--- a/docs/zh/concept/schema-evolution.md
+++ b/docs/zh/concept/schema-evolution.md
@@ -1,6 +1,16 @@
 # 模式演进
 模式演进是指数据表的Schema可以改变,数据同步任务可以自动适应新的表结构的变化而无需其他操作。
-现在我们只支持对CDC源中的表进行“添加列”、“删除列”、“重命名列”和“修改列”的操作。目前这个功能只支持zeta引擎。
+
+## 已支持的引擎
+
+- Zeta
+
+## 已支持的模式变更事件类型
+
+- `ADD COLUMN`
+- `DROP COLUMN`
+- `RENAME COLUMN`
+- `MODIFY COLUMN`
 
 ## 已支持的连接器
 
@@ -20,7 +30,7 @@
 另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。
 
 ## 启用Schema evolution功能
-在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`debezium.include.schema.changes = 
true`来启用它。当你使用Oracle-CDC并且启用schema-evolution时,你必须将`debezium`属性中的`log.mining.strategy`指定为`redo_log_catalog`。
+在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。
 
 ## 示例
 
@@ -42,9 +52,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
@@ -85,10 +94,8 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
-    debezium {
-        include.schema.changes = true
-        log.mining.strategy = redo_log_catalog
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
@@ -130,10 +137,8 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
-    debezium {
-        include.schema.changes = true
-        log.mining.strategy = redo_log_catalog
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
@@ -168,9 +173,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
diff --git a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md 
b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
index 8febab18fb..6c5b57b278 100644
--- a/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
+++ b/docs/zh/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -33,8 +33,6 @@ source {
         # include schema into kafka message
         key.converter.schemas.enable = false
         value.converter.schemas.enable = false
-        # include ddl
-        include.schema.changes = true
         # topic prefix
         database.server.name =  "mysql_cdc_1"
     }
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index 4d83dcb6c7..1faa5dc9b0 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -105,9 +105,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+    
+    schema-changes.enabled = true
   }
 }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index 99ddb3bd17..87dd7d3a8f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -44,7 +44,6 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
     protected List<String> tableList;
     protected StartupConfig startupConfig;
     protected StopConfig stopConfig;
-    protected boolean includeSchemaChanges = false;
     protected double distributionFactorUpper =
             
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
     protected double distributionFactorLower =
@@ -60,6 +59,10 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
     protected int connectMaxRetries = 
JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
     protected int connectionPoolSize = 
JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
     @Setter protected boolean exactlyOnce = 
JdbcSourceOptions.EXACTLY_ONCE.defaultValue();
+
+    @Setter
+    protected boolean schemaChangeEnabled = 
JdbcSourceOptions.SCHEMA_CHANGES_ENABLED.defaultValue();
+
     protected Properties dbzProperties;
 
     /** String hostname of the database server. */
@@ -210,8 +213,8 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
     }
 
     /** Whether the {@link SourceConfig} should output the schema changes or 
not. */
-    public JdbcSourceConfigFactory includeSchemaChanges(boolean 
includeSchemaChanges) {
-        this.includeSchemaChanges = includeSchemaChanges;
+    public JdbcSourceConfigFactory schemaChangeEnabled(boolean 
schemaChangeEnabled) {
+        this.schemaChangeEnabled = schemaChangeEnabled;
         return this;
     }
 
@@ -264,6 +267,7 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         this.connectMaxRetries = 
config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
         this.connectionPoolSize = 
config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
         this.exactlyOnce = config.get(JdbcSourceOptions.EXACTLY_ONCE);
+        this.schemaChangeEnabled = 
config.get(JdbcSourceOptions.SCHEMA_CHANGES_ENABLED);
         this.dbzProperties = new Properties();
         config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
                 .ifPresent(map -> dbzProperties.putAll(map));
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 6c83088ef2..7fcd4d3448 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -107,6 +107,13 @@ public class SourceOptions {
                     .defaultValue(false)
                     .withDescription("Enable exactly once semantic.");
 
+    public static final Option<Boolean> SCHEMA_CHANGES_ENABLED =
+            Options.key("schema-changes.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Enable send schema change events, by default is 
false. If set to true, the schema changes will be sent to downstream.");
+
     public static OptionRule.Builder getBaseRule() {
         return OptionRule.builder()
                 .optional(FORMAT)
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index fd5d7deadf..db63e4e4dc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -31,7 +31,6 @@ import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 /** A factory to initialize {@link MySqlSourceConfig}. */
 public class MySqlSourceConfigFactory extends JdbcSourceConfigFactory {
     public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
-    public static final Boolean SCHEMA_CHANGE_DEFAULT = true;
 
     private ServerIdRange serverIdRange;
 
@@ -78,9 +77,8 @@ public class MySqlSourceConfigFactory extends 
JdbcSourceConfigFactory {
         // Note: the includeSchemaChanges parameter is used to control 
emitting the schema record,
         // only DataStream API program need to emit the schema record, the 
Table API need not
 
-        // Some scenarios do not require automatic capture of table structure 
changes, so the
-        // default setting is true.
-        props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString());
+        // setting debezium capture mysql ddl
+        props.setProperty(SCHEMA_CHANGE_KEY, 
String.valueOf(schemaChangeEnabled));
         // disable the offset flush totally
         props.setProperty("offset.flush.interval.ms", 
String.valueOf(Long.MAX_VALUE));
         // disable tombstones
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 8de399b587..c11f9e72d4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -69,7 +69,8 @@ public class MySqlIncrementalSourceFactory extends 
BaseChangeStreamTableSourceFa
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
                         JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
                         JdbcSourceOptions.INVERSE_SAMPLING_RATE,
-                        JdbcSourceOptions.TABLE_NAMES_CONFIG)
+                        JdbcSourceOptions.TABLE_NAMES_CONFIG,
+                        JdbcSourceOptions.SCHEMA_CHANGES_ENABLED)
                 .optional(MySqlSourceOptions.STARTUP_MODE, 
MySqlSourceOptions.STOP_MODE)
                 .conditional(
                         MySqlSourceOptions.STARTUP_MODE,
@@ -103,15 +104,25 @@ public class MySqlIncrementalSourceFactory extends 
BaseChangeStreamTableSourceFa
                             context.getOptions(), context.getClassLoader());
             boolean enableSchemaChange =
                     context.getOptions()
-                            .getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
-                            .map(
-                                    e ->
-                                            e.getOrDefault(
-                                                    
MySqlSourceConfigFactory.SCHEMA_CHANGE_KEY,
-                                                    
MySqlSourceConfigFactory.SCHEMA_CHANGE_DEFAULT
-                                                            .toString()))
-                            .map(Boolean::parseBoolean)
-                            
.orElse(MySqlSourceConfigFactory.SCHEMA_CHANGE_DEFAULT);
+                            .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED)
+                            .orElse(
+                                    // TODO remove this after all users used 
the new schema change
+                                    // option
+                                    context.getOptions()
+                                            
.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
+                                            .map(
+                                                    e ->
+                                                            e.getOrDefault(
+                                                                    
MySqlSourceConfigFactory
+                                                                            
.SCHEMA_CHANGE_KEY,
+                                                                    
SourceOptions
+                                                                            
.SCHEMA_CHANGES_ENABLED
+                                                                            
.defaultValue()
+                                                                            
.toString()))
+                                            .map(Boolean::parseBoolean)
+                                            .orElse(
+                                                    
SourceOptions.SCHEMA_CHANGES_ENABLED
+                                                            .defaultValue()));
             if (!restoreTables.isEmpty() && enableSchemaChange) {
                 catalogTables = mergeTableStruct(catalogTables, restoreTables);
             }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
index b08d4e4dad..3786fb937c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/config/OracleSourceConfigFactory.java
@@ -37,7 +37,8 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
 
     private static final String DRIVER_CLASS_NAME = 
"oracle.jdbc.driver.OracleDriver";
     public static final String SCHEMA_CHANGE_KEY = "include.schema.changes";
-    public static final Boolean SCHEMA_CHANGE_DEFAULT = true;
+    public static final String LOG_MINING_STRATEGY_KEY = "log.mining.strategy";
+    public static final String LOG_MINING_STRATEGY_DEFAULT = "online_catalog";
 
     private List<String> schemaList;
 
@@ -94,17 +95,16 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
         props.setProperty("database.history.skip.unparseable.ddl", 
String.valueOf(true));
         props.setProperty("database.history.refer.ddl", String.valueOf(true));
 
-        // Some scenarios do not require automatic capture of table structure 
changes, so the
-        // default setting is true.
-        props.setProperty(SCHEMA_CHANGE_KEY, SCHEMA_CHANGE_DEFAULT.toString());
+        // setting debezium capture oracle ddl
+        props.setProperty(SCHEMA_CHANGE_KEY, 
String.valueOf(schemaChangeEnabled));
+        props.setProperty(
+                LOG_MINING_STRATEGY_KEY,
+                schemaChangeEnabled ? "redo_log_catalog" : 
LOG_MINING_STRATEGY_DEFAULT);
 
         props.setProperty("connect.timeout.ms", 
String.valueOf(connectTimeoutMillis));
         // disable tombstones
         props.setProperty("tombstones.on.delete", String.valueOf(false));
 
-        // Optimize logminer latency
-        props.setProperty("log.mining.strategy", "online_catalog");
-
         if (originUrl != null) {
             props.setProperty("database.url", originUrl);
         } else {
@@ -139,6 +139,16 @@ public class OracleSourceConfigFactory extends 
JdbcSourceConfigFactory {
 
         // override the user-defined debezium properties
         if (dbzProperties != null) {
+            String debeziumSchemaChanges =
+                    dbzProperties.getProperty(
+                            SCHEMA_CHANGE_KEY, 
String.valueOf(schemaChangeEnabled));
+            String debeziumLogMiningStrategy =
+                    dbzProperties.getProperty(LOG_MINING_STRATEGY_KEY, 
LOG_MINING_STRATEGY_DEFAULT);
+            if (Boolean.parseBoolean(debeziumSchemaChanges)
+                    && 
LOG_MINING_STRATEGY_DEFAULT.equals(debeziumLogMiningStrategy)) {
+                throw new IllegalArgumentException(
+                        "Debezium log mining strategy must be set to 
redo_log_catalog when schema changes are enabled");
+            }
             props.putAll(dbzProperties);
         }
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
index 80b4a0b3c0..eb602aa418 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java
@@ -105,22 +105,12 @@ public class OracleIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceC
         }
 
         String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
-
-        boolean enableDDL =
-                Boolean.parseBoolean(
-                        debeziumProperties.getOrDefault(
-                                OracleSourceConfigFactory.SCHEMA_CHANGE_KEY,
-                                
OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT.toString()));
-
         return (DebeziumDeserializationSchema<T>)
                 SeaTunnelRowDebeziumDeserializeSchema.builder()
                         .setTables(catalogTables)
                         .setServerTimeZone(ZoneId.of(zoneId))
                         .setSchemaChangeResolver(
-                                enableDDL
-                                        ? new OracleSchemaChangeResolver(
-                                                
createSourceConfigFactory(config))
-                                        : null)
+                                new 
OracleSchemaChangeResolver(createSourceConfigFactory(config)))
                         .setTableIdTableChangeMap(tableIdStructMap)
                         .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
index d790107cf1..01690bc3dc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java
@@ -69,7 +69,8 @@ public class OracleIncrementalSourceFactory extends 
BaseChangeStreamTableSourceF
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
                         
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
                         JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
-                        JdbcSourceOptions.TABLE_NAMES_CONFIG)
+                        JdbcSourceOptions.TABLE_NAMES_CONFIG,
+                        JdbcSourceOptions.SCHEMA_CHANGES_ENABLED)
                 .optional(OracleSourceOptions.STARTUP_MODE, 
OracleSourceOptions.STOP_MODE)
                 .conditional(
                         OracleSourceOptions.STARTUP_MODE,
@@ -109,15 +110,25 @@ public class OracleIncrementalSourceFactory extends 
BaseChangeStreamTableSourceF
                             context.getOptions(), context.getClassLoader());
             boolean enableSchemaChange =
                     context.getOptions()
-                            .getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
-                            .map(
-                                    e ->
-                                            e.getOrDefault(
-                                                    
OracleSourceConfigFactory.SCHEMA_CHANGE_KEY,
-                                                    
OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT
-                                                            .toString()))
-                            .map(Boolean::parseBoolean)
-                            
.orElse(OracleSourceConfigFactory.SCHEMA_CHANGE_DEFAULT);
+                            .getOptional(SourceOptions.SCHEMA_CHANGES_ENABLED)
+                            .orElse(
+                                    // TODO remove this after all users used 
the new schema change
+                                    // option
+                                    context.getOptions()
+                                            
.getOptional(SourceOptions.DEBEZIUM_PROPERTIES)
+                                            .map(
+                                                    e ->
+                                                            e.getOrDefault(
+                                                                    
OracleSourceConfigFactory
+                                                                            
.SCHEMA_CHANGE_KEY,
+                                                                    
SourceOptions
+                                                                            
.SCHEMA_CHANGES_ENABLED
+                                                                            
.defaultValue()
+                                                                            
.toString()))
+                                            .map(Boolean::parseBoolean)
+                                            .orElse(
+                                                    
SourceOptions.SCHEMA_CHANGES_ENABLED
+                                                            .defaultValue()));
             if (!restoreTables.isEmpty() && enableSchemaChange) {
                 catalogTables = mergeTableStruct(catalogTables, restoreTables);
             }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
index 632b643bb2..7e93474d5e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
@@ -34,9 +34,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+
+    schema-changes.enabled = true
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
index 8aa06c85bd..275ecf4464 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change_exactly_once.conf
@@ -34,9 +34,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+
+    schema-changes.enabled = true
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
index 58acb86f83..70c9aedb4f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_mysql_with_schema_change.conf
@@ -37,10 +37,11 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
+
+    schema-changes.enabled = true
     debezium {
         database.oracle.jdbc.timezoneAsRegion = false
-        include.schema.changes = true
-        log.mining.strategy = redo_log_catalog
+
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
index 80fcc8c796..76903a6e00 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change.conf
@@ -37,10 +37,11 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
+
+    schema-changes.enabled = true
     debezium {
         database.oracle.jdbc.timezoneAsRegion = false
-        include.schema.changes = true
-        log.mining.strategy = redo_log_catalog
+
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
index 949e62ef71..9554a4fd49 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_with_schema_change_exactly_once.conf
@@ -37,9 +37,11 @@ source {
     base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
     source.reader.close.timeout = 120000
     connection.pool.size = 1
+
+    schema-changes.enabled = true
     debezium {
         database.oracle.jdbc.timezoneAsRegion = false
-        include.schema.changes = true
+
     }
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
index 68102192a0..c8353c9e4c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
@@ -27,13 +27,9 @@ env {
 source {
   MySQL-CDC {
     plugin_output="customer_result_table"
-    catalog {
-      factory = Mysql
-    }
-    debezium = {
-      # include ddl
-      "include.schema.changes" = true
-    }
+
+    schema-changes.enabled = true
+
     database-names=["mysql_cdc"]
     table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
     format=DEFAULT
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
index 714c4be81c..a214430dd0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
@@ -34,9 +34,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+
+    schema-changes.enabled = true
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
index ba3c03db1e..76d86a4e8c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/mysqlcdc_to_starrocks_with_schema_change.conf
@@ -32,9 +32,8 @@ source {
     password = "mysqlpw"
     table-names = ["shop.products"]
     base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
-    debezium = {
-      include.schema.changes = true
-    }
+
+    schema-changes.enabled = true
   }
 }
 


Reply via email to