This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 934b081  commitTransaction method improvement (#121)
934b081 is described below

commit 934b08172f388887c80161d3ca5a355611903ebd
Author: benjobs <benj...@apache.org>
AuthorDate: Fri Mar 10 14:58:44 2023 +0800

    commitTransaction method improvement (#121)
    
    * commitTransaction method improvement; fixed a bug where no exception 
was thrown when the HTTP request failed; minor optimization
    Co-authored-by: benjobs <benj...@gmail.com>
---
 .../doris/flink/sink/committer/DorisCommitter.java | 83 ++++++++++++----------
 1 file changed, 44 insertions(+), 39 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 316f92e..acbd310 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -29,7 +29,9 @@ import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
@@ -52,6 +54,8 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
     private final CloseableHttpClient httpClient;
     private final DorisOptions dorisOptions;
     private final DorisReadOptions dorisReadOptions;
+    private final ObjectMapper jsonMapper = new ObjectMapper();
+
     int maxRetry;
 
     public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions 
dorisReadOptions, int maxRetry) {
@@ -66,7 +70,7 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
     }
 
     @Override
-    public List<DorisCommittable> commit(List<DorisCommittable> 
committableList) throws IOException, InterruptedException {
+    public List<DorisCommittable> commit(List<DorisCommittable> 
committableList) throws IOException {
         for (DorisCommittable committable : committableList) {
             commitTransaction(committable);
         }
@@ -74,49 +78,50 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
     }
 
     private void commitTransaction(DorisCommittable committable) throws 
IOException {
-        int statusCode = -1;
-        String reasonPhrase = null;
-        int retry = 0;
+        //basic params
+        HttpPutBuilder builder = new HttpPutBuilder()
+                .addCommonHeader()
+                .baseAuth(dorisOptions.getUsername(), 
dorisOptions.getPassword())
+                .addTxnId(committable.getTxnID())
+                .commit();
+
+        //hostPort
         String hostPort = committable.getHostPort();
-        CloseableHttpResponse response = null;
+
+        int retry = 0;
         while (retry++ <= maxRetry) {
-            HttpPutBuilder putBuilder = new HttpPutBuilder();
-            putBuilder.setUrl(String.format(commitPattern, hostPort, 
committable.getDb()))
-                    .baseAuth(dorisOptions.getUsername(), 
dorisOptions.getPassword())
-                    .addCommonHeader()
-                    .addTxnId(committable.getTxnID())
-                    .setEmptyEntity()
-                    .commit();
-            try {
-                response = httpClient.execute(putBuilder.build());
+            //get latest-url
+            String url = String.format(commitPattern, hostPort, 
committable.getDb());
+            HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build();
+
+            // http execute...
+            try (CloseableHttpResponse response = httpClient.execute(httpPut)) 
{
+                StatusLine statusLine = response.getStatusLine();
+                if (200 == statusLine.getStatusCode()) {
+                    if (response.getEntity() != null) {
+                        String loadResult = 
EntityUtils.toString(response.getEntity());
+                        Map<String, String> res = 
jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
+                        });
+                        if (res.get("status").equals(FAIL) && 
!ResponseUtil.isCommitted(res.get("msg"))) {
+                            throw new DorisRuntimeException("Commit failed " + 
loadResult);
+                        } else {
+                            LOG.info("load result {}", loadResult);
+                        }
+                    }
+                    return;
+                }
+                String reasonPhrase = statusLine.getReasonPhrase();
+                LOG.warn("commit failed with {}, reason {}", hostPort, 
reasonPhrase);
+                if (retry == maxRetry) {
+                    throw new DorisRuntimeException("stream load error: " + 
reasonPhrase);
+                }
+                hostPort = RestService.getBackend(dorisOptions, 
dorisReadOptions, LOG);
             } catch (IOException e) {
                 LOG.error("commit transaction failed: ", e);
+                if (retry == maxRetry) {
+                    throw new IOException("commit transaction failed: {}", e);
+                }
                 hostPort = RestService.getBackend(dorisOptions, 
dorisReadOptions, LOG);
-                continue;
-            }
-            statusCode = response.getStatusLine().getStatusCode();
-            reasonPhrase = response.getStatusLine().getReasonPhrase();
-            if (statusCode != 200) {
-                LOG.warn("commit failed with {}, reason {}", hostPort, 
reasonPhrase);
-                hostPort = RestService.getBackend(dorisOptions, 
dorisReadOptions, LOG);
-            } else {
-                break;
-            }
-        }
-
-        if (statusCode != 200) {
-            throw new DorisRuntimeException("stream load error: " + 
reasonPhrase);
-        }
-
-        ObjectMapper mapper = new ObjectMapper();
-        if (response.getEntity() != null) {
-            String loadResult = EntityUtils.toString(response.getEntity());
-            Map<String, String> res = mapper.readValue(loadResult, new 
TypeReference<HashMap<String, String>>() {
-            });
-            if (res.get("status").equals(FAIL) && 
!ResponseUtil.isCommitted(res.get("msg"))) {
-                throw new DorisRuntimeException("Commit failed " + loadResult);
-            } else {
-                LOG.info("load result {}", loadResult);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to