This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 81952c0  [optimization] add disable 2pc config (#34)
81952c0 is described below

commit 81952c0af2024d3484f15106f4588fd18da725ae
Author: madong <44109015+mado...@users.noreply.github.com>
AuthorDate: Thu Jun 30 22:56:33 2022 +0800

    [optimization] add disable 2pc config (#34)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     | 18 ++++++++++--
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 19 +++++++-----
 .../doris/flink/sink/writer/DorisWriter.java       | 21 ++++++++-----
 .../doris/flink/sink/writer/LabelGenerator.java    | 34 ++++++++++++++++++++++
 .../flink/table/DorisDynamicTableFactory.java      | 10 +++++++
 .../flink/sink/writer/TestDorisStreamLoad.java     | 14 ++++-----
 .../doris/flink/sink/writer/TestDorisWriter.java   |  6 ++--
 7 files changed, 95 insertions(+), 27 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index cc4b203..2daf5e1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -45,13 +45,16 @@ public class DorisExecutionOptions implements Serializable {
 
     private final Boolean enableDelete;
 
+    private final Boolean enable2PC;
 
     public DorisExecutionOptions(int checkInterval,
                                  int maxRetries,
                                  int bufferSize,
                                  int bufferCount,
                                  String labelPrefix,
-                                 Properties streamLoadProp, Boolean 
enableDelete) {
+                                 Properties streamLoadProp,
+                                 Boolean enableDelete,
+                                 Boolean enable2PC) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -60,6 +63,7 @@ public class DorisExecutionOptions implements Serializable {
         this.labelPrefix = labelPrefix;
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
+        this.enable2PC = enable2PC;
     }
 
     public static Builder builder() {
@@ -105,6 +109,9 @@ public class DorisExecutionOptions implements Serializable {
         return enableDelete;
     }
 
+    public Boolean enabled2PC() {
+        return enable2PC;
+    }
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -117,6 +124,8 @@ public class DorisExecutionOptions implements Serializable {
         private Properties streamLoadProp = new Properties();
         private boolean enableDelete = false;
 
+        private boolean enable2PC = true;
+
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
             return this;
@@ -152,8 +161,13 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        public Builder disable2PC() {
+            this.enable2PC = false;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete);
+            return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete, enable2PC);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index b468cc8..dfb9cb7 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -61,7 +61,7 @@ import static 
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KE
 public class DorisStreamLoad implements Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisStreamLoad.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private final String labelSuffix;
+    private final LabelGenerator labelGenerator;
     private final byte[] lineDelimiter;
     private static final String LOAD_URL_PATTERN = 
"http://%s/api/%s/%s/_stream_load";;
     private static final String ABORT_URL_PATTERN = 
"http://%s/api/%s/_stream_load_2pc";;
@@ -74,6 +74,7 @@ public class DorisStreamLoad implements Serializable {
     private final String passwd;
     private final String db;
     private final String table;
+    private final boolean enable2PC;
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
     private Future<CloseableHttpResponse> pendingLoadFuture;
@@ -84,7 +85,7 @@ public class DorisStreamLoad implements Serializable {
     public DorisStreamLoad(String hostPort,
                            DorisOptions dorisOptions,
                            DorisExecutionOptions executionOptions,
-                           String labelSuffix,
+                           LabelGenerator labelGenerator,
                            CloseableHttpClient httpClient) {
         this.hostPort = hostPort;
         String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -92,9 +93,10 @@ public class DorisStreamLoad implements Serializable {
         this.table = tableInfo[1];
         this.user = dorisOptions.getUsername();
         this.passwd = dorisOptions.getPassword();
-        this.labelSuffix = labelSuffix;
+        this.labelGenerator = labelGenerator;
         this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
         this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
+        this.enable2PC = executionOptions.enabled2PC();
         this.streamLoadProp = executionOptions.getStreamLoadProp();
         this.httpClient = httpClient;
         this.executorService = new ThreadPoolExecutor(1, 1,
@@ -133,7 +135,7 @@ public class DorisStreamLoad implements Serializable {
         LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, 
chkID);
         while (true) {
             try {
-                String label = labelSuffix + "_" + startChkID;
+                String label = labelGenerator.generateLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
                 builder.setUrl(loadUrlStr)
                         .baseAuth(user, passwd)
@@ -218,12 +220,11 @@ public class DorisStreamLoad implements Serializable {
 
     /**
      * start write data for new checkpoint.
-     * @param chkID
+     * @param label
      * @throws IOException
      */
-    public void startLoad(long chkID) throws IOException{
+    public void startLoad(String label) throws IOException{
         loadBatchFirstRecord = true;
-        String label = labelSuffix + "_" + chkID;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput();
         LOG.info("stream load started for {}", label);
@@ -232,10 +233,12 @@ public class DorisStreamLoad implements Serializable {
             putBuilder.setUrl(loadUrlStr)
                     .baseAuth(user, passwd)
                     .addCommonHeader()
-                    .enable2PC()
                     .setLabel(label)
                     .setEntity(entity)
                     .addProperties(streamLoadProp);
+            if (enable2PC) {
+               putBuilder.enable2PC();
+            }
             pendingLoadFuture = executorService.submit(() -> {
                 LOG.info("start execute load");
                 return httpClient.execute(putBuilder.build());
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 026f54e..86ed9ed 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -66,6 +66,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     private final DorisReadOptions dorisReadOptions;
     private final DorisExecutionOptions executionOptions;
     private final String labelPrefix;
+    private final LabelGenerator labelGenerator;
     private final int intervalTime;
     private final DorisWriterState dorisWriterState;
     private final DorisRecordSerializer<IN> serializer;
@@ -87,6 +88,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
         this.dorisWriterState = new 
DorisWriterState(executionOptions.getLabelPrefix());
         this.labelPrefix = executionOptions.getLabelPrefix() + "_" + 
initContext.getSubtaskId();
+        this.labelGenerator = new LabelGenerator(labelPrefix, 
executionOptions.enabled2PC());
         this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new 
ExecutorThreadFactory("stream-load-check"));
         this.serializer = serializer;
         this.dorisOptions = dorisOptions;
@@ -98,20 +100,22 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
 
     public void initializeLoad(List<DorisWriterState> state) throws 
IOException {
         try {
-            this.dorisStreamLoad  = new DorisStreamLoad(
+            this.dorisStreamLoad = new DorisStreamLoad(
                     RestService.getBackend(dorisOptions, dorisReadOptions, 
LOG),
                     dorisOptions,
                     executionOptions,
-                    labelPrefix, new HttpUtil().getHttpClient());
+                    labelGenerator, new HttpUtil().getHttpClient());
             // TODO: we need check and abort all pending transaction.
             //  Discard transactions that may cause the job to fail.
-            dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+            if(executionOptions.enabled2PC()) {
+                dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 
1);
+            }
         } catch (Exception e) {
             throw new DorisRuntimeException(e);
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        dorisStreamLoad.startLoad(lastCheckpointId + 1);
+        
dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
         // when uploading data in streaming mode, we need to regularly detect 
whether there are exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, 
intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -124,6 +128,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
 
     @Override
     public List<DorisCommittable> prepareCommit(boolean flush) throws 
IOException {
+        // disable exception checker before stop load.
         loading = false;
         Preconditions.checkState(dorisStreamLoad != null);
         RespContent respContent = dorisStreamLoad.stopLoad();
@@ -131,7 +136,9 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
             String errMsg = String.format("stream load error: %s, see more in 
%s", respContent.getMessage(), respContent.getErrorURL());
             throw new DorisRuntimeException(errMsg);
         }
-
+        if (!executionOptions.enabled2PC()) {
+            return Collections.emptyList();
+        }
         long txnId = respContent.getTxnId();
 
         return ImmutableList.of(new 
DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), 
txnId));
@@ -140,7 +147,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     @Override
     public List<DorisWriterState> snapshotState(long checkpointId) throws 
IOException {
         Preconditions.checkState(dorisStreamLoad != null);
-        this.dorisStreamLoad.startLoad(checkpointId + 1);
+        
this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
         this.loading = true;
         return Collections.singletonList(dorisWriterState);
     }
@@ -173,7 +180,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
 
     private void checkLoadException() {
         if (loadException != null) {
-            throw new RuntimeException("error while load exception.", 
loadException);
+            throw new RuntimeException("error while loading data.", 
loadException);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
new file mode 100644
index 0000000..436d709
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.sink.writer;
+
+import java.util.UUID;
+
+/**
+ * Generates labels for Doris stream load transactions.
+ *
+ * <p>With 2PC enabled the label is {@code <labelPrefix>_<chkId>}, so the label of a load can be
+ * re-derived from its checkpoint id (for example to abort transactions that an earlier attempt left
+ * pre-committed). Without 2PC the label is {@code <labelPrefix>_<random UUID>}: labels must not
+ * repeat, and a millisecond timestamp is not unique when two loads start within the same millisecond.
+ */
+public class LabelGenerator {
+    private final String labelPrefix;
+    private final boolean enable2PC;
+
+    /**
+     * @param labelPrefix prefix shared by every label this generator produces
+     * @param enable2PC   {@code true} to derive labels from the checkpoint id, {@code false} to make
+     *                    each generated label unique
+     */
+    public LabelGenerator(String labelPrefix, boolean enable2PC) {
+        this.labelPrefix = labelPrefix;
+        this.enable2PC = enable2PC;
+    }
+
+    /**
+     * Returns the label for a stream load.
+     *
+     * @param chkId checkpoint id the load belongs to; only used when 2PC is enabled
+     * @return {@code labelPrefix_chkId} with 2PC, otherwise {@code labelPrefix_<random UUID>}
+     */
+    public String generateLabel(long chkId) {
+        String suffix = enable2PC ? String.valueOf(chkId) : UUID.randomUUID().toString();
+        return labelPrefix + "_" + suffix;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 7d6455e..be00cff 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -118,6 +118,12 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
             .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
             .withDescription("");
     // flink write config options
+    private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
+            .key("sink.enable-2pc")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("enable 2PC while loading");
+
     private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = 
ConfigOptions
             .key("sink.check-interval")
             .intType()
@@ -195,6 +201,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(DORIS_EXEC_MEM_LIMIT);
 
         options.add(SINK_CHECK_INTERVAL);
+        options.add(SINK_ENABLE_2PC);
         options.add(SINK_MAX_RETRIES);
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
         options.add(SINK_ENABLE_DELETE);
@@ -262,6 +269,9 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX));
         builder.setStreamLoadProp(streamLoadProp);
         builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
+        if (!readableConfig.get(SINK_ENABLE_2PC)) {
+            builder.disable2PC();
+        }
         return builder.build();
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index b401a5b..b4532ee 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -57,7 +57,7 @@ public class TestDorisStreamLoad {
         CloseableHttpResponse abortSuccessResponse = 
HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
         CloseableHttpResponse preCommitResponse = 
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(existLabelResponse, 
abortSuccessResponse, preCommitResponse);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, "", httpClient);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("test001_0", true), 
httpClient);
         dorisStreamLoad.abortPreCommit("test001_0", 1);
     }
 
@@ -67,8 +67,8 @@ public class TestDorisStreamLoad {
         CloseableHttpResponse preCommitResponse = 
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.stopLoad();
         byte[] buff = new byte[4];
@@ -86,8 +86,8 @@ public class TestDorisStreamLoad {
         CloseableHttpResponse preCommitResponse = 
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.stopLoad();
@@ -111,8 +111,8 @@ public class TestDorisStreamLoad {
         CloseableHttpResponse preCommitResponse = 
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord("{\"id\": 
1}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.writeRecord("{\"id\": 
2}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.stopLoad();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 094e8a4..3bf32b4 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -62,8 +62,8 @@ public class TestDorisWriter {
         CloseableHttpResponse preCommitResponse = 
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", 
dorisOptions, executionOptions, "", httpClient);
-        dorisStreamLoad.startLoad(1);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoad.startLoad("");
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, 
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, 
readOptions, executionOptions);
@@ -83,7 +83,7 @@ public class TestDorisWriter {
         CloseableHttpResponse preCommitResponse = 
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", 
dorisOptions, executionOptions, "", httpClient);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, 
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, 
readOptions, executionOptions);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to