This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 0abf1e2 [improve] improve commit retry strategy (#218) 0abf1e2 is described below commit 0abf1e2bc82f9e33e847f4b22ee5f13882c4fdf1 Author: wudi <676366...@qq.com> AuthorDate: Tue Oct 24 16:20:35 2023 +0800 [improve] improve commit retry strategy (#218) --- .../doris/flink/cfg/DorisExecutionOptions.java | 2 +- .../org/apache/doris/flink/sink/BackendUtil.java | 2 +- .../doris/flink/sink/committer/DorisCommitter.java | 15 ++++++------ .../flink/sink/committer/TestDorisCommitter.java | 28 +++++++++++++++------- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 8f7022d..4a03024 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -29,7 +29,7 @@ public class DorisExecutionOptions implements Serializable { private static final long serialVersionUID = 1L; public static final int DEFAULT_CHECK_INTERVAL = 10000; - public static final int DEFAULT_MAX_RETRY_TIMES = 1; + public static final int DEFAULT_MAX_RETRY_TIMES = 3; private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; private static final int DEFAULT_BUFFER_COUNT = 3; //batch flush diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 9f9516a..0d45e2f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -74,7 +74,7 @@ public class BackendUtil { throw new DorisRuntimeException("no available backend."); } - public boolean tryHttpConnection(String backend) { + public static boolean tryHttpConnection(String backend) { try { backend = "http://" + backend; URL url = new URL(backend); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 2a0fba0..ffcb8af 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -94,7 +94,7 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort); int retry = 0; - while (retry++ <= maxRetry) { + while (retry <= maxRetry) { //get latest-url String url = String.format(commitPattern, hostPort, committable.getDb()); HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build(); @@ -108,7 +108,7 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() { }); if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) { - throw new DorisRuntimeException("Commit failed " + loadResult); + throw new DorisRuntimeException("commit transaction failed " + loadResult); } else { LOG.info("load result {}", loadResult); } @@ -116,18 +116,19 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { return; } String reasonPhrase = statusLine.getReasonPhrase(); - LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase); + LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase); if (retry == maxRetry) { - throw new DorisRuntimeException("stream load error: " + reasonPhrase); + throw new DorisRuntimeException("commit transaction error: " + reasonPhrase); } hostPort = backendUtil.getAvailableBackend(); - } catch (IOException e) { - LOG.error("commit transaction failed: ", e); + } catch (Exception e) { + LOG.error("commit transaction failed, to retry, {}", e.getMessage()); if (retry == maxRetry) { - throw new IOException("commit transaction failed: {}", e); + throw new DorisRuntimeException("commit transaction error, ", e); } hostPort = backendUtil.getAvailableBackend(); } + retry++; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index 794f806..a3178e3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -21,7 +21,8 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.RestService; -import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2; +import org.apache.doris.flink.rest.models.BackendV2; +import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpEntityMock; import org.apache.doris.flink.sink.OptionUtils; @@ -32,9 +33,10 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicStatusLine; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.MockedStatic; -import org.slf4j.Logger; import java.util.Collections; @@ -52,6 +54,9 @@ public class TestDorisCommitter { DorisCommittable dorisCommittable; HttpEntityMock entityMock; private MockedStatic<RestService> restServiceMockedStatic; + private MockedStatic<BackendUtil> backendUtilMockedStatic; + @Rule + public ExpectedException thrown= ExpectedException.none(); @Before public void setUp() throws Exception { @@ -63,15 +68,17 @@ public class TestDorisCommitter { CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class); StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, ""); restServiceMockedStatic = mockStatic(RestService.class); - Logger mockLogger = mock(Logger.class); - mock(RestService.class); + backendUtilMockedStatic = mockStatic(BackendUtil.class); when(httpClient.execute(any())).thenReturn(httpResponse); when(httpResponse.getStatusLine()).thenReturn(normalLine); when(httpResponse.getEntity()).thenReturn(entityMock); - when(RestService.getBackendsV2(dorisOptions, readOptions, mockLogger)).thenReturn( - Collections.singletonList(new BackendRowV2())); - dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 2, httpClient); + + restServiceMockedStatic.when(()-> RestService.getBackendsV2(any(),any(),any())) + .thenReturn(Collections.singletonList(BackendV2.BackendRowV2.of("127.0.0.1", 8040,true))); + backendUtilMockedStatic.when(()-> BackendUtil.tryHttpConnection(any())).thenReturn(true); + + dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient); } @Test @@ -83,11 +90,13 @@ public class TestDorisCommitter { this.entityMock.setValue(response); final MockCommitRequest<DorisCommittable> request = new MockCommitRequest<>(dorisCommittable); dorisCommitter.commit(Collections.singletonList(request)); - } - @Test(expected = DorisRuntimeException.class) + @Test public void testCommitAbort() throws Exception { + thrown.expect(DorisRuntimeException.class); + thrown.expectMessage("commit transaction error"); + String response = "{\n" + "\"status\": \"Fail\",\n" + "\"msg\": \"errCode = 2, detailMessage = transaction [25] is already aborted. abort reason: User Abort\"\n" + @@ -100,5 +109,6 @@ public class TestDorisCommitter { @After public void after() { restServiceMockedStatic.close(); + backendUtilMockedStatic.close(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org