This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b922bb90e6 [Improve][CDC] Extract duplicate code (#8906)
b922bb90e6 is described below

commit b922bb90e65614a034cf5c2e5d2c5d180b76b6c2
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Thu Mar 6 16:42:47 2025 +0800

    [Improve][CDC] Extract duplicate code (#8906)
---
 .../external/JdbcSourceFetchTaskContext.java       | 53 ++++++++++++++++++++++
 .../reader/fetch/MySqlSourceFetchTaskContext.java  | 53 +---------------------
 .../reader/fetch/OracleSourceFetchTaskContext.java | 53 +---------------------
 .../reader/PostgresSourceFetchTaskContext.java     | 28 +-----------
 .../fetch/SqlServerSourceFetchTaskContext.java     | 29 +-----------
 5 files changed, 57 insertions(+), 159 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 48380d024f..e8d058f2a2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -22,26 +22,35 @@ import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
 import org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer;
+import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.data.Envelope;
+import io.debezium.jdbc.JdbcConnection;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.pipeline.spi.Partition;
 import io.debezium.relational.RelationalDatabaseSchema;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.util.SchemaNameAdjuster;
 
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -155,6 +164,50 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
                 .collect(Collectors.toList());
     }
 
+    protected void registerDatabaseHistory(
+            SourceSplitBase sourceSplitBase, JdbcConnection connection) {
+        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
+        // TODO: support save table schema
+        if (sourceSplitBase instanceof SnapshotSplit) {
+            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
+            engineHistory.add(
+                    dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
+        } else {
+            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
+            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
+            for (TableId tableId : incrementalSplit.getTableIds()) {
+                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
+                    SchemaAndValue schemaAndValue =
+                            jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
+                    Struct deserializedStruct = (Struct) schemaAndValue.value();
+
+                    TableChanges tableChanges =
+                            tableChangeSerializer.deserialize(
+                                    Collections.singletonList(deserializedStruct), false);
+
+                    Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
+                    TableChanges.TableChange tableChange = null;
+                    while (iterator.hasNext()) {
+                        if (tableChange != null) {
+                            throw new IllegalStateException(
+                                    "The table changes should only have one element");
+                        }
+                        tableChange = iterator.next();
+                    }
+                    engineHistory.add(tableChange);
+                    continue;
+                }
+                engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
+            }
+        }
+
+        EmbeddedDatabaseHistory.registerHistory(
+                sourceConfig
+                        .getDbzConfiguration()
+                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
+                engineHistory);
+    }
+
     public SourceConfig getSourceConfig() {
         return sourceConfig;
     }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index 93e7cd1f2e..fcf25d50ea 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -24,16 +24,12 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
 
-import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -66,7 +62,6 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
-import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
@@ -75,9 +70,6 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -118,7 +110,7 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
 
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, connection);
 
         // initial stateful objects
         final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -385,49 +377,6 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
         schema.recover(Offsets.of(mySqlPartition, offset));
     }
 
-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
-                    SchemaAndValue schemaAndValue =
-                            jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
-                    Struct deserializedStruct = (Struct) schemaAndValue.value();
-
-                    TableChanges tableChanges =
-                            tableChangeSerializer.deserialize(
-                                    Collections.singletonList(deserializedStruct), false);
-
-                    Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
-                    TableChanges.TableChange tableChange = null;
-                    while (iterator.hasNext()) {
-                        if (tableChange != null) {
-                            throw new IllegalStateException(
-                                    "The table changes should only have one element");
-                        }
-                        tableChange = iterator.next();
-                    }
-                    engineHistory.add(tableChange);
-                    continue;
-                }
-                engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     /** A subclass implementation of {@link MySqlTaskContext} which reuses one BinaryLogClient. */
     public class MySqlTaskContextImpl extends MySqlTaskContext {
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 942532a7f6..b7a5dc9c17 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -23,15 +23,11 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
 
-import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -66,11 +62,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
@@ -105,7 +97,7 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
         // Initializes the table schema
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, connection);
 
         // initial stateful objects
         final OracleConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -258,49 +250,6 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
         return oracleOffsetContext;
     }
 
-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
-                    SchemaAndValue schemaAndValue =
-                            jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
-                    Struct deserializedStruct = (Struct) schemaAndValue.value();
-
-                    TableChanges tableChanges =
-                            tableChangeSerializer.deserialize(
-                                    Collections.singletonList(deserializedStruct), false);
-
-                    Iterator<TableChanges.TableChange> iterator = tableChanges.iterator();
-                    TableChanges.TableChange tableChange = null;
-                    while (iterator.hasNext()) {
-                        if (tableChange != null) {
-                            throw new IllegalStateException(
-                                    "The table changes should only have one element");
-                        }
-                        tableChange = iterator.next();
-                    }
-                    engineHistory.add(tableChange);
-                    continue;
-                }
-                engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     private void validateAndLoadDatabaseHistory(
             OracleOffsetContext offset, OracleDatabaseSchema schema) {
         schema.initializeStorage();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
index 2f4cb8a2da..3af8dbc63f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java
@@ -24,10 +24,7 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.exception.PostgresConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
@@ -68,10 +65,8 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -129,7 +124,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
 
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, dataConnection);
 
         // initial stateful objects
         final PostgresConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -276,27 +271,6 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
         }
     }
 
-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     @Override
     public PostgresSourceConfig getSourceConfig() {
         return (PostgresSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index ba72f8533d..9362b0a314 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -22,10 +22,7 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
-import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
-import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
@@ -55,7 +52,6 @@ import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
-import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Collect;
@@ -63,8 +59,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@@ -99,7 +93,7 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
 
     @Override
     public void configure(SourceSplitBase sourceSplitBase) {
-        registerDatabaseHistory(sourceSplitBase);
+        super.registerDatabaseHistory(sourceSplitBase, dataConnection);
 
         // initial stateful objects
         final SqlServerConnectorConfig connectorConfig = getDbzConnectorConfig();
@@ -282,27 +276,6 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
         return sqlServerOffsetContext;
     }
 
-    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
-        List<TableChanges.TableChange> engineHistory = new ArrayList<>();
-        // TODO: support save table schema
-        if (sourceSplitBase instanceof SnapshotSplit) {
-            SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase;
-            engineHistory.add(
-                    dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId()));
-        } else {
-            IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase;
-            for (TableId tableId : incrementalSplit.getTableIds()) {
-                engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId));
-            }
-        }
-
-        EmbeddedDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                engineHistory);
-    }
-
     public static class SqlServerEventMetadataProvider implements EventMetadataProvider {
 
         @Override

Reply via email to