This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 57003ef8 [Fix](batch) Fixed an issue where the writer might be blocked in batch mode (#506)
57003ef8 is described below

commit 57003ef8d242a613a40ac3f064ba83505e78248b
Author: wudi <676366...@qq.com>
AuthorDate: Wed Nov 6 16:45:58 2024 +0800

    [Fix](batch) Fixed an issue where the writer might be blocked in batch mode (#506)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 42b83207..3cfda604 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -198,6 +198,7 @@ public class DorisBatchStreamLoad implements Serializable {
             lock.lock();
             try {
                 while (currentCacheBytes.get() >= maxBlockedBytes) {
+                    checkFlushException();
                     LOG.info(
                             "Cache full, waiting for flush, currentBytes: {}, 
maxBlockedBytes: {}",
                             currentCacheBytes.get(),
@@ -486,11 +487,22 @@ public class DorisBatchStreamLoad implements Serializable 
{
                                 putBuilder.setLabel(label + "_" + retry);
                                 reason = respContent.getMessage();
                             } else {
-                                String errMsg =
-                                        String.format(
-                                                "stream load error: %s, see 
more in %s",
-                                                respContent.getMessage(),
-                                                respContent.getErrorURL());
+                                String errMsg = null;
+                                if 
(StringUtils.isBlank(respContent.getMessage())
+                                        && 
StringUtils.isBlank(respContent.getErrorURL())) {
+                                    // sometimes stream load will not return 
message
+                                    errMsg =
+                                            String.format(
+                                                    "stream load error, 
response is %s",
+                                                    loadResult);
+                                    throw new DorisBatchLoadException(errMsg);
+                                } else {
+                                    errMsg =
+                                            String.format(
+                                                    "stream load error: %s, 
see more in %s",
+                                                    respContent.getMessage(),
+                                                    respContent.getErrorURL());
+                                }
                                 throw new DorisBatchLoadException(errMsg);
                             }
                         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to