This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a4f9c  [feature]Support stream load configuration TwoPhaseCommit (#27)
73a4f9c is described below

commit 73a4f9cd1fe82099364fde9081bbc754a1e70ba0
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu Jun 6 16:00:12 2024 +0800

    [feature]Support stream load configuration TwoPhaseCommit (#27)
---
 .../java/org/apache/doris/kafka/connector/cfg/DorisOptions.java  | 9 +++++++++
 .../doris/kafka/connector/cfg/DorisSinkConnectorConfig.java      | 2 ++
 .../org/apache/doris/kafka/connector/utils/HttpPutBuilder.java   | 4 ++--
 .../doris/kafka/connector/writer/commit/DorisCommitter.java      | 2 +-
 .../doris/kafka/connector/writer/load/DorisStreamLoad.java       | 2 +-
 5 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index 5747925..3db78ca 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -51,6 +51,7 @@ public class DorisOptions {
     private boolean enableCustomJMX;
     private final int taskId;
     private final boolean enableDelete;
+    private boolean enable2PC;
     private boolean autoRedirect = true;
     private int requestReadTimeoutMs;
     private int requestConnectTimeoutMs;
@@ -112,6 +113,10 @@ public class DorisOptions {
         }
         this.topicMap = getTopicToTableMap(config);
 
+        this.enable2PC = DorisSinkConnectorConfig.ENABLE_2PC_DEFAULT;
+        if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) {
+            this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC));
+        }
         enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT;
         if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) {
             enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT));
@@ -202,6 +207,10 @@ public class DorisOptions {
         return topicMap.get(topic);
     }
 
+    public boolean enable2PC() {
+        return enable2PC;
+    }
+
     public Map<String, String> getTopicMap() {
         return topicMap;
     }
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index f4cabb3..02c24d0 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -92,6 +92,8 @@ public class DorisSinkConnectorConfig {
 
     public static final String ENABLE_DELETE = "enable.delete";
     public static final boolean ENABLE_DELETE_DEFAULT = false;
+    public static final String ENABLE_2PC = "enable.2pc";
+    public static final boolean ENABLE_2PC_DEFAULT = true;
 
     private static final ConfigDef.Validator nonEmptyStringValidator =
             new ConfigDef.NonEmptyString();
diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java b/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java
index a51961c..36161ac 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java
@@ -110,8 +110,8 @@ public class HttpPutBuilder {
         return this;
     }
 
-    public HttpPutBuilder enable2PC() {
-        header.put("two_phase_commit", "true");
+    public HttpPutBuilder enable2PC(boolean enable2PC) {
+        header.put("two_phase_commit", String.valueOf(enable2PC));
         return this;
     }
 
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
index d25ff8a..46d9a65 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java
@@ -57,7 +57,7 @@ public class DorisCommitter {
     }
 
     public void commit(List<DorisCommittable> dorisCommittables) {
-        if (dorisCommittables.isEmpty()) {
+        if (!dorisOptions.enable2PC() || dorisCommittables.isEmpty()) {
             return;
         }
         for (DorisCommittable dorisCommittable : dorisCommittables) {
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
index cdbc2a6..b26a735 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java
@@ -78,7 +78,7 @@ public class DorisStreamLoad extends DataLoad {
                 .addCommonHeader()
                 .setEntity(entity)
                 .addHiddenColumns(dorisOptions.isEnableDelete())
-                .enable2PC()
+                .enable2PC(dorisOptions.enable2PC())
                 .addProperties(dorisOptions.getStreamLoadProp());
 
         LOG.info("stream load started for {} on host {}", label, hostPort);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to