This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f06d7a  improve check commit success (#210)
9f06d7a is described below

commit 9f06d7a79f0bebe4bbb6004032cc018c17eb55d8
Author: wudi <676366...@qq.com>
AuthorDate: Tue Oct 17 10:43:08 2023 +0800

    improve check commit success (#210)
---
 .../org/apache/doris/flink/sink/committer/DorisCommitter.java    | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 5bb1a40..0e19b0f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -17,11 +17,9 @@
 
 package org.apache.doris.flink.sink.committer;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.connector.sink.Committer;
-
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -31,6 +29,7 @@ import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.flink.api.connector.sink.Committer;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -45,7 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.doris.flink.sink.LoadStatus.FAIL;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 
 /**
  * The committer to commit transaction.
@@ -109,7 +108,7 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
                         String loadResult = 
EntityUtils.toString(response.getEntity());
                         Map<String, String> res = 
jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
                         });
-                        if (res.get("status").equals(FAIL) && 
!ResponseUtil.isCommitted(res.get("msg"))) {
+                        if (!res.get("status").equals(SUCCESS) && 
!ResponseUtil.isCommitted(res.get("msg"))) {
                             throw new DorisRuntimeException("Commit failed " + 
loadResult);
                         } else {
                             LOG.info("load result {}", loadResult);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to