This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f493862b58 [INLONG-11493][SDK] Inlong SDK Dirty Sink supports retry sending (#11498)
f493862b58 is described below

commit f493862b5825dffe297376d94538908400158a63
Author: vernedeng <verned...@apache.org>
AuthorDate: Thu Nov 14 12:19:59 2024 +0800

    [INLONG-11493][SDK] Inlong SDK Dirty Sink supports retry sending (#11498)
---
 .../inlong/sdk/dirtydata/DirtyMessageWrapper.java  |  8 +++
 .../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 71 +++++++++++++++++-----
 .../apache/inlong/sort/base/dirty/DirtyData.java   | 18 +++++-
 .../sort/base/dirty/sink/DirtyServerType.java      | 37 +++++++++++
 .../base/dirty/sink/sdk/InlongSdkDirtyOptions.java |  2 +
 .../base/dirty/sink/sdk/InlongSdkDirtySink.java    |  5 +-
 .../dirty/sink/sdk/InlongSdkDirtySinkFactory.java  | 13 +++-
 .../DynamicTubeMQTableDeserializationSchema.java   |  2 +
 8 files changed, 136 insertions(+), 20 deletions(-)

diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index 977e002478..a82d574cac 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.dirtydata;
 
 import lombok.Builder;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.text.StringEscapeUtils;
 
@@ -34,6 +35,9 @@ public class DirtyMessageWrapper {
 
     private static DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     private String delimiter;
+    @Builder.Default
+    @Getter
+    private int retryTimes = 0;
 
     private String inlongGroupId;
     private String inlongStreamId;
@@ -71,4 +75,8 @@ public class DirtyMessageWrapper {
                 .add(StringEscapeUtils.escapeXSI(formatData))
                 .toString();
     }
+
+    public void increaseRetry() {
+        retryTimes++;
+    }
 }
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 88e2e88a74..80cc596c26 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -21,13 +21,17 @@ import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
 
 import com.google.common.base.Preconditions;
 import lombok.Builder;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Builder
@@ -40,9 +44,14 @@ public class InlongSdkDirtySender {
     private String authId;
     private String authKey;
     private boolean ignoreErrors;
+    private int maxRetryTimes;
+    private int maxCallbackSize;
+    @Builder.Default
+    private boolean closed = false;
 
-    private SendMessageCallback callback;
+    private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
     private DefaultMessageSender sender;
+    private Executor executor;
 
     public void init() throws Exception {
         Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be 
null");
@@ -51,45 +60,79 @@ public class InlongSdkDirtySender {
         Preconditions.checkNotNull(authId, "authId cannot be null");
         Preconditions.checkNotNull(authKey, "authKey cannot be null");
 
-        this.callback = new LogCallBack();
         ProxyClientConfig proxyClientConfig =
                 new 
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
                         inlongManagerAddr, inlongManagerPort, inlongGroupId, 
authId, authKey);
         proxyClientConfig.setReadProxyIPFromLocal(false);
+        proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);
         this.sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
         this.sender.setMsgtype(7);
+
+        this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize);
+        this.executor = Executors.newSingleThreadExecutor();
+        executor.execute(this::doSendDirtyMessage);
         log.info("init InlongSdkDirtySink successfully, target group={}, 
stream={}", inlongGroupId, inlongStreamId);
     }
 
-    public void sendDirtyMessage(DirtyMessageWrapper messageWrapper)
-            throws ProxysdkException {
-        sender.asyncSendMessage(inlongGroupId, inlongStreamId, 
messageWrapper.format().getBytes(), callback);
+    public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws 
InterruptedException {
+        dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS);
+    }
+
+    private void doSendDirtyMessage() {
+        while (!closed) {
+            try {
+                DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll();
+                if (messageWrapper == null) {
+                    Thread.sleep(100L);
+                    continue;
+                }
+                messageWrapper.increaseRetry();
+                if (messageWrapper.getRetryTimes() > maxRetryTimes) {
+                    log.error("failed to send dirty message after {} times, 
dirty data ={}", maxRetryTimes,
+                            messageWrapper);
+                    continue;
+                }
+
+                sender.asyncSendMessage(inlongGroupId, inlongStreamId,
+                        messageWrapper.format().getBytes(), new 
LogCallBack(messageWrapper));
+
+            } catch (Throwable t) {
+                log.error("failed to send inlong dirty message", t);
+                if (!ignoreErrors) {
+                    throw new RuntimeException("writing dirty message to 
inlong sdk failed", t);
+                }
+            }
+
+        }
     }
 
     public void close() {
+        closed = true;
+        dirtyDataQueue.clear();
         if (sender != null) {
             sender.close();
         }
     }
 
+    @Getter
     class LogCallBack implements SendMessageCallback {
 
+        private final DirtyMessageWrapper wrapper;
+
+        public LogCallBack(DirtyMessageWrapper wrapper) {
+            this.wrapper = wrapper;
+        }
+
         @Override
         public void onMessageAck(SendResult result) {
-            if (result == SendResult.OK) {
-                return;
-            }
-            log.error("failed to send inlong dirty message, response={}", 
result);
-
-            if (!ignoreErrors) {
-                throw new RuntimeException("writing dirty message to inlong 
sdk failed, response=" + result);
+            if (SendResult.OK != result) {
+                dirtyDataQueue.offer(wrapper);
             }
         }
 
         @Override
         public void onException(Throwable e) {
             log.error("failed to send inlong dirty message", e);
-
             if (!ignoreErrors) {
                 throw new RuntimeException("writing dirty message to inlong 
sdk failed", e);
             }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index 1ac2b60a1d..24c5dddecd 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.base.dirty;
 
+import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
 import org.apache.inlong.sort.base.util.PatternReplaceUtils;
 
 import org.apache.flink.table.types.logical.LogicalType;
@@ -63,6 +64,8 @@ public class DirtyData<T> {
      * Dirty type
      */
     private final DirtyType dirtyType;
+
+    private final DirtyServerType serverType;
     /**
      * Dirty describe message, it is the cause of dirty data
      */
@@ -85,10 +88,11 @@ public class DirtyData<T> {
     private final T data;
 
     public DirtyData(T data, String identifier, String labels,
-            String logTag, DirtyType dirtyType, String dirtyMessage,
+            String logTag, DirtyType dirtyType, DirtyServerType serverType, 
String dirtyMessage,
             @Nullable LogicalType rowType, long dataTime, String extParams) {
         this.data = data;
         this.dirtyType = dirtyType;
+        this.serverType = serverType;
         this.dirtyMessage = dirtyMessage;
         this.rowType = rowType;
         Map<String, String> paramMap = genParamMap();
@@ -127,6 +131,10 @@ public class DirtyData<T> {
         return dirtyType;
     }
 
+    public DirtyServerType getServerType() {
+        return serverType;
+    }
+
     public String getIdentifier() {
         return identifier;
     }
@@ -154,6 +162,7 @@ public class DirtyData<T> {
         private String labels;
         private String logTag;
         private DirtyType dirtyType = DirtyType.UNDEFINED;
+        private DirtyServerType serverType = DirtyServerType.UNDEFINED;
         private String dirtyMessage;
         private LogicalType rowType;
         private long dataTime;
@@ -175,6 +184,11 @@ public class DirtyData<T> {
             return this;
         }
 
+        public Builder<T> setServerType(DirtyServerType serverType) {
+            this.serverType = serverType;
+            return this;
+        }
+
         public Builder<T> setLabels(String labels) {
             this.labels = labels;
             return this;
@@ -206,7 +220,7 @@ public class DirtyData<T> {
         }
 
         public DirtyData<T> build() {
-            return new DirtyData<>(data, identifier, labels, logTag, dirtyType,
+            return new DirtyData<>(data, identifier, labels, logTag, 
dirtyType, serverType,
                     dirtyMessage, rowType, dataTime, extParams);
         }
     }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
new file mode 100644
index 0000000000..63f993c146
--- /dev/null
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink;
+
+public enum DirtyServerType {
+
+    UNDEFINED("Undefined"),
+    TUBE_MQ("TubeMQ"),
+    ICEBERG("Iceberg")
+
+    ;
+
+    private final String format;
+
+    DirtyServerType(String format) {
+        this.format = format;
+    }
+
+    public String format() {
+        return format;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
index 5cb03f0f80..84581dd4ce 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
@@ -49,4 +49,6 @@ public class InlongSdkDirtyOptions implements Serializable {
     private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
     private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
     private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
+    private int retryTimes;
+    private int maxCallbackSize;
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 4441cca830..8513f841bc 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -57,7 +57,6 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
             Map<String, String> labelMap = 
LabelUtils.parseLabels(dirtyData.getLabels());
             String dataGroupId = 
Preconditions.checkNotNull(labelMap.get("groupId"));
             String dataStreamId = 
Preconditions.checkNotNull(labelMap.get("streamId"));
-            String serverType = 
Preconditions.checkNotNull(labelMap.get("serverType"));
             String dataflowId = 
Preconditions.checkNotNull(labelMap.get("dataflowId"));
 
             String dirtyMessage = formatData(dirtyData, labelMap);
@@ -68,7 +67,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
                     .inlongStreamId(dataStreamId)
                     .dataflowId(dataflowId)
                     .dataTime(dirtyData.getDataTime())
-                    .serverType(serverType)
+                    .serverType(dirtyData.getServerType().format())
                     .dirtyType(dirtyData.getDirtyType().format())
                     .dirtyMessage(dirtyData.getDirtyMessage())
                     .ext(dirtyData.getExtParams())
@@ -99,6 +98,8 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
                 .ignoreErrors(options.isIgnoreSideOutputErrors())
                 .inlongGroupId(options.getSendToGroupId())
                 .inlongStreamId(options.getSendToStreamId())
+                .maxRetryTimes(options.getRetryTimes())
+                .maxCallbackSize(options.getMaxCallbackSize())
                 .build();
         dirtySender.init();
     }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
index 8d7e399c5c..053aa58364 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -35,6 +35,7 @@ import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELI
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
 
 @Slf4j
 public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
@@ -77,6 +78,12 @@ public class InlongSdkDirtySinkFactory implements 
DirtySinkFactory {
                     .noDefaultValue()
                     .withDescription("The inlong stream id of dirty sink");
 
+    private static final ConfigOption<Integer> 
DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE =
+            ConfigOptions.key("dirty.side-output.inlong-sdk.max-callback-size")
+                    .intType()
+                    .defaultValue(100000)
+                    .withDescription("The inlong stream id of dirty sink");
+
     @Override
     public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context 
context) {
         ReadableConfig config = 
Configuration.fromMap(context.getCatalogTable().getOptions());
@@ -95,8 +102,10 @@ public class InlongSdkDirtySinkFactory implements 
DirtySinkFactory {
                 
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
                 
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
                 
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
-                
.ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
-                .enableDirtyLog(true)
+                
.ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
+                .retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
+                
.maxCallbackSize(config.get(DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE))
+                .enableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE))
                 .build();
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index a178aa57a8..94631e7cd3 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tubemq.table;
 import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
@@ -143,6 +144,7 @@ public class DynamicTubeMQTableDeserializationSchema 
implements DynamicTubeMQDes
 
                         builder.setData(message.getData())
                                 .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
+                                .setServerType(DirtyServerType.TUBE_MQ)
                                 .setDirtyDataTime(dataTime)
                                 .setExtParams(message.getAttribute())
                                 .setLabels(dirtyOptions.getLabels())

Reply via email to