This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f10b5e563 [INLONG-11520][SDK] Remove DirtyServerType, use SinkType 
(#11521)
2f10b5e563 is described below

commit 2f10b5e563bddce1f594b32182debc3cddda72b3
Author: vernedeng <verned...@apache.org>
AuthorDate: Sun Nov 24 11:56:44 2024 +0800

    [INLONG-11520][SDK] Remove DirtyServerType, use SinkType (#11521)
---
 .../apache/inlong/common/constant/SinkType.java    |  2 ++
 .../inlong/sdk/dirtydata/DirtyMessageWrapper.java  |  3 +-
 .../apache/inlong/sort/base/dirty/DirtyData.java   | 11 +++----
 .../sort/base/dirty/sink/DirtyServerType.java      | 37 ----------------------
 .../base/dirty/sink/sdk/InlongSdkDirtySink.java    |  2 +-
 .../DynamicTubeMQTableDeserializationSchema.java   |  4 +--
 6 files changed, 12 insertions(+), 47 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
index c2816baf1e..f3eb862d87 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
@@ -19,6 +19,8 @@ package org.apache.inlong.common.constant;
 
 public class SinkType {
 
+    public static final String ICEBERG = "ICEBERG";
+    public static final String HIVE = "HIVE";
     public static final String KAFKA = "KAFKA";
     public static final String PULSAR = "PULSAR";
     public static final String CLS = "CLS";
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index a82d574cac..a22c4249b0 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -34,7 +34,8 @@ import java.util.StringJoiner;
 public class DirtyMessageWrapper {
 
     private static DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    private String delimiter;
+    @Builder.Default
+    private String delimiter = "|";
     @Builder.Default
     @Getter
     private int retryTimes = 0;
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index 24c5dddecd..7670879925 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.base.dirty;
 
-import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
 import org.apache.inlong.sort.base.util.PatternReplaceUtils;
 
 import org.apache.flink.table.types.logical.LogicalType;
@@ -65,7 +64,7 @@ public class DirtyData<T> {
      */
     private final DirtyType dirtyType;
 
-    private final DirtyServerType serverType;
+    private final String serverType;
     /**
      * Dirty describe message, it is the cause of dirty data
      */
@@ -88,7 +87,7 @@ public class DirtyData<T> {
     private final T data;
 
     public DirtyData(T data, String identifier, String labels,
-            String logTag, DirtyType dirtyType, DirtyServerType serverType, 
String dirtyMessage,
+            String logTag, DirtyType dirtyType, String serverType, String 
dirtyMessage,
             @Nullable LogicalType rowType, long dataTime, String extParams) {
         this.data = data;
         this.dirtyType = dirtyType;
@@ -131,7 +130,7 @@ public class DirtyData<T> {
         return dirtyType;
     }
 
-    public DirtyServerType getServerType() {
+    public String getServerType() {
         return serverType;
     }
 
@@ -162,7 +161,7 @@ public class DirtyData<T> {
         private String labels;
         private String logTag;
         private DirtyType dirtyType = DirtyType.UNDEFINED;
-        private DirtyServerType serverType = DirtyServerType.UNDEFINED;
+        private String serverType;
         private String dirtyMessage;
         private LogicalType rowType;
         private long dataTime;
@@ -184,7 +183,7 @@ public class DirtyData<T> {
             return this;
         }
 
-        public Builder<T> setServerType(DirtyServerType serverType) {
+        public Builder<T> setServerType(String serverType) {
             this.serverType = serverType;
             return this;
         }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
deleted file mode 100644
index 63f993c146..0000000000
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.base.dirty.sink;
-
-public enum DirtyServerType {
-
-    UNDEFINED("Undefined"),
-    TUBE_MQ("TubeMQ"),
-    ICEBERG("Iceberg")
-
-    ;
-
-    private final String format;
-
-    DirtyServerType(String format) {
-        this.format = format;
-    }
-
-    public String format() {
-        return format;
-    }
-}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8513f841bc..8e692a4c10 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -67,7 +67,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
                     .inlongStreamId(dataStreamId)
                     .dataflowId(dataflowId)
                     .dataTime(dirtyData.getDataTime())
-                    .serverType(dirtyData.getServerType().format())
+                    .serverType(dirtyData.getServerType())
                     .dirtyType(dirtyData.getDirtyType().format())
                     .dirtyMessage(dirtyData.getDirtyMessage())
                     .ext(dirtyData.getExtParams())
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index 94631e7cd3..2749213096 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.sort.tubemq.table;
 
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.DirtyType;
-import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
@@ -144,7 +144,7 @@ public class DynamicTubeMQTableDeserializationSchema 
implements DynamicTubeMQDes
 
                         builder.setData(message.getData())
                                 .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
-                                .setServerType(DirtyServerType.TUBE_MQ)
+                                .setServerType(MQType.TUBEMQ)
                                 .setDirtyDataTime(dataTime)
                                 .setExtParams(message.getAttribute())
                                 .setLabels(dirtyOptions.getLabels())

Reply via email to