This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 031732a09f [INLONG-11546][SDK] Support async and sync report dirty data (#11547)
031732a09f is described below

commit 031732a09f80e2f7c2c42214a247be0981b4c8ea
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Nov 26 20:01:32 2024 +0800

    [INLONG-11546][SDK] Support async and sync report dirty data (#11547)

    * [INLONG-11546][SDK] Support async and sync report dirty data
---
 .../java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 7 +++++--
 .../apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 80cc596c26..74cfcffa21 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -31,7 +31,6 @@ import java.net.InetAddress;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Builder
@@ -75,7 +74,11 @@ public class InlongSdkDirtySender {
     }
 
     public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException {
-        dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS);
+        dirtyDataQueue.put(messageWrapper);
+    }
+
+    public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) {
+        return dirtyDataQueue.offer(messageWrapper);
     }
 
     private void doSendDirtyMessage() {
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8e692a4c10..daec1c0694 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -74,7 +74,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
                     .data(dirtyMessage)
                     .build();
 
-            dirtySender.sendDirtyMessage(wrapper);
+            dirtySender.sendDirtyMessageAsync(wrapper);
         } catch (Throwable t) {
             log.error("failed to send dirty message to inlong sdk", t);
             if (!options.isIgnoreSideOutputErrors()) {