This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ab59b91be2 [INLONG-11481][Sort] Tube Connector source supports dirty data archiving (#11482)
ab59b91be2 is described below

commit ab59b91be2a2d84b0c0155d84a1da6bad204cc4d
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Nov 11 18:40:56 2024 +0800

    [INLONG-11481][Sort] Tube Connector source supports dirty data archiving (#11482)
---
 .../inlong/sdk/dirtydata/DirtyMessageWrapper.java  |  27 +++--
 ...SdkDirtySink.java => InlongSdkDirtySender.java} |  19 +++-
 inlong-sort/sort-flink/base/pom.xml                |   5 +
 .../org/apache/inlong/sort/base/Constants.java     |   2 +-
 .../apache/inlong/sort/base/dirty/DirtyData.java   |  40 ++++++-
 ...gSdkOptions.java => InlongSdkDirtyOptions.java} |   7 +-
 .../base/dirty/sink/sdk/InlongSdkDirtySink.java    | 115 ++++++++-------------
 .../dirty/sink/sdk/InlongSdkDirtySinkFactory.java  |  39 +++----
 .../sort-connectors/tubemq/pom.xml                 |   9 ++
 .../table/DynamicTubeMQDeserializationSchema.java  |   2 +-
 .../DynamicTubeMQTableDeserializationSchema.java   |  61 ++++++++++-
 .../tubemq/table/TubeMQDynamicTableFactory.java    |  37 ++++---
 .../sort/tubemq/table/TubeMQTableSource.java       |  15 ++-
 13 files changed, 243 insertions(+), 135 deletions(-)

diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index 984c456480..977e002478 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -18,12 +18,17 @@
 package org.apache.inlong.sdk.dirtydata;
 
 import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.text.StringEscapeUtils;
 
+import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Base64;
 import java.util.StringJoiner;
 
+@Slf4j
 @Builder
 public class DirtyMessageWrapper {
 
@@ -32,16 +37,17 @@ public class DirtyMessageWrapper {
 
     private String inlongGroupId;
     private String inlongStreamId;
-    private String dataTime;
+    private long dataTime;
     private String dataflowId;
     private String serverType;
     private String dirtyType;
+    private String dirtyMessage;
     private String ext;
     private String data;
     private byte[] dataBytes;
 
     public String format() {
-        String now = LocalDateTime.now().format(dateTimeFormatter);
+        String reportTime = LocalDateTime.now().format(dateTimeFormatter);
         StringJoiner joiner = new StringJoiner(delimiter);
         String formatData = null;
         if (data != null) {
@@ -50,14 +56,19 @@ public class DirtyMessageWrapper {
             formatData = Base64.getEncoder().encodeToString(dataBytes);
         }
 
-        return joiner.add(inlongGroupId)
-                .add(inlongStreamId)
-                .add(now)
-                .add(dataTime)
+        String dataTimeStr = 
LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime),
+                ZoneId.systemDefault()).format(dateTimeFormatter);
+        return joiner
                 .add(dataflowId)
+                .add(inlongGroupId)
+                .add(inlongStreamId)
+                .add(reportTime)
+                .add(dataTimeStr)
                 .add(serverType)
                 .add(dirtyType)
-                .add(ext)
-                .add(formatData).toString();
+                .add(StringEscapeUtils.escapeXSI(dirtyMessage))
+                .add(StringEscapeUtils.escapeXSI(ext))
+                .add(StringEscapeUtils.escapeXSI(formatData))
+                .toString();
     }
 }
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
similarity index 86%
rename from 
inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
rename to 
inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 2240ebdb6c..88e2e88a74 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.sdk.dirtydata;
 
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
@@ -28,19 +27,22 @@ import com.google.common.base.Preconditions;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
 
+import java.net.InetAddress;
+
 @Slf4j
 @Builder
-public class InlongSdkDirtySink {
+public class InlongSdkDirtySender {
 
     private String inlongGroupId;
     private String inlongStreamId;
     private String inlongManagerAddr;
+    private int inlongManagerPort;
     private String authId;
     private String authKey;
     private boolean ignoreErrors;
 
     private SendMessageCallback callback;
-    private MessageSender sender;
+    private DefaultMessageSender sender;
 
     public void init() throws Exception {
         Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be 
null");
@@ -51,8 +53,11 @@ public class InlongSdkDirtySink {
 
         this.callback = new LogCallBack();
         ProxyClientConfig proxyClientConfig =
-                new ProxyClientConfig(inlongManagerAddr, inlongGroupId, 
authId, authKey);
+                new 
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
+                        inlongManagerAddr, inlongManagerPort, inlongGroupId, 
authId, authKey);
+        proxyClientConfig.setReadProxyIPFromLocal(false);
         this.sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+        this.sender.setMsgtype(7);
         log.info("init InlongSdkDirtySink successfully, target group={}, 
stream={}", inlongGroupId, inlongStreamId);
     }
 
@@ -61,6 +66,12 @@ public class InlongSdkDirtySink {
         sender.asyncSendMessage(inlongGroupId, inlongStreamId, 
messageWrapper.format().getBytes(), callback);
     }
 
+    public void close() {
+        if (sender != null) {
+            sender.close();
+        }
+    }
+
     class LogCallBack implements SendMessageCallback {
 
         @Override
diff --git a/inlong-sort/sort-flink/base/pom.xml 
b/inlong-sort/sort-flink/base/pom.xml
index 19c36af5fc..fd75d483d9 100644
--- a/inlong-sort/sort-flink/base/pom.xml
+++ b/inlong-sort/sort-flink/base/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>dirty-data-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.inlong</groupId>
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index c13c47f66e..7e02b0ca96 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -385,7 +385,7 @@ public final class Constants {
     public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER 
=
             ConfigOptions.key("dirty.side-output.field-delimiter")
                     .stringType()
-                    .defaultValue(",")
+                    .defaultValue("|")
                     .withDescription("The field-delimiter of dirty 
side-output");
     public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
             ConfigOptions.key("dirty.side-output.line-delimiter")
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index 542e9e1977..1ac2b60a1d 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -71,6 +71,14 @@ public class DirtyData<T> {
      * The row type of data, it is only used for 'RowData'
      */
     private @Nullable final LogicalType rowType;
+    /**
+     * Dirty message data time
+     */
+    private final long dataTime;
+    /**
+     * Dirty message ext params
+     */
+    private @Nullable final String extParams;
     /**
      * The real dirty data
      */
@@ -78,7 +86,7 @@ public class DirtyData<T> {
 
     public DirtyData(T data, String identifier, String labels,
             String logTag, DirtyType dirtyType, String dirtyMessage,
-            @Nullable LogicalType rowType) {
+            @Nullable LogicalType rowType, long dataTime, String extParams) {
         this.data = data;
         this.dirtyType = dirtyType;
         this.dirtyMessage = dirtyMessage;
@@ -87,7 +95,8 @@ public class DirtyData<T> {
         this.labels = PatternReplaceUtils.replace(labels, paramMap);
         this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
         this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
-
+        this.dataTime = dataTime == 0 ? System.currentTimeMillis() : dataTime;
+        this.extParams = extParams;
     }
 
     public static <T> Builder<T> builder() {
@@ -122,6 +131,18 @@ public class DirtyData<T> {
         return identifier;
     }
 
+    public long getDataTime() {
+        return dataTime;
+    }
+
+    public String getExtParams() {
+        return extParams;
+    }
+
+    public String getDirtyMessage() {
+        return dirtyMessage;
+    }
+
     @Nullable
     public LogicalType getRowType() {
         return rowType;
@@ -135,8 +156,20 @@ public class DirtyData<T> {
         private DirtyType dirtyType = DirtyType.UNDEFINED;
         private String dirtyMessage;
         private LogicalType rowType;
+        private long dataTime;
+        private String extParams;
         private T data;
 
+        public Builder<T> setDirtyDataTime(long dataTime) {
+            this.dataTime = dataTime;
+            return this;
+        }
+
+        public Builder<T> setExtParams(String extParams) {
+            this.extParams = extParams;
+            return this;
+        }
+
         public Builder<T> setDirtyType(DirtyType dirtyType) {
             this.dirtyType = dirtyType;
             return this;
@@ -173,7 +206,8 @@ public class DirtyData<T> {
         }
 
         public DirtyData<T> build() {
-            return new DirtyData<>(data, identifier, labels, logTag, 
dirtyType, dirtyMessage, rowType);
+            return new DirtyData<>(data, identifier, labels, logTag, dirtyType,
+                    dirtyMessage, rowType, dataTime, extParams);
         }
     }
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
similarity index 91%
rename from 
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
rename to 
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
index 0692d78580..5cb03f0f80 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
 @Data
 @Builder
 @Getter
-public class InlongSdkOptions implements Serializable {
+public class InlongSdkDirtyOptions implements Serializable {
 
     private static final String DEFAULT_FORMAT = "csv";
 
@@ -36,9 +36,10 @@ public class InlongSdkOptions implements Serializable {
     private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
     private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
 
-    private String inlongGroupId;
-    private String inlongStreamId;
+    private String sendToGroupId;
+    private String sendToStreamId;
     private String inlongManagerAddr;
+    private int inlongManagerPort;
     private String inlongManagerAuthKey;
     private String inlongManagerAuthId;
     private String format = DEFAULT_FORMAT;
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8a9d407a4b..4441cca830 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -17,13 +17,9 @@
 
 package org.apache.inlong.sort.base.dirty.sink.sdk;
 
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dirtydata.DirtyMessageWrapper;
+import org.apache.inlong.sdk.dirtydata.InlongSdkDirtySender;
 import org.apache.inlong.sort.base.dirty.DirtyData;
-import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
 import org.apache.inlong.sort.base.util.LabelUtils;
@@ -37,46 +33,54 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.Base64;
 import java.util.Map;
-import java.util.StringJoiner;
 
 @Slf4j
 public class InlongSdkDirtySink<T> implements DirtySink<T> {
 
-    private final InlongSdkOptions options;
+    private final InlongSdkDirtyOptions options;
     private final DataType physicalRowDataType;
-    private final String inlongGroupId;
-    private final String inlongStreamId;
-    private final SendMessageCallback callback;
 
-    private transient DateTimeFormatter dateTimeFormatter;
     private transient RowData.FieldGetter[] fieldGetters;
     private transient RowDataToJsonConverters.RowDataToJsonConverter converter;
-    private transient MessageSender sender;
+    private transient InlongSdkDirtySender dirtySender;
 
-    public InlongSdkDirtySink(InlongSdkOptions options, DataType 
physicalRowDataType) {
+    public InlongSdkDirtySink(InlongSdkDirtyOptions options, DataType 
physicalRowDataType) {
         this.options = options;
         this.physicalRowDataType = physicalRowDataType;
-        this.inlongGroupId = options.getInlongGroupId();
-        this.inlongStreamId = options.getInlongStreamId();
-        this.callback = new LogCallBack();
     }
 
     @Override
     public void invoke(DirtyData<T> dirtyData) throws Exception {
         try {
             Map<String, String> labelMap = 
LabelUtils.parseLabels(dirtyData.getLabels());
-            String groupId = 
Preconditions.checkNotNull(labelMap.get("groupId"));
-            String streamId = 
Preconditions.checkNotNull(labelMap.get("streamId"));
-
-            String message = join(groupId, streamId,
-                    dirtyData.getDirtyType(), dirtyData.getLabels(), 
formatData(dirtyData, labelMap));
-            sender.asyncSendMessage(inlongGroupId, inlongStreamId, 
message.getBytes(), callback);
+            String dataGroupId = 
Preconditions.checkNotNull(labelMap.get("groupId"));
+            String dataStreamId = 
Preconditions.checkNotNull(labelMap.get("streamId"));
+            String serverType = 
Preconditions.checkNotNull(labelMap.get("serverType"));
+            String dataflowId = 
Preconditions.checkNotNull(labelMap.get("dataflowId"));
+
+            String dirtyMessage = formatData(dirtyData, labelMap);
+
+            DirtyMessageWrapper wrapper = DirtyMessageWrapper.builder()
+                    .delimiter(options.getCsvFieldDelimiter())
+                    .inlongGroupId(dataGroupId)
+                    .inlongStreamId(dataStreamId)
+                    .dataflowId(dataflowId)
+                    .dataTime(dirtyData.getDataTime())
+                    .serverType(serverType)
+                    .dirtyType(dirtyData.getDirtyType().format())
+                    .dirtyMessage(dirtyData.getDirtyMessage())
+                    .ext(dirtyData.getExtParams())
+                    .data(dirtyMessage)
+                    .build();
+
+            dirtySender.sendDirtyMessage(wrapper);
         } catch (Throwable t) {
             log.error("failed to send dirty message to inlong sdk", t);
+            if (!options.isIgnoreSideOutputErrors()) {
+                throw new RuntimeException("failed to send dirty message to 
inlong sdk", t);
+            }
         }
     }
 
@@ -84,39 +88,28 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
     public void open(Configuration configuration) throws Exception {
         converter = 
FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
         fieldGetters = 
FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
-        dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
+        log.info("inlong sdk dirty options={}", options);
         // init sender
-        ProxyClientConfig proxyClientConfig =
-                new ProxyClientConfig(options.getInlongManagerAddr(), 
options.getInlongGroupId(),
-                        options.getInlongManagerAuthId(), 
options.getInlongManagerAuthKey());
-        sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+        dirtySender = InlongSdkDirtySender.builder()
+                .inlongManagerAddr(options.getInlongManagerAddr())
+                .inlongManagerPort(options.getInlongManagerPort())
+                .authId(options.getInlongManagerAuthId())
+                .authKey(options.getInlongManagerAuthKey())
+                .ignoreErrors(options.isIgnoreSideOutputErrors())
+                .inlongGroupId(options.getSendToGroupId())
+                .inlongStreamId(options.getSendToStreamId())
+                .build();
+        dirtySender.init();
     }
 
     @Override
     public void close() throws Exception {
-        if (sender != null) {
-            sender.close();
+        if (dirtySender != null) {
+            dirtySender.close();
         }
     }
 
-    private String join(
-            String inlongGroup,
-            String inlongStream,
-            DirtyType type,
-            String label,
-            String formattedData) {
-
-        String now = LocalDateTime.now().format(dateTimeFormatter);
-
-        StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
-        return joiner.add(inlongGroup + "." + inlongStream)
-                .add(now)
-                .add(type.name())
-                .add(label)
-                .add(formattedData).toString();
-    }
-
     private String formatData(DirtyData<T> dirtyData, Map<String, String> 
labels) throws JsonProcessingException {
         String value;
         T data = dirtyData.getData();
@@ -158,28 +151,4 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> 
{
         }
         return value;
     }
-
-    class LogCallBack implements SendMessageCallback {
-
-        @Override
-        public void onMessageAck(SendResult result) {
-            if (result == SendResult.OK) {
-                return;
-            }
-            log.error("failed to send inlong dirty message, response={}", 
result);
-
-            if (!options.isIgnoreSideOutputErrors()) {
-                throw new RuntimeException("writing dirty message to inlong 
sdk failed, response=" + result);
-            }
-        }
-
-        @Override
-        public void onException(Throwable e) {
-            log.error("failed to send inlong dirty message", e);
-
-            if (!options.isIgnoreSideOutputErrors()) {
-                throw new RuntimeException("writing dirty message to inlong 
sdk failed", e);
-            }
-        }
-    }
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
index 000836b667..8d7e399c5c 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -20,32 +20,39 @@ package org.apache.inlong.sort.base.dirty.sink.sdk;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
 
+@Slf4j
 public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
 
     private static final String IDENTIFIER = "inlong-sdk";
 
-    private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_MANAGER 
=
+    private static final ConfigOption<String> 
DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR =
             
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The inlong manager addr to init inlong 
sdk");
 
+    private static final ConfigOption<Integer> 
DIRTY_SIDE_OUTPUT_INLONG_MANAGER_PORT =
+            
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-port")
+                    .intType()
+                    .defaultValue(8083)
+                    .withDescription("The inlong manager port to init inlong 
sdk");
+
     private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID 
=
             ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id")
                     .stringType()
@@ -74,24 +81,18 @@ public class InlongSdkDirtySinkFactory implements 
DirtySinkFactory {
     public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context 
context) {
         ReadableConfig config = 
Configuration.fromMap(context.getCatalogTable().getOptions());
         FactoryUtil.validateFactoryOptions(this, config);
-        validate(config);
-        return new InlongSdkDirtySink<>(getOptions(config),
+        InlongSdkDirtyOptions options = getOptions(config);
+        return new InlongSdkDirtySink<>(options,
                 
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
     }
 
-    private void validate(ReadableConfig config) {
-        String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
-        if (identifier == null || identifier.trim().isEmpty()) {
-            throw new ValidationException(
-                    "The option 'dirty.identifier' is not allowed to be 
empty.");
-        }
-    }
-
-    private InlongSdkOptions getOptions(ReadableConfig config) {
-        return InlongSdkOptions.builder()
-                
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER))
-                .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
-                .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+    private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
+        return InlongSdkDirtyOptions.builder()
+                
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR))
+                
.inlongManagerPort(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_PORT))
+                .sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
+                .sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+                
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
                 
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
                 
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
                 
.ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
@@ -107,7 +108,7 @@ public class InlongSdkDirtySinkFactory implements 
DirtySinkFactory {
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER);
+        options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR);
         options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID);
         options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY);
         options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP);
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index f863d1cc30..b066347baa 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -62,6 +62,7 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-common</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
     </dependencies>
@@ -86,6 +87,10 @@
                                 <includes>
                                     <include>org.apache.inlong:*</include>
                                     <include>com.fasterxml.*:*</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>org.apache.commons:*</include>
+                                    <include>commons-collections:*</include>
+                                    <include>commons-codec:*</include>
                                 </includes>
                             </artifactSet>
 
@@ -111,6 +116,10 @@
                                     
<pattern>org.apache.inlong.sort.base</pattern>
                                     
<shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.inlong.sort.base</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons</pattern>
+                                    
<shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.commons</shadedPattern>
+                                </relocation>
                             </relocations>
                         </configuration>
                     </execution>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index 80532a2dc1..822396c9cb 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
 public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
     @PublicEvolving
-    default void open() {
+    default void open() throws Exception {
     }
 
     /**
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index 5d4a3bd2c6..a178aa57a8 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -17,26 +17,36 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.inlong.tubemq.corebase.Message;
 
 import com.google.common.base.Objects;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class DynamicTubeMQTableDeserializationSchema implements 
DynamicTubeMQDeserializationSchema<RowData> {
 
     /**
@@ -65,26 +75,37 @@ public class DynamicTubeMQTableDeserializationSchema 
implements DynamicTubeMQDes
 
     private final MetricOption metricOption;
 
+    private final DirtySink<byte[]> dirtySink;
+
+    private final DirtyOptions dirtyOptions;
+
     public DynamicTubeMQTableDeserializationSchema(
             DeserializationSchema<RowData> schema,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean ignoreErrors,
             boolean innerFormat,
-            MetricOption metricOption) {
+            MetricOption metricOption,
+            DirtySink<byte[]> dirtySink,
+            DirtyOptions dirtyOptions) {
         this.deserializationSchema = schema;
         this.metadataConverters = metadataConverters;
         this.producedTypeInfo = producedTypeInfo;
         this.ignoreErrors = ignoreErrors;
         this.innerFormat = innerFormat;
         this.metricOption = metricOption;
+        this.dirtySink = dirtySink;
+        this.dirtyOptions = dirtyOptions;
     }
 
     @Override
-    public void open() {
+    public void open() throws Exception {
         if (metricOption != null) {
             sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
+        if (dirtySink != null) {
+            dirtySink.open(new Configuration());
+        }
     }
 
     @Override
@@ -103,7 +124,41 @@ public class DynamicTubeMQTableDeserializationSchema 
implements DynamicTubeMQDes
         if (!innerFormat) {
             metricsCollector.resetTimestamp(System.currentTimeMillis());
         }
-        deserializationSchema.deserialize(message.getData(), metricsCollector);
+
+        if (!dirtyOptions.ignoreDirty()) {
+            deserializationSchema.deserialize(message.getData(), 
metricsCollector);
+        } else {
+            try {
+                deserializationSchema.deserialize(message.getData(), 
metricsCollector);
+            } catch (Throwable t) {
+                if (dirtySink != null) {
+                    DirtyData.Builder<byte[]> builder = DirtyData.builder();
+                    try {
+
+                        DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+                        long dataTime = 
LocalDateTime.parse(message.getMsgTime(), formatter)
+                                .atZone(ZoneId.systemDefault())
+                                .toInstant()
+                                .toEpochMilli();
+
+                        builder.setData(message.getData())
+                                .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
+                                .setDirtyDataTime(dataTime)
+                                .setExtParams(message.getAttribute())
+                                .setLabels(dirtyOptions.getLabels())
+                                .setLogTag(dirtyOptions.getLogTag())
+                                .setDirtyMessage(t.getMessage())
+                                .setIdentifier(dirtyOptions.getIdentifier());
+                        dirtySink.invoke(builder.build());
+                    } catch (Exception ex) {
+                        if (!dirtyOptions.ignoreSideOutputErrors()) {
+                            throw new IOException(ex);
+                        }
+                        log.warn("Dirty sink failed", ex);
+                    }
+                }
+            }
+        }
 
         rows.forEach(row -> emitRow(message, (GenericRowData) row, out));
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 4962364a9c..22c3306064 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -48,22 +51,13 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
-import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static 
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
-import static 
org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
+import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.*;
 
 /**
  * A dynamic table factory implementation for TubeMQ.
  */
+
 public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
 
     public static final String IDENTIFIER = "tubemq-inlong";
@@ -120,10 +114,10 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
         final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat = getValueDecodingFormat(helper);
 
         // validate all options
-        helper.validateExcept(ExtractNode.INLONG_MSG);
+        helper.validateExcept(ExtractNode.INLONG_MSG, PROPERTIES_PREFIX, 
DIRTY_PREFIX);
 
         validatePKConstraints(context.getObjectIdentifier(), 
context.getCatalogTable(), valueDecodingFormat);
-        innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));
+        innerFormat = 
tableOptions.get(FORMAT).contains(ExtractNode.INLONG_MSG);
 
         final Configuration properties = 
getTubeMQProperties(context.getCatalogTable().getOptions());
 
@@ -133,6 +127,9 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
         String auditKeys = tableOptions.get(AUDIT_KEYS);
 
+        final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<byte[]> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+
         return createTubeMQTableSource(
                 physicalDataType,
                 valueDecodingFormat,
@@ -144,7 +141,9 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
                 properties,
                 inlongMetric,
                 auditHostAndPorts,
-                auditKeys);
+                auditKeys,
+                dirtySink,
+                dirtyOptions);
     }
 
     @Override
@@ -184,7 +183,9 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
             Configuration properties,
             String inlongMetric,
             String auditHostAndPorts,
-            String auditKeys) {
+            String auditKeys,
+            DirtySink<byte[]> dirtySink,
+            DirtyOptions dirtyOptions) {
         return new TubeMQTableSource(
                 physicalDataType,
                 valueDecodingFormat,
@@ -200,7 +201,9 @@ public class TubeMQDynamicTableFactory implements 
DynamicTableSourceFactory, Dyn
                 innerFormat,
                 inlongMetric,
                 auditHostAndPorts,
-                auditKeys);
+                auditKeys,
+                dirtySink,
+                dirtyOptions);
     }
 
     protected TubeMQTableSink createTubeMQTableSink(
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index 2d5ddbb3d5..d69fcf985e 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
@@ -131,6 +133,9 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
     private String auditHostAndPorts;
     private String auditKeys;
 
+    private DirtySink<byte[]> dirtySink;
+    private DirtyOptions dirtyOptions;
+
     /**
      * Watermark strategy that is used to generate per-partition watermark.
      */
@@ -143,7 +148,8 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
             TreeSet<String> streamIdSet, String consumerGroup, String 
sessionKey,
             Configuration configuration, @Nullable WatermarkStrategy<RowData> 
watermarkStrategy,
             Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean 
innerFormat,
-            String inlongMetric, String auditHostAndPorts, String auditKeys) {
+            String inlongMetric, String auditHostAndPorts, String auditKeys,
+            DirtySink<byte[]> dirtySink, DirtyOptions dirtyOptions) {
 
         Preconditions.checkNotNull(physicalDataType, "Physical data type must 
not be null.");
         Preconditions.checkNotNull(valueDecodingFormat, "The deserialization 
schema must not be null.");
@@ -170,6 +176,8 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.auditKeys = auditKeys;
+        this.dirtySink = dirtySink;
+        this.dirtyOptions = dirtyOptions;
     }
 
     @Override
@@ -200,7 +208,7 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
                 physicalDataType, valueDecodingFormat, masterAddress,
                 topic, streamIdSet, consumerGroup, sessionKey, configuration,
                 watermarkStrategy, proctimeAttribute, ignoreErrors, 
innerFormat,
-                inlongMetric, auditHostAndPorts, auditKeys);
+                inlongMetric, auditHostAndPorts, auditKeys, dirtySink, 
dirtyOptions);
     }
 
     @Override
@@ -325,7 +333,8 @@ public class TubeMQTableSource implements ScanTableSource, 
SupportsReadingMetada
 
         final DynamicTubeMQDeserializationSchema<RowData> tubeMQDeserializer =
                 new DynamicTubeMQTableDeserializationSchema(
-                        deserialization, metadataConverters, producedTypeInfo, 
ignoreErrors, innerFormat, metricOption);
+                        deserialization, metadataConverters, producedTypeInfo, 
ignoreErrors,
+                        innerFormat, metricOption, dirtySink, dirtyOptions);
 
         final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new 
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
                 consumerGroup, tubeMQDeserializer, configuration, sessionKey, 
innerFormat);


Reply via email to