This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 57003ef8 [Fix](batch) Fixed an issue where the writer might be blocked in batch mode (#506) 57003ef8 is described below commit 57003ef8d242a613a40ac3f064ba83505e78248b Author: wudi <676366...@qq.com> AuthorDate: Wed Nov 6 16:45:58 2024 +0800 [Fix](batch) Fixed an issue where the writer might be blocked in batch mode (#506) --- .../flink/sink/batch/DorisBatchStreamLoad.java | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 42b83207..3cfda604 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -198,6 +198,7 @@ public class DorisBatchStreamLoad implements Serializable { lock.lock(); try { while (currentCacheBytes.get() >= maxBlockedBytes) { + checkFlushException(); LOG.info( "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}", currentCacheBytes.get(), @@ -486,11 +487,22 @@ public class DorisBatchStreamLoad implements Serializable { putBuilder.setLabel(label + "_" + retry); reason = respContent.getMessage(); } else { - String errMsg = - String.format( - "stream load error: %s, see more in %s", - respContent.getMessage(), - respContent.getErrorURL()); + String errMsg = null; + if (StringUtils.isBlank(respContent.getMessage()) + && StringUtils.isBlank(respContent.getErrorURL())) { + // sometimes stream load will not return message + errMsg = + String.format( + "stream load error, response is %s", + loadResult); + throw new DorisBatchLoadException(errMsg); + } else { + errMsg = + String.format( + "stream load error: %s, see more in %s", + respContent.getMessage(), + respContent.getErrorURL()); + } throw new DorisBatchLoadException(errMsg); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org