This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0abf1e2  [improve] improve commit retry strategy (#218)
0abf1e2 is described below

commit 0abf1e2bc82f9e33e847f4b22ee5f13882c4fdf1
Author: wudi <676366...@qq.com>
AuthorDate: Tue Oct 24 16:20:35 2023 +0800

    [improve] improve commit retry strategy (#218)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  2 +-
 .../org/apache/doris/flink/sink/BackendUtil.java   |  2 +-
 .../doris/flink/sink/committer/DorisCommitter.java | 15 ++++++------
 .../flink/sink/committer/TestDorisCommitter.java   | 28 +++++++++++++++-------
 4 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 8f7022d..4a03024 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -29,7 +29,7 @@ public class DorisExecutionOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
     public static final int DEFAULT_CHECK_INTERVAL = 10000;
-    public static final int DEFAULT_MAX_RETRY_TIMES = 1;
+    public static final int DEFAULT_MAX_RETRY_TIMES = 3;
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
     private static final int DEFAULT_BUFFER_COUNT = 3;
     //batch flush
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index 9f9516a..0d45e2f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -74,7 +74,7 @@ public class BackendUtil {
         throw new DorisRuntimeException("no available backend.");
     }
 
-    public boolean tryHttpConnection(String backend) {
+    public static boolean tryHttpConnection(String backend) {
         try {
             backend = "http://" + backend;
             URL url = new URL(backend);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 2a0fba0..ffcb8af 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -94,7 +94,7 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
 
         LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
         int retry = 0;
-        while (retry++ <= maxRetry) {
+        while (retry <= maxRetry) {
             //get latest-url
             String url = String.format(commitPattern, hostPort, committable.getDb());
             HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
@@ -108,7 +108,7 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
                         Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
                         });
                         if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) {
-                            throw new DorisRuntimeException("Commit failed " + loadResult);
+                            throw new DorisRuntimeException("commit transaction failed " + loadResult);
                         } else {
                             LOG.info("load result {}", loadResult);
                         }
@@ -116,18 +116,19 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
                     return;
                 }
                 String reasonPhrase = statusLine.getReasonPhrase();
-                LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
+                LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase);
                 if (retry == maxRetry) {
-                    throw new DorisRuntimeException("stream load error: " + reasonPhrase);
+                    throw new DorisRuntimeException("commit transaction error: " + reasonPhrase);
                 }
                 hostPort = backendUtil.getAvailableBackend();
-            } catch (IOException e) {
-                LOG.error("commit transaction failed: ", e);
+            } catch (Exception e) {
+                LOG.error("commit transaction failed, to retry, {}", e.getMessage());
                 if (retry == maxRetry) {
-                    throw new IOException("commit transaction failed: {}", e);
+                    throw new DorisRuntimeException("commit transaction error, ", e);
                 }
                 hostPort = backendUtil.getAvailableBackend();
             }
+            retry++;
         }
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index 794f806..a3178e3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -21,7 +21,8 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
+import org.apache.doris.flink.rest.models.BackendV2;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpEntityMock;
 import org.apache.doris.flink.sink.OptionUtils;
@@ -32,9 +33,10 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicStatusLine;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.MockedStatic;
-import org.slf4j.Logger;
 
 import java.util.Collections;
 
@@ -52,6 +54,9 @@ public class TestDorisCommitter {
     DorisCommittable dorisCommittable;
     HttpEntityMock entityMock;
     private MockedStatic<RestService> restServiceMockedStatic;
+    private MockedStatic<BackendUtil> backendUtilMockedStatic;
+    @Rule
+    public ExpectedException thrown= ExpectedException.none();
 
     @Before
     public void setUp() throws Exception {
@@ -63,15 +68,17 @@ public class TestDorisCommitter {
         CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
         StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, "");
         restServiceMockedStatic = mockStatic(RestService.class);
-        Logger mockLogger = mock(Logger.class);
-        mock(RestService.class);
+        backendUtilMockedStatic = mockStatic(BackendUtil.class);
 
         when(httpClient.execute(any())).thenReturn(httpResponse);
         when(httpResponse.getStatusLine()).thenReturn(normalLine);
         when(httpResponse.getEntity()).thenReturn(entityMock);
-        when(RestService.getBackendsV2(dorisOptions, readOptions, mockLogger)).thenReturn(
-                Collections.singletonList(new BackendRowV2()));
-        dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 2, httpClient);
+
+        restServiceMockedStatic.when(()-> RestService.getBackendsV2(any(),any(),any()))
+                .thenReturn(Collections.singletonList(BackendV2.BackendRowV2.of("127.0.0.1", 8040,true)));
+        backendUtilMockedStatic.when(()-> BackendUtil.tryHttpConnection(any())).thenReturn(true);
+
+        dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient);
     }
 
     @Test
@@ -83,11 +90,13 @@ public class TestDorisCommitter {
         this.entityMock.setValue(response);
         final MockCommitRequest<DorisCommittable> request = new MockCommitRequest<>(dorisCommittable);
         dorisCommitter.commit(Collections.singletonList(request));
-
     }
 
-    @Test(expected = DorisRuntimeException.class)
+    @Test
     public void testCommitAbort() throws Exception {
+        thrown.expect(DorisRuntimeException.class);
+        thrown.expectMessage("commit transaction error");
+
         String response = "{\n" +
                 "\"status\": \"Fail\",\n" +
                 "\"msg\": \"errCode = 2, detailMessage = transaction [25] is already aborted. abort reason: User Abort\"\n" +
@@ -100,5 +109,6 @@ public class TestDorisCommitter {
     @After
     public void after() {
         restServiceMockedStatic.close();
+        backendUtilMockedStatic.close();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to