This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 9f06d7a  improve check commit success (#210)
9f06d7a is described below

commit 9f06d7a79f0bebe4bbb6004032cc018c17eb55d8
Author: wudi <676366...@qq.com>
AuthorDate: Tue Oct 17 10:43:08 2023 +0800

    improve check commit success (#210)
---
 .../org/apache/doris/flink/sink/committer/DorisCommitter.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 5bb1a40..0e19b0f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -17,11 +17,9 @@
 
 package org.apache.doris.flink.sink.committer;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.connector.sink.Committer;
-
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -31,6 +29,7 @@ import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.flink.api.connector.sink.Committer;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -45,7 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.doris.flink.sink.LoadStatus.FAIL;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 
 /**
  * The committer to commit transaction.
@@ -109,7 +108,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
                     String loadResult = EntityUtils.toString(response.getEntity());
                     Map<String, String> res = jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
                     });
-                    if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) {
+                    if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) {
                         throw new DorisRuntimeException("Commit failed " + loadResult);
                     } else {
                         LOG.info("load result {}", loadResult);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org