This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 73a4f9c [feature]Support stream load configuration TwoPhaseCommit (#27) 73a4f9c is described below commit 73a4f9cd1fe82099364fde9081bbc754a1e70ba0 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Jun 6 16:00:12 2024 +0800 [feature]Support stream load configuration TwoPhaseCommit (#27) --- .../java/org/apache/doris/kafka/connector/cfg/DorisOptions.java | 9 +++++++++ .../doris/kafka/connector/cfg/DorisSinkConnectorConfig.java | 2 ++ .../org/apache/doris/kafka/connector/utils/HttpPutBuilder.java | 4 ++-- .../doris/kafka/connector/writer/commit/DorisCommitter.java | 2 +- .../doris/kafka/connector/writer/load/DorisStreamLoad.java | 2 +- 5 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index 5747925..3db78ca 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -51,6 +51,7 @@ public class DorisOptions { private boolean enableCustomJMX; private final int taskId; private final boolean enableDelete; + private boolean enable2PC; private boolean autoRedirect = true; private int requestReadTimeoutMs; private int requestConnectTimeoutMs; @@ -112,6 +113,10 @@ public class DorisOptions { } this.topicMap = getTopicToTableMap(config); + this.enable2PC = DorisSinkConnectorConfig.ENABLE_2PC_DEFAULT; + if (config.containsKey(DorisSinkConnectorConfig.ENABLE_2PC)) { + this.enable2PC = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.ENABLE_2PC)); + } enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT; if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) { enableCustomJMX = Boolean.parseBoolean(config.get(DorisSinkConnectorConfig.JMX_OPT)); @@ -202,6 +207,10 @@ public class DorisOptions { return topicMap.get(topic); } + public boolean enable2PC() { + return enable2PC; + } + public Map<String, String> getTopicMap() { return topicMap; } diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index f4cabb3..02c24d0 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -92,6 +92,8 @@ public class DorisSinkConnectorConfig { public static final String ENABLE_DELETE = "enable.delete"; public static final boolean ENABLE_DELETE_DEFAULT = false; + public static final String ENABLE_2PC = "enable.2pc"; + public static final boolean ENABLE_2PC_DEFAULT = true; private static final ConfigDef.Validator nonEmptyStringValidator = new ConfigDef.NonEmptyString(); diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java b/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java index a51961c..36161ac 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/HttpPutBuilder.java @@ -110,8 +110,8 @@ public class HttpPutBuilder { return this; } - public HttpPutBuilder enable2PC() { - header.put("two_phase_commit", "true"); + public HttpPutBuilder enable2PC(boolean enable2PC) { + header.put("two_phase_commit", String.valueOf(enable2PC)); return this; } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java index d25ff8a..46d9a65 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/commit/DorisCommitter.java @@ -57,7 +57,7 @@ public class DorisCommitter { } public void commit(List<DorisCommittable> dorisCommittables) { - if (dorisCommittables.isEmpty()) { + if (!dorisOptions.enable2PC() || dorisCommittables.isEmpty()) { return; } for (DorisCommittable dorisCommittable : dorisCommittables) { diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java index cdbc2a6..b26a735 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/load/DorisStreamLoad.java @@ -78,7 +78,7 @@ public class DorisStreamLoad extends DataLoad { .addCommonHeader() .setEntity(entity) .addHiddenColumns(dorisOptions.isEnableDelete()) - .enable2PC() + .enable2PC(dorisOptions.enable2PC()) .addProperties(dorisOptions.getStreamLoadProp()); LOG.info("stream load started for {} on host {}", label, hostPort); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org