This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new df088ba8 [Chore] Improve some log output (#535)
df088ba8 is described below
commit df088ba8f905c518238b8a8af7521a9ec346bedc
Author: wudi <[email protected]>
AuthorDate: Tue Jan 7 09:59:31 2025 +0800
[Chore] Improve some log output (#535)
---
.../apache/doris/flink/sink/committer/DorisCommitter.java | 12 ++++++++----
.../org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 5 ++++-
.../java/org/apache/doris/flink/sink/writer/DorisWriter.java | 1 -
3 files changed, 12 insertions(+), 6 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index e73d96cd..b1a70059 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -128,12 +128,16 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
jsonMapper.readValue(
loadResult,
new TypeReference<HashMap<String,
String>>() {});
- if (!res.get("status").equals(SUCCESS)
- && !ResponseUtil.isCommitted(res.get("msg"))) {
+ if (res.get("status").equals(SUCCESS)) {
+ LOG.info("load result {}", loadResult);
+ } else if (ResponseUtil.isCommitted(res.get("msg"))) {
+ LOG.info(
+ "transaction {} has already committed
successfully, skipping, load response is {}",
+ committable.getTxnID(),
+ res.get("msg"));
+ } else {
throw new DorisRuntimeException(
"commit transaction failed " + loadResult);
- } else {
- LOG.info("load result {}", loadResult);
}
return;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 02c2df49..f900f741 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -391,7 +391,10 @@ public class DorisStreamLoad implements Serializable {
String msg = res.get("msg");
// transaction already aborted
if (msg != null && ResponseUtil.isAborted(msg)) {
- LOG.warn("Failed to abort transaction, {}", msg);
+ LOG.info(
+ "transaction {} may have already been successfully
aborted, skipping, abort response is {}",
+ txnID,
+ msg);
return;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index fdb797f9..8e28213d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -458,7 +458,6 @@ public class DorisWriter<IN>
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
- LOG.info("Try to abort txn before closing.");
abortPossibleSuccessfulTransaction();
if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]