This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fd80e0373b [INLONG-11426][SDK] Optimize dirty data sdk (#11427)
fd80e0373b is described below

commit fd80e0373be8e38e102db973691e81fd1efc41eb
Author: vernedeng <verned...@apache.org>
AuthorDate: Mon Oct 28 20:55:33 2024 +0800

    [INLONG-11426][SDK] Optimize dirty data sdk (#11427)
    
    * [INLONG-11426][SDK] Optimize dirty data sdk
    
    * [INLONG-11426][SDK] Optimize dirty data sdk
---
 .../org/apache/inlong/sdk/dirtydata/Constants.java |  56 ------
 .../org/apache/inlong/sdk/dirtydata/DirtyData.java | 149 --------------
 .../inlong/sdk/dirtydata/DirtyDataCollector.java   | 219 ---------------------
 .../inlong/sdk/dirtydata/DirtyMessageWrapper.java  |  63 ++++++
 .../apache/inlong/sdk/dirtydata/DirtyOptions.java  |  93 ---------
 .../org/apache/inlong/sdk/dirtydata/DirtySink.java |  57 ------
 .../inlong/sdk/dirtydata/InlongSdkDirtySink.java   |  87 ++++++++
 .../inlong/sdk/dirtydata/PatternReplaceUtils.java  |  46 -----
 .../inlong/sdk/dirtydata/sink/Configure.java       |  51 -----
 .../sdk/dirtydata/sink/InlongSdkDirtySink.java     | 154 ---------------
 .../sdk/dirtydata/sink/InlongSdkOptions.java       |  51 -----
 .../inlong/sdk/dirtydata/sink/LabelUtils.java      |  67 -------
 12 files changed, 150 insertions(+), 943 deletions(-)

diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
deleted file mode 100644
index 933f81a67b..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-/**
- * Connector base option constant
- */
-public final class Constants {
-
-    public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable";
-
-    public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = 
"dirty.side-output.connector";
-
-    public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS = 
"dirty.side-output.ignore-errors";
-
-    /**
-     * The labels of dirty side-output, format is 'key1=value1&key2=value2'
-     * it supports variable replace like '${variable}'
-     * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE]
-     * are currently supported,
-     * and the support of other variables is determined by the connector.
-     */
-    public static final String DIRTY_SIDE_OUTPUT_LABELS = 
"dirty.side-output.labels";
-
-    /**
-     * The log tag of dirty side-output, it supports variable replace like 
'${variable}'.
-     * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] 
are currently supported,
-     * and the support of other variables is determined by the connector.
-     */
-    public static final String DIRTY_SIDE_OUTPUT_LOG_TAG = 
"dirty.side-output.log-tag";
-
-    /**
-     * It is used for 'inlong.metric.labels' or 'sink.dirty.labels'
-     */
-    public static final String DELIMITER = "&";
-
-    /**
-     * The delimiter of key and value, it is used for 'inlong.metric.labels' 
or 'sink.dirty.labels'
-     */
-    public static final String KEY_VALUE_DELIMITER = "=";
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
deleted file mode 100644
index 93caf8b57e..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Dirty data base class, it is a wrapper of dirty data
- */
-public class DirtyData {
-
-    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
-
-    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
-    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
-
-    private static final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
-    /**
-     * The identifier of dirty data, it will be used for filename generation 
of file dirty sink,
-     * topic generation of mq dirty sink, tablename generation of database, 
etc,
-     * and it supports variable replace like '${variable}'.
-     * There are several system 
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
-     * and the support of other variables is determined by the connector.
-     */
-    private final String identifier;
-    /**
-     * The labels of the dirty data, it will be written to store system of 
dirty
-     */
-    private final String labels;
-    /**
-     * The log tag of dirty data, it is only used to format log as follows:
-     * [${logTag}] ${labels} ${data}
-     */
-    private final String logTag;
-    /**
-     * Dirty type
-     */
-    private final String dirtyType;
-    /**
-     * Dirty describe message, it is the cause of dirty data
-     */
-    private final String dirtyMessage;
-    /**
-     * The real dirty data
-     */
-    private final byte[] data;
-
-    public DirtyData(byte[] data, String identifier, String labels,
-            String logTag, String dirtyType, String dirtyMessage) {
-        this.data = data;
-        this.dirtyType = dirtyType;
-        this.dirtyMessage = dirtyMessage;
-        Map<String, String> paramMap = genParamMap();
-        this.labels = PatternReplaceUtils.replace(labels, paramMap);
-        this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
-        this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
-
-    }
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    private Map<String, String> genParamMap() {
-        Map<String, String> paramMap = new HashMap<>();
-        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
-        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
-        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
-        return paramMap;
-    }
-
-    public String getLabels() {
-        return labels;
-    }
-
-    public String getLogTag() {
-        return logTag;
-    }
-
-    public byte[] getData() {
-        return data;
-    }
-
-    public String getDirtyType() {
-        return dirtyType;
-    }
-
-    public String getIdentifier() {
-        return identifier;
-    }
-
-    public static class Builder {
-
-        private String identifier;
-        private String labels;
-        private String logTag;
-        private String dirtyType = "UNDEFINED";
-        private String dirtyMessage;
-        private byte[] data;
-
-        public Builder setDirtyType(String dirtyType) {
-            this.dirtyType = dirtyType;
-            return this;
-        }
-
-        public Builder setLabels(String labels) {
-            this.labels = labels;
-            return this;
-        }
-
-        public Builder setData(byte[] data) {
-            this.data = data;
-            return this;
-        }
-
-        public Builder setLogTag(String logTag) {
-            this.logTag = logTag;
-            return this;
-        }
-
-        public Builder setDirtyMessage(String dirtyMessage) {
-            this.dirtyMessage = dirtyMessage;
-            return this;
-        }
-
-        public DirtyData build() {
-            return new DirtyData(data, identifier, labels, logTag, dirtyType, 
dirtyMessage);
-        }
-    }
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
deleted file mode 100644
index bd8afffe62..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import org.apache.inlong.sdk.dirtydata.DirtyData.Builder;
-import org.apache.inlong.sdk.dirtydata.sink.Configure;
-import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Dirty sink helper, it helps dirty data sink for {@link DirtySink}
- */
-public class DirtyDataCollector implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DirtyDataCollector.class);
-    static final Pattern REGEX_PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
-    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
-    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
-    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
-    private static final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    private DirtyOptions dirtyOptions;
-    private DirtySink dirtySink;
-    private Configure config;
-
-    public DirtyDataCollector() {
-    }
-
-    /**
-     * Open for dirty sink
-     *
-     * @param configuration The configuration that is used for dirty sink
-     */
-    public void open(Configure configuration) {
-        config = configuration;
-        if (dirtyOptions == null) {
-            dirtyOptions = DirtyOptions.fromConfig(configuration);
-        }
-        dirtyOptions.validate();
-        if (!dirtyOptions.isEnableDirtyCollect()) {
-            return;
-        }
-        dirtySink = createDirtySink(dirtyOptions.getSinkType());
-        try {
-            dirtySink.open(configuration);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-    }
-
-    private DirtySink createDirtySink(String sinkType) {
-        DirtySink sink;
-        try {
-            switch (sinkType) {
-                case "inlong": {
-                    sink = new InlongSdkDirtySink();
-                    sink.open(config);
-                    return sink;
-                }
-                default: {
-                    LOGGER.error("invalid dirty sink type {}", sinkType);
-                    return null;
-                }
-            }
-        } catch (Exception e) {
-            LOGGER.error("create dirty sink error", e);
-        }
-        return null;
-    }
-
-    /**
-     * Dirty data sink
-     *
-     * @param data The dirty data
-     * @param dirtyType The dirty type
-     * @param e The cause of dirty data
-     */
-    public void invoke(byte[] data, String dirtyType, Throwable e) {
-        invoke(data, dirtyType, dirtyOptions.getLabels(), 
dirtyOptions.getLogTag(), e);
-    }
-
-    /**
-     * Dirty data sink
-     *
-     * @param data The dirty data
-     * @param dirtyType The dirty type
-     * @param label The dirty label
-     * @param logTag The dirty logTag
-     * @param e The cause of dirty data
-     */
-    public void invoke(byte[] data, String dirtyType, String label, String 
logTag, Throwable e) {
-        if (!dirtyOptions.isEnableDirtyCollect()) {
-            return;
-        }
-        if (dirtySink != null) {
-            Builder builder = DirtyData.builder();
-            try {
-                builder.setData(data)
-                        .setDirtyType(dirtyType)
-                        .setLabels(label)
-                        .setLogTag(logTag)
-                        .setDirtyMessage(e.getMessage());
-                dirtySink.invoke(builder.build());
-            } catch (Exception ex) {
-                if (!dirtyOptions.isIgnoreSideOutputErrors()) {
-                    throw new RuntimeException(ex);
-                }
-                LOGGER.warn("Dirty sink failed", ex);
-            }
-        }
-    }
-
-    /**
-     * replace ${SYSTEM_TIME} with real time
-     *
-     * @param pattern
-     * @return
-     */
-    public static String regexReplace(String pattern, String dirtyType, String 
dirtyMessage) {
-        if (pattern == null) {
-            return null;
-        }
-
-        Map<String, String> paramMap = new HashMap<>(6);
-        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
-        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
-        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
-
-        Matcher matcher = REGEX_PATTERN.matcher(pattern);
-        StringBuffer sb = new StringBuffer();
-        while (matcher.find()) {
-            String keyText = matcher.group(1);
-            String replacement = paramMap.get(keyText);
-            if (replacement == null) {
-                continue;
-            }
-            matcher.appendReplacement(sb, replacement);
-        }
-        matcher.appendTail(sb);
-        return sb.toString();
-    }
-
-    /**
-     * replace ${database} ${table} etc. Used in cases where 
jsonDynamicFormat.parse() is not usable.
-     */
-    public static String regexReplace(String pattern, String dirtyType, String 
dirtyMessage, String database,
-            String table, String schema) {
-        if (pattern == null) {
-            return null;
-        }
-
-        Map<String, String> paramMap = new HashMap<>(6);
-        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
-        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
-        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
-        paramMap.put("source.database", database);
-        paramMap.put("database", database);
-        paramMap.put("source.table", table);
-        paramMap.put("table", table);
-        if (schema != null) {
-            paramMap.put("source.schema", schema);
-            paramMap.put("schema", schema);
-        }
-
-        Matcher matcher = REGEX_PATTERN.matcher(pattern);
-        StringBuffer sb = new StringBuffer();
-        while (matcher.find()) {
-            String keyText = matcher.group(1);
-            String replacement = paramMap.get(keyText);
-            if (replacement == null) {
-                continue;
-            }
-            matcher.appendReplacement(sb, replacement);
-        }
-        matcher.appendTail(sb);
-        return sb.toString();
-    }
-
-    public void setDirtyOptions(DirtyOptions dirtyOptions) {
-        this.dirtyOptions = dirtyOptions;
-    }
-
-    public DirtyOptions getDirtyOptions() {
-        return dirtyOptions;
-    }
-
-    @Nullable
-    public DirtySink getDirtySink() {
-        return dirtySink;
-    }
-}
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
new file mode 100644
index 0000000000..984c456480
--- /dev/null
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import lombok.Builder;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Base64;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Wraps one dirty record plus its routing metadata and renders it as a single
+ * delimiter-joined line via {@link #format()}.
+ *
+ * <p>Instances are created through the Lombok-generated {@code builder()}. Builder fields that
+ * are left unset stay {@code null} and are rendered by {@link StringJoiner} as the text "null".
+ */
+@Builder
+public class DirtyMessageWrapper {
+
+    /** Pattern of the collection timestamp; DateTimeFormatter is immutable and thread-safe. */
+    private static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    /** Separator placed between output fields; required by {@link #format()}. */
+    private String delimiter;
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String dataTime;
+    private String dataflowId;
+    private String serverType;
+    private String dirtyType;
+    private String ext;
+    /** Textual payload; takes precedence over {@code dataBytes} when both are set. */
+    private String data;
+    /** Binary payload, emitted Base64-encoded when {@code data} is null. */
+    private byte[] dataBytes;
+
+    /**
+     * Renders this record as one line with fields in the order: groupId, streamId, current local
+     * time, dataTime, dataflowId, serverType, dirtyType, ext, payload.
+     *
+     * <p>Values are not escaped, so a field containing the delimiter makes the line ambiguous.
+     *
+     * @return the delimiter-joined line
+     * @throws NullPointerException if no delimiter was set on the builder
+     */
+    public String format() {
+        // Fail fast with a descriptive message instead of StringJoiner's bare NPE.
+        Objects.requireNonNull(delimiter, "delimiter cannot be null");
+        String now = LocalDateTime.now().format(DATE_TIME_FORMATTER);
+        String formatData = null;
+        if (data != null) {
+            formatData = data;
+        } else if (dataBytes != null) {
+            formatData = Base64.getEncoder().encodeToString(dataBytes);
+        }
+
+        return new StringJoiner(delimiter)
+                .add(inlongGroupId)
+                .add(inlongStreamId)
+                .add(now)
+                .add(dataTime)
+                .add(dataflowId)
+                .add(serverType)
+                .add(dirtyType)
+                .add(ext)
+                .add(formatData)
+                .toString();
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
deleted file mode 100644
index d70127adf8..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import org.apache.inlong.sdk.dirtydata.sink.Configure;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-
-import java.io.Serializable;
-
-import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE;
-import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR;
-import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
-import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS;
-import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
-
-/**
- * Dirty common options
- */
-@Data
-@Builder
-@Getter
-public class DirtyOptions implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private static final String DEFAULT_FORMAT = "csv";
-    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
-    private final boolean enableDirtyCollect;
-    private final boolean ignoreSideOutputErrors;
-    private final String sinkType;
-    private final String labels;
-    private final String logTag;
-    private final String format;
-    private final String csvFieldDelimiter;
-
-    private DirtyOptions(boolean enableDirtyCollect, boolean 
ignoreSideOutputErrors,
-            String sinkType, String labels, String logTag, String format, 
String csvFieldDelimiter) {
-        this.enableDirtyCollect = enableDirtyCollect;
-        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
-        this.sinkType = sinkType;
-        this.labels = labels;
-        this.logTag = logTag;
-        this.format = format;
-        this.csvFieldDelimiter = csvFieldDelimiter;
-    }
-
-    /**
-     * Get dirty options from {@link Configure}
-     *
-     * @param config The config
-     * @return Dirty options
-     */
-    public static DirtyOptions fromConfig(Configure config) {
-        boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE, 
false);
-        boolean ignoreSinkError = 
config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true);
-        String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null);
-        String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null);
-        String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
-        String format = DEFAULT_FORMAT;
-        String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
-        return new DirtyOptions(enableDirtyCollect, ignoreSinkError,
-                dirtyConnector, labels, logTag, format, csvFieldDelimiter);
-    }
-
-    public void validate() {
-        if (!enableDirtyCollect) {
-            return;
-        }
-        if (sinkType == null || sinkType.trim().length() == 0) {
-            throw new RuntimeException(
-                    "The option 'dirty.side-output.connector' is not allowed 
to be empty "
-                            + "when the option 'dirty.ignore' is 'true' "
-                            + "and the option 'dirty.side-output.enable' is 
'true'");
-        }
-    }
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
deleted file mode 100644
index 68d8cc9110..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import org.apache.inlong.sdk.dirtydata.sink.Configure;
-
-import java.io.Serializable;
-
-/**
- * The dirty sink base inteface
- *
- */
-public interface DirtySink extends Serializable {
-
-    /**
-     * Open for dirty sink
-     *
-     * @param configuration The configuration that is used for dirty sink
-     * @throws Exception The exception may be thrown when executing
-     */
-    default void open(Configure configuration) throws Exception {
-
-    }
-
-    /**
-     * Invoke that is used to sink dirty data
-     *
-     * @param dirtyData The dirty data that will be written
-     * @throws Exception The exception may be thrown when executing
-     */
-    void invoke(DirtyData dirtyData) throws Exception;
-
-    /**
-     * Close for dirty sink
-     *
-     * @throws Exception The exception may be thrown when executing
-     */
-    default void close() throws Exception {
-
-    }
-
-}
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
new file mode 100644
index 0000000000..2240ebdb6c
--- /dev/null
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.MessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+
+import com.google.common.base.Preconditions;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Sends formatted dirty records to an InLong group/stream through the DataProxy SDK.
+ *
+ * <p>Create an instance with {@code builder()}, call {@link #init()} once, then call
+ * {@link #sendDirtyMessage(DirtyMessageWrapper)} for each record.
+ */
+@Slf4j
+@Builder
+public class InlongSdkDirtySink {
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String inlongManagerAddr;
+    private String authId;
+    private String authKey;
+    /** When false, {@link LogCallBack} throws on a failed send; when true it only logs. */
+    private boolean ignoreErrors;
+
+    // Created by init(); any values supplied through the builder are overwritten there.
+    private SendMessageCallback callback;
+    private MessageSender sender;
+
+    /**
+     * Validates the required settings and creates the underlying DataProxy message sender.
+     *
+     * @throws NullPointerException if a required setting is missing
+     * @throws Exception propagated from sender creation
+     */
+    public void init() throws Exception {
+        Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be null");
+        Preconditions.checkNotNull(inlongStreamId, "inlongStreamId cannot be null");
+        Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr cannot be null");
+        Preconditions.checkNotNull(authId, "authId cannot be null");
+        Preconditions.checkNotNull(authKey, "authKey cannot be null");
+
+        this.callback = new LogCallBack();
+        ProxyClientConfig proxyClientConfig =
+                new ProxyClientConfig(inlongManagerAddr, inlongGroupId, authId, authKey);
+        this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+        log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId);
+    }
+
+    /**
+     * Asynchronously sends one dirty record, encoded as UTF-8, to the configured group/stream.
+     *
+     * @param messageWrapper the record to send
+     * @throws IllegalStateException if {@link #init()} has not been called
+     * @throws ProxysdkException propagated from {@code asyncSendMessage}
+     */
+    public void sendDirtyMessage(DirtyMessageWrapper messageWrapper)
+            throws ProxysdkException {
+        Preconditions.checkState(sender != null, "InlongSdkDirtySink is not initialized, call init() first");
+        // Explicit charset: the no-arg getBytes() depends on the JVM's platform default encoding.
+        byte[] body = messageWrapper.format().getBytes(StandardCharsets.UTF_8);
+        sender.asyncSendMessage(inlongGroupId, inlongStreamId, body, callback);
+    }
+
+    /**
+     * Logs failed sends and, unless {@code ignoreErrors} is set, throws a RuntimeException.
+     *
+     * <p>NOTE(review): the callback is presumably invoked on a DataProxy SDK thread rather than by the
+     * caller of {@link #sendDirtyMessage}, so the exception likely never reaches that caller;
+     * confirm against the SDK before relying on {@code ignoreErrors = false}.
+     */
+    class LogCallBack implements SendMessageCallback {
+
+        @Override
+        public void onMessageAck(SendResult result) {
+            if (result == SendResult.OK) {
+                return;
+            }
+            log.error("failed to send inlong dirty message, response={}", result);
+
+            if (!ignoreErrors) {
+                throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result);
+            }
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            log.error("failed to send inlong dirty message", e);
+
+            if (!ignoreErrors) {
+                throw new RuntimeException("writing dirty message to inlong sdk failed", e);
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
deleted file mode 100644
index 20f70c5205..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * The pattern replace utils
- */
-public final class PatternReplaceUtils {
-
-    private static final Pattern REGEX_PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}",
-            Pattern.CASE_INSENSITIVE);
-
-    public static String replace(String pattern, Map<String, String> params) {
-        if (pattern == null) {
-            return null;
-        }
-        Matcher matcher = REGEX_PATTERN.matcher(pattern);
-        StringBuffer sb = new StringBuffer();
-        while (matcher.find()) {
-            String keyText = matcher.group(1);
-            String replacement = params.getOrDefault(keyText, keyText);
-            matcher.appendReplacement(sb, replacement);
-        }
-        matcher.appendTail(sb);
-        return sb.toString();
-    }
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
deleted file mode 100644
index e2031915c6..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Configure {
-
-    private Map<String, String> data;
-
-    public Configure(Map<String, String> data) {
-        this.data = new ConcurrentHashMap<>();
-        this.data.putAll(data);
-    }
-
-    public String get(String key, String defaultValue) {
-        String value = data.get(key);
-        if (value != null) {
-            return value;
-        }
-        return defaultValue;
-    }
-
-    public String get(String key) {
-        return data.get(key);
-    }
-
-    public Boolean getBoolean(String key, boolean defaultValue) {
-        String value = data.get(key);
-        if (value != null) {
-            return Boolean.valueOf(value);
-        }
-        return defaultValue;
-    }
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
deleted file mode 100644
index bef0fc3110..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dirtydata.DirtyData;
-import org.apache.inlong.sdk.dirtydata.DirtySink;
-
-import com.google.common.base.Preconditions;
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.StringJoiner;
-
-import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
-
-@Slf4j
-public class InlongSdkDirtySink implements DirtySink {
-
-    // The inlong manager addr to init inlong sdk
-    private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR =
-            "dirty.side-output.inlong-sdk.inlong-manager-addr";
-    // The inlong manager auth id to init inlong sdk
-    private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = 
"dirty.side-output.inlong-sdk.inlong-auth-id";
-    // The inlong manager auth id to init inlong sdk
-    private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY = 
"dirty.side-output.inlong-sdk.inlong-auth-key";
-    // The inlong group id of dirty sink
-    private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP = 
"dirty.side-output.inlong-sdk.inlong-group-id";
-    // The inlong stream id of dirty sink
-    private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM = 
"dirty.side-output.inlong-sdk.inlong-stream-id";
-
-    private InlongSdkOptions options;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private final SendMessageCallback callback;
-
-    private transient DateTimeFormatter dateTimeFormatter;
-    private transient MessageSender sender;
-
-    public InlongSdkDirtySink() {
-        this.callback = new LogCallBack();
-    }
-
-    @Override
-    public void open(Configure configuration) throws Exception {
-        options = getOptions(configuration);
-        this.inlongGroupId = options.getInlongGroupId();
-        this.inlongStreamId = options.getInlongStreamId();
-        dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-        // init sender
-        ProxyClientConfig proxyClientConfig =
-                new ProxyClientConfig(options.getInlongManagerAddr(), 
options.getInlongGroupId(),
-                        options.getInlongManagerAuthId(), 
options.getInlongManagerAuthKey());
-        sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
-    }
-
-    @Override
-    public void invoke(DirtyData dirtyData) {
-        try {
-            Map<String, String> labelMap = 
LabelUtils.parseLabels(dirtyData.getLabels());
-            String groupId = 
Preconditions.checkNotNull(labelMap.get("groupId"));
-            String streamId = 
Preconditions.checkNotNull(labelMap.get("streamId"));
-
-            String message = join(groupId, streamId,
-                    dirtyData.getDirtyType(), dirtyData.getLabels(),
-                    new String(dirtyData.getData(), StandardCharsets.UTF_8));
-            sender.asyncSendMessage(inlongGroupId, inlongStreamId, 
message.getBytes(), callback);
-        } catch (Throwable t) {
-            log.error("failed to send dirty message to inlong sdk", t);
-        }
-    }
-
-    private InlongSdkOptions getOptions(Configure config) {
-        return InlongSdkOptions.builder()
-                
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR))
-                .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
-                .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
-                
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
-                
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
-                
.ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, 
true))
-                .enableDirtyLog(true)
-                .build();
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (sender != null) {
-            sender.close();
-        }
-    }
-
-    private String join(
-            String inlongGroup,
-            String inlongStream,
-            String type,
-            String label,
-            String formattedData) {
-
-        String now = LocalDateTime.now().format(dateTimeFormatter);
-
-        StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
-        return joiner.add(inlongGroup + "." + inlongStream)
-                .add(now)
-                .add(type)
-                .add(label)
-                .add(formattedData).toString();
-    }
-
-    class LogCallBack implements SendMessageCallback {
-
-        @Override
-        public void onMessageAck(SendResult result) {
-            if (result == SendResult.OK) {
-                return;
-            }
-            log.error("failed to send inlong dirty message, response={}", 
result);
-
-            if (!options.isIgnoreSideOutputErrors()) {
-                throw new RuntimeException("writing dirty message to inlong 
sdk failed, response=" + result);
-            }
-        }
-
-        @Override
-        public void onException(Throwable e) {
-            log.error("failed to send inlong dirty message", e);
-
-            if (!options.isIgnoreSideOutputErrors()) {
-                throw new RuntimeException("writing dirty message to inlong 
sdk failed", e);
-            }
-        }
-    }
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
deleted file mode 100644
index b657a97f20..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-
-import java.io.Serializable;
-
-@Data
-@Builder
-@Getter
-public class InlongSdkOptions implements Serializable {
-
-    private static final String DEFAULT_FORMAT = "csv";
-
-    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
-    private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";
-
-    private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
-    private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
-
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private String inlongManagerAddr;
-    private String inlongManagerAuthKey;
-    private String inlongManagerAuthId;
-    private String format = DEFAULT_FORMAT;
-    private boolean ignoreSideOutputErrors;
-    private boolean enableDirtyLog;
-    private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
-    private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
-    private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
-    private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
-}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
deleted file mode 100644
index 2ce58b134d..0000000000
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER;
-import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER;
-
-/**
- * The label utils class, is used to parse the labels to a label map
- */
-public final class LabelUtils {
-
-    private LabelUtils() {
-    }
-
-    /**
-     * Parse the labels to label map
-     *
-     * @param labels The labels format by 'key1=value1&key2=value2...'
-     * @return The label map of labels
-     */
-    public static Map<String, String> parseLabels(String labels) {
-        return parseLabels(labels, new LinkedHashMap<>());
-    }
-
-    /**
-     * Parse the labels to label map
-     *
-     * @param labels The labels format by 'key1=value1&key2=value2...'
-     * @return The label map of labels
-     */
-    public static Map<String, String> parseLabels(String labels, Map<String, 
String> labelMap) {
-        if (labelMap == null) {
-            labelMap = new LinkedHashMap<>();
-        }
-        if (labels == null || labels.length() == 0) {
-            return labelMap;
-        }
-        String[] labelArray = labels.split(DELIMITER);
-        for (String label : labelArray) {
-            int index = label.indexOf(KEY_VALUE_DELIMITER);
-            if (index < 1 || index == label.length() - 1) {
-                throw new IllegalArgumentException("The format of labels must 
be like 'key1=value1&key2=value2...'");
-            }
-            labelMap.put(label.substring(0, index), label.substring(index + 
1));
-        }
-        return labelMap;
-    }
-}


Reply via email to