This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b922bb90e6 [Improve][CDC] Extract duplicate code (#8906) b922bb90e6 is described below commit b922bb90e65614a034cf5c2e5d2c5d180b76b6c2 Author: hailin0 <wanghai...@apache.org> AuthorDate: Thu Mar 6 16:42:47 2025 +0800 [Improve][CDC] Extract duplicate code (#8906) --- .../external/JdbcSourceFetchTaskContext.java | 53 ++++++++++++++++++++++ .../reader/fetch/MySqlSourceFetchTaskContext.java | 53 +--------------------- .../reader/fetch/OracleSourceFetchTaskContext.java | 53 +--------------------- .../reader/PostgresSourceFetchTaskContext.java | 28 +----------- .../fetch/SqlServerSourceFetchTaskContext.java | 29 +----------- 5 files changed, 57 insertions(+), 159 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java index 48380d024f..e8d058f2a2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -22,26 +22,35 @@ import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; +import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; +import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; +import 
org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; import org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer; +import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; +import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.source.SourceRecord; import io.debezium.config.CommonConnectorConfig; import io.debezium.data.Envelope; +import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; import io.debezium.util.SchemaNameAdjuster; import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -155,6 +164,50 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context { .collect(Collectors.toList()); } + protected void registerDatabaseHistory( + SourceSplitBase sourceSplitBase, JdbcConnection connection) { + List<TableChanges.TableChange> engineHistory = new ArrayList<>(); + // TODO: support save table schema + if (sourceSplitBase instanceof SnapshotSplit) { + SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; + engineHistory.add( + dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId())); + } else { + IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; + Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges(); + for (TableId tableId : 
incrementalSplit.getTableIds()) { + if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) { + SchemaAndValue schemaAndValue = + jsonConverter.toConnectData("topic", historyTableChanges.get(tableId)); + Struct deserializedStruct = (Struct) schemaAndValue.value(); + + TableChanges tableChanges = + tableChangeSerializer.deserialize( + Collections.singletonList(deserializedStruct), false); + + Iterator<TableChanges.TableChange> iterator = tableChanges.iterator(); + TableChanges.TableChange tableChange = null; + while (iterator.hasNext()) { + if (tableChange != null) { + throw new IllegalStateException( + "The table changes should only have one element"); + } + tableChange = iterator.next(); + } + engineHistory.add(tableChange); + continue; + } + engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId)); + } + } + + EmbeddedDatabaseHistory.registerHistory( + sourceConfig + .getDbzConfiguration() + .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), + engineHistory); + } + public SourceConfig getSourceConfig() { return sourceConfig; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index 93e7cd1f2e..fcf25d50ea 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -24,16 +24,12 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import 
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext; -import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; -import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; -import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils; import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils; -import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -66,7 +62,6 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; -import io.debezium.relational.history.TableChanges; import io.debezium.schema.DataCollectionId; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; @@ -75,9 +70,6 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.sql.SQLException; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -118,7 +110,7 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { @Override public void configure(SourceSplitBase sourceSplitBase) { - registerDatabaseHistory(sourceSplitBase); + super.registerDatabaseHistory(sourceSplitBase, connection); // initial 
stateful objects final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig(); @@ -385,49 +377,6 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext { schema.recover(Offsets.of(mySqlPartition, offset)); } - private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) { - List<TableChanges.TableChange> engineHistory = new ArrayList<>(); - // TODO: support save table schema - if (sourceSplitBase instanceof SnapshotSplit) { - SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; - engineHistory.add( - dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId())); - } else { - IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; - Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges(); - for (TableId tableId : incrementalSplit.getTableIds()) { - if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) { - SchemaAndValue schemaAndValue = - jsonConverter.toConnectData("topic", historyTableChanges.get(tableId)); - Struct deserializedStruct = (Struct) schemaAndValue.value(); - - TableChanges tableChanges = - tableChangeSerializer.deserialize( - Collections.singletonList(deserializedStruct), false); - - Iterator<TableChanges.TableChange> iterator = tableChanges.iterator(); - TableChanges.TableChange tableChange = null; - while (iterator.hasNext()) { - if (tableChange != null) { - throw new IllegalStateException( - "The table changes should only have one element"); - } - tableChange = iterator.next(); - } - engineHistory.add(tableChange); - continue; - } - engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId)); - } - } - - EmbeddedDatabaseHistory.registerHistory( - sourceConfig - .getDbzConfiguration() - .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), - engineHistory); - } - /** A subclass implementation of {@link MySqlTaskContext} which reuses one BinaryLogClient. 
*/ public class MySqlTaskContextImpl extends MySqlTaskContext { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java index 942532a7f6..b7a5dc9c17 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java @@ -23,15 +23,11 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext; -import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; -import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; -import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.offset.RedoLogOffset; import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils; -import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -66,11 +62,7 @@ import lombok.extern.slf4j.Slf4j; import 
java.sql.SQLException; import java.time.Instant; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection; @@ -105,7 +97,7 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext { @Override public void configure(SourceSplitBase sourceSplitBase) { // Initializes the table schema - registerDatabaseHistory(sourceSplitBase); + super.registerDatabaseHistory(sourceSplitBase, connection); // initial stateful objects final OracleConnectorConfig connectorConfig = getDbzConnectorConfig(); @@ -258,49 +250,6 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext { return oracleOffsetContext; } - private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) { - List<TableChanges.TableChange> engineHistory = new ArrayList<>(); - // TODO: support save table schema - if (sourceSplitBase instanceof SnapshotSplit) { - SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; - engineHistory.add( - dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId())); - } else { - IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; - Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges(); - for (TableId tableId : incrementalSplit.getTableIds()) { - if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) { - SchemaAndValue schemaAndValue = - jsonConverter.toConnectData("topic", historyTableChanges.get(tableId)); - Struct deserializedStruct = (Struct) schemaAndValue.value(); - - TableChanges tableChanges = - tableChangeSerializer.deserialize( - Collections.singletonList(deserializedStruct), false); - - Iterator<TableChanges.TableChange> iterator = tableChanges.iterator(); - TableChanges.TableChange tableChange = null; - while 
(iterator.hasNext()) { - if (tableChange != null) { - throw new IllegalStateException( - "The table changes should only have one element"); - } - tableChange = iterator.next(); - } - engineHistory.add(tableChange); - continue; - } - engineHistory.add(dataSourceDialect.queryTableSchema(connection, tableId)); - } - } - - EmbeddedDatabaseHistory.registerHistory( - sourceConfig - .getDbzConfiguration() - .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), - engineHistory); - } - private void validateAndLoadDatabaseHistory( OracleOffsetContext offset, OracleDatabaseSchema schema) { schema.initializeStorage(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java index 2f4cb8a2da..3af8dbc63f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java @@ -24,10 +24,7 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext; -import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; -import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; 
-import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.exception.PostgresConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset; @@ -68,10 +65,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -129,7 +124,7 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { @Override public void configure(SourceSplitBase sourceSplitBase) { - registerDatabaseHistory(sourceSplitBase); + super.registerDatabaseHistory(sourceSplitBase, dataConnection); // initial stateful objects final PostgresConnectorConfig connectorConfig = getDbzConnectorConfig(); @@ -276,27 +271,6 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { } } - private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) { - List<TableChanges.TableChange> engineHistory = new ArrayList<>(); - // TODO: support save table schema - if (sourceSplitBase instanceof SnapshotSplit) { - SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; - engineHistory.add( - dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId())); - } else { - IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; - for (TableId tableId : incrementalSplit.getTableIds()) { - engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId)); - } - } - - EmbeddedDatabaseHistory.registerHistory( - sourceConfig - .getDbzConfiguration() - .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), - engineHistory); - } - @Override public PostgresSourceConfig getSourceConfig() { return 
(PostgresSourceConfig) sourceConfig; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java index ba72f8533d..9362b0a314 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -22,10 +22,7 @@ import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext; -import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; -import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; -import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset; import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils; @@ -55,7 +52,6 @@ import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; -import 
io.debezium.relational.history.TableChanges; import io.debezium.schema.DataCollectionId; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; @@ -63,8 +59,6 @@ import lombok.extern.slf4j.Slf4j; import java.sql.SQLException; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; /** The context for fetch task that fetching data of snapshot split from MySQL data source. */ @@ -99,7 +93,7 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext @Override public void configure(SourceSplitBase sourceSplitBase) { - registerDatabaseHistory(sourceSplitBase); + super.registerDatabaseHistory(sourceSplitBase, dataConnection); // initial stateful objects final SqlServerConnectorConfig connectorConfig = getDbzConnectorConfig(); @@ -282,27 +276,6 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext return sqlServerOffsetContext; } - private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) { - List<TableChanges.TableChange> engineHistory = new ArrayList<>(); - // TODO: support save table schema - if (sourceSplitBase instanceof SnapshotSplit) { - SnapshotSplit snapshotSplit = (SnapshotSplit) sourceSplitBase; - engineHistory.add( - dataSourceDialect.queryTableSchema(dataConnection, snapshotSplit.getTableId())); - } else { - IncrementalSplit incrementalSplit = (IncrementalSplit) sourceSplitBase; - for (TableId tableId : incrementalSplit.getTableIds()) { - engineHistory.add(dataSourceDialect.queryTableSchema(dataConnection, tableId)); - } - } - - EmbeddedDatabaseHistory.registerHistory( - sourceConfig - .getDbzConfiguration() - .getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), - engineHistory); - } - public static class SqlServerEventMetadataProvider implements EventMetadataProvider { @Override