This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new df088ba8 [Chore] Improve some log output  (#535)
df088ba8 is described below

commit df088ba8f905c518238b8a8af7521a9ec346bedc
Author: wudi <[email protected]>
AuthorDate: Tue Jan 7 09:59:31 2025 +0800

    [Chore] Improve some log output  (#535)
---
 .../apache/doris/flink/sink/committer/DorisCommitter.java    | 12 ++++++++----
 .../org/apache/doris/flink/sink/writer/DorisStreamLoad.java  |  5 ++++-
 .../java/org/apache/doris/flink/sink/writer/DorisWriter.java |  1 -
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index e73d96cd..b1a70059 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -128,12 +128,16 @@ public class DorisCommitter implements 
Committer<DorisCommittable>, Closeable {
                                 jsonMapper.readValue(
                                         loadResult,
                                         new TypeReference<HashMap<String, 
String>>() {});
-                        if (!res.get("status").equals(SUCCESS)
-                                && !ResponseUtil.isCommitted(res.get("msg"))) {
+                        if (res.get("status").equals(SUCCESS)) {
+                            LOG.info("load result {}", loadResult);
+                        } else if (ResponseUtil.isCommitted(res.get("msg"))) {
+                            LOG.info(
+                                    "transaction {} has already committed 
successfully, skipping, load response is {}",
+                                    committable.getTxnID(),
+                                    res.get("msg"));
+                        } else {
                             throw new DorisRuntimeException(
                                     "commit transaction failed " + loadResult);
-                        } else {
-                            LOG.info("load result {}", loadResult);
                         }
                         return;
                     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 02c2df49..f900f741 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -391,7 +391,10 @@ public class DorisStreamLoad implements Serializable {
             String msg = res.get("msg");
             // transaction already aborted
             if (msg != null && ResponseUtil.isAborted(msg)) {
-                LOG.warn("Failed to abort transaction, {}", msg);
+                LOG.info(
+                        "transaction {} may have already been successfully 
aborted, skipping, abort response is {}",
+                        txnID,
+                        msg);
                 return;
             }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index fdb797f9..8e28213d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -458,7 +458,6 @@ public class DorisWriter<IN>
         if (scheduledExecutorService != null) {
             scheduledExecutorService.shutdownNow();
         }
-        LOG.info("Try to abort txn before closing.");
         abortPossibleSuccessfulTransaction();
 
         if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to