This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 031732a09f [INLONG-11546][SDK] Support async and sync report dirty 
data (#11547)
031732a09f is described below

commit 031732a09f80e2f7c2c42214a247be0981b4c8ea
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Nov 26 20:01:32 2024 +0800

    [INLONG-11546][SDK] Support async and sync report dirty data (#11547)
    
    * [INLONG-11546][SDK] Support async and sync report dirty data
---
 .../java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 7 +++++--
 .../apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 80cc596c26..74cfcffa21 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -31,7 +31,6 @@ import java.net.InetAddress;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Builder
@@ -75,7 +74,11 @@ public class InlongSdkDirtySender {
     }
 
     public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws 
InterruptedException {
-        dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS);
+        dirtyDataQueue.put(messageWrapper);
+    }
+
+    public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) {
+        return dirtyDataQueue.offer(messageWrapper);
     }
 
     private void doSendDirtyMessage() {
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8e692a4c10..daec1c0694 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -74,7 +74,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
                     .data(dirtyMessage)
                     .build();
 
-            dirtySender.sendDirtyMessage(wrapper);
+            dirtySender.sendDirtyMessageAsync(wrapper);
         } catch (Throwable t) {
             log.error("failed to send dirty message to inlong sdk", t);
             if (!options.isIgnoreSideOutputErrors()) {

Reply via email to