This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0abf1e2 [improve] improve commit retry strategy (#218)
0abf1e2 is described below
commit 0abf1e2bc82f9e33e847f4b22ee5f13882c4fdf1
Author: wudi <[email protected]>
AuthorDate: Tue Oct 24 16:20:35 2023 +0800
[improve] improve commit retry strategy (#218)
---
.../doris/flink/cfg/DorisExecutionOptions.java | 2 +-
.../org/apache/doris/flink/sink/BackendUtil.java | 2 +-
.../doris/flink/sink/committer/DorisCommitter.java | 15 ++++++------
.../flink/sink/committer/TestDorisCommitter.java | 28 +++++++++++++++-------
4 files changed, 29 insertions(+), 18 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 8f7022d..4a03024 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -29,7 +29,7 @@ public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
public static final int DEFAULT_CHECK_INTERVAL = 10000;
- public static final int DEFAULT_MAX_RETRY_TIMES = 1;
+ public static final int DEFAULT_MAX_RETRY_TIMES = 3;
private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
private static final int DEFAULT_BUFFER_COUNT = 3;
//batch flush
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index 9f9516a..0d45e2f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -74,7 +74,7 @@ public class BackendUtil {
throw new DorisRuntimeException("no available backend.");
}
- public boolean tryHttpConnection(String backend) {
+ public static boolean tryHttpConnection(String backend) {
try {
backend = "http://" + backend;
URL url = new URL(backend);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 2a0fba0..ffcb8af 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -94,7 +94,7 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
int retry = 0;
- while (retry++ <= maxRetry) {
+ while (retry <= maxRetry) {
//get latest-url
String url = String.format(commitPattern, hostPort,
committable.getDb());
HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
@@ -108,7 +108,7 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
Map<String, String> res =
jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
if (!res.get("status").equals(SUCCESS) &&
!ResponseUtil.isCommitted(res.get("msg"))) {
- throw new DorisRuntimeException("Commit failed " +
loadResult);
+ throw new DorisRuntimeException("commit
transaction failed " + loadResult);
} else {
LOG.info("load result {}", loadResult);
}
@@ -116,18 +116,19 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
return;
}
String reasonPhrase = statusLine.getReasonPhrase();
- LOG.warn("commit failed with {}, reason {}", hostPort,
reasonPhrase);
+ LOG.error("commit failed with {}, reason {}", hostPort,
reasonPhrase);
if (retry == maxRetry) {
- throw new DorisRuntimeException("stream load error: " +
reasonPhrase);
+ throw new DorisRuntimeException("commit transaction error:
" + reasonPhrase);
}
hostPort = backendUtil.getAvailableBackend();
- } catch (IOException e) {
- LOG.error("commit transaction failed: ", e);
+ } catch (Exception e) {
+ LOG.error("commit transaction failed, to retry, {}",
e.getMessage());
if (retry == maxRetry) {
- throw new IOException("commit transaction failed: {}", e);
+ throw new DorisRuntimeException("commit transaction error,
", e);
}
hostPort = backendUtil.getAvailableBackend();
}
+ retry++;
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index 794f806..a3178e3 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -21,7 +21,8 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
+import org.apache.doris.flink.rest.models.BackendV2;
+import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
@@ -32,9 +33,10 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicStatusLine;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.mockito.MockedStatic;
-import org.slf4j.Logger;
import java.util.Collections;
@@ -52,6 +54,9 @@ public class TestDorisCommitter {
DorisCommittable dorisCommittable;
HttpEntityMock entityMock;
private MockedStatic<RestService> restServiceMockedStatic;
+ private MockedStatic<BackendUtil> backendUtilMockedStatic;
+ @Rule
+ public ExpectedException thrown= ExpectedException.none();
@Before
public void setUp() throws Exception {
@@ -63,15 +68,17 @@ public class TestDorisCommitter {
CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
StatusLine normalLine = new BasicStatusLine(new
ProtocolVersion("http", 1, 0), 200, "");
restServiceMockedStatic = mockStatic(RestService.class);
- Logger mockLogger = mock(Logger.class);
- mock(RestService.class);
+ backendUtilMockedStatic = mockStatic(BackendUtil.class);
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
- when(RestService.getBackendsV2(dorisOptions, readOptions,
mockLogger)).thenReturn(
- Collections.singletonList(new BackendRowV2()));
- dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 2,
httpClient);
+
+ restServiceMockedStatic.when(()->
RestService.getBackendsV2(any(),any(),any()))
+
.thenReturn(Collections.singletonList(BackendV2.BackendRowV2.of("127.0.0.1",
8040,true)));
+ backendUtilMockedStatic.when(()->
BackendUtil.tryHttpConnection(any())).thenReturn(true);
+
+ dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3,
httpClient);
}
@Test
@@ -83,11 +90,13 @@ public class TestDorisCommitter {
this.entityMock.setValue(response);
final MockCommitRequest<DorisCommittable> request = new
MockCommitRequest<>(dorisCommittable);
dorisCommitter.commit(Collections.singletonList(request));
-
}
- @Test(expected = DorisRuntimeException.class)
+ @Test
public void testCommitAbort() throws Exception {
+ thrown.expect(DorisRuntimeException.class);
+ thrown.expectMessage("commit transaction error");
+
String response = "{\n" +
"\"status\": \"Fail\",\n" +
"\"msg\": \"errCode = 2, detailMessage = transaction [25] is
already aborted. abort reason: User Abort\"\n" +
@@ -100,5 +109,6 @@ public class TestDorisCommitter {
@After
public void after() {
restServiceMockedStatic.close();
+ backendUtilMockedStatic.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]