This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 934b081 commitTransaction method improvement (#121) 934b081 is described below commit 934b08172f388887c80161d3ca5a355611903ebd Author: benjobs <benj...@apache.org> AuthorDate: Fri Mar 10 14:58:44 2023 +0800 commitTransaction method improvement (#121) * commitTransaction method improvement, http request failed, no exception was thrown bug fixed, minor optimize Co-authored-by: benjobs <benj...@gmail.com> --- .../doris/flink/sink/committer/DorisCommitter.java | 83 ++++++++++++---------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 316f92e..acbd310 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -29,7 +29,9 @@ import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.ResponseUtil; +import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; @@ -52,6 +54,8 @@ public class DorisCommitter implements Committer<DorisCommittable> { private final CloseableHttpClient httpClient; private final DorisOptions dorisOptions; private final DorisReadOptions dorisReadOptions; + private final ObjectMapper jsonMapper = new ObjectMapper(); + int maxRetry; public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int maxRetry) { @@ -66,7 +70,7 @@ public class DorisCommitter implements 
Committer<DorisCommittable> { } @Override - public List<DorisCommittable> commit(List<DorisCommittable> committableList) throws IOException, InterruptedException { + public List<DorisCommittable> commit(List<DorisCommittable> committableList) throws IOException { for (DorisCommittable committable : committableList) { commitTransaction(committable); } @@ -74,49 +78,50 @@ public class DorisCommitter implements Committer<DorisCommittable> { } private void commitTransaction(DorisCommittable committable) throws IOException { - int statusCode = -1; - String reasonPhrase = null; - int retry = 0; + //basic params + HttpPutBuilder builder = new HttpPutBuilder() + .addCommonHeader() + .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword()) + .addTxnId(committable.getTxnID()) + .commit(); + + //hostPort String hostPort = committable.getHostPort(); - CloseableHttpResponse response = null; + + int retry = 0; while (retry++ <= maxRetry) { - HttpPutBuilder putBuilder = new HttpPutBuilder(); - putBuilder.setUrl(String.format(commitPattern, hostPort, committable.getDb())) - .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword()) - .addCommonHeader() - .addTxnId(committable.getTxnID()) - .setEmptyEntity() - .commit(); - try { - response = httpClient.execute(putBuilder.build()); + //get latest-url + String url = String.format(commitPattern, hostPort, committable.getDb()); + HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build(); + + // http execute... 
+ try (CloseableHttpResponse response = httpClient.execute(httpPut)) { + StatusLine statusLine = response.getStatusLine(); + if (200 == statusLine.getStatusCode()) { + if (response.getEntity() != null) { + String loadResult = EntityUtils.toString(response.getEntity()); + Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() { + }); + if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) { + throw new DorisRuntimeException("Commit failed " + loadResult); + } else { + LOG.info("load result {}", loadResult); + } + } + return; + } + String reasonPhrase = statusLine.getReasonPhrase(); + LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase); + if (retry == maxRetry) { + throw new DorisRuntimeException("stream load error: " + reasonPhrase); + } + hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG); } catch (IOException e) { LOG.error("commit transaction failed: ", e); + if (retry == maxRetry) { + throw new IOException("commit transaction failed: {}", e); + } hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG); - continue; - } - statusCode = response.getStatusLine().getStatusCode(); - reasonPhrase = response.getStatusLine().getReasonPhrase(); - if (statusCode != 200) { - LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase); - hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG); - } else { - break; - } - } - - if (statusCode != 200) { - throw new DorisRuntimeException("stream load error: " + reasonPhrase); - } - - ObjectMapper mapper = new ObjectMapper(); - if (response.getEntity() != null) { - String loadResult = EntityUtils.toString(response.getEntity()); - Map<String, String> res = mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() { - }); - if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) { - throw new DorisRuntimeException("Commit failed " + 
loadResult); - } else { - LOG.info("load result {}", loadResult); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org