This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d721f9d649 [INLONG-9441][Manager] MySQL data source supports both full and incremental modes (#9442) d721f9d649 is described below commit d721f9d649aa2896c6be48bae969d8aa0aa0889d Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri Dec 8 15:37:34 2023 +0800 [INLONG-9441][Manager] MySQL data source supports both full and incremental modes (#9442) --- .../inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java | 3 +++ .../org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSource.java | 3 +++ .../apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java | 3 +++ .../inlong/manager/pojo/source/mysql/MySQLBinlogSourceRequest.java | 3 +++ 4 files changed, 12 insertions(+) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java index 894b83eb1c..4016d3ff71 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java @@ -70,6 +70,9 @@ public class MySQLBinlogProvider implements ExtractNodeProvider { // Unique properties when migrate all tables in database properties.put("migrate-all", "true"); } + if (binlogSource.isOnlyIncremental()) { + properties.put("scan.startup.mode", "latest-offset"); + } return new MySqlExtractNode(binlogSource.getSourceName(), binlogSource.getSourceName(), diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSource.java index 420b4a86fc..1b6eb36cac 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSource.java @@ -94,6 +94,9 @@ public class MySQLBinlogSource extends StreamSource { @ApiModelProperty("Need transfer total database") private boolean allMigration; + @ApiModelProperty("Only incremental") + private boolean onlyIncremental; + @ApiModelProperty("Primary key must be shared by all tables") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java index 1e3ccd5fae..4b9a64f780 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceDTO.java @@ -109,6 +109,9 @@ public class MySQLBinlogSourceDTO { @ApiModelProperty("Whether to migrate all databases") private boolean allMigration; + @ApiModelProperty("Only incremental") + private boolean onlyIncremental; + @ApiModelProperty("Primary key must be shared by all tables") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceRequest.java index 9f748be557..4b51a9c215 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/mysql/MySQLBinlogSourceRequest.java @@ -103,6 +103,9 @@ public class MySQLBinlogSourceRequest extends SourceRequest { @ApiModelProperty("Need transfer total database") private boolean allMigration = false; + @ApiModelProperty("Only incremental") + private boolean onlyIncremental; + @ApiModelProperty("Primary key must be shared by all tables") private String primaryKey;