This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2f10b5e563 [INLONG-11520][SDK] Remove DirtyServerType, use SinkType (#11521) 2f10b5e563 is described below commit 2f10b5e563bddce1f594b32182debc3cddda72b3 Author: vernedeng <verned...@apache.org> AuthorDate: Sun Nov 24 11:56:44 2024 +0800 [INLONG-11520][SDK] Remove DirtyServerType, use SinkType (#11521) --- .../apache/inlong/common/constant/SinkType.java | 2 ++ .../inlong/sdk/dirtydata/DirtyMessageWrapper.java | 3 +- .../apache/inlong/sort/base/dirty/DirtyData.java | 11 +++---- .../sort/base/dirty/sink/DirtyServerType.java | 37 ---------------------- .../base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +- .../DynamicTubeMQTableDeserializationSchema.java | 4 +-- 6 files changed, 12 insertions(+), 47 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java index c2816baf1e..f3eb862d87 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java @@ -19,6 +19,8 @@ package org.apache.inlong.common.constant; public class SinkType { + public static final String ICEBERG = "ICEBERG"; + public static final String HIVE = "HIVE"; public static final String KAFKA = "KAFKA"; public static final String PULSAR = "PULSAR"; public static final String CLS = "CLS"; diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java index a82d574cac..a22c4249b0 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java @@ -34,7 +34,8 @@ import java.util.StringJoiner; public class DirtyMessageWrapper { private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private String delimiter; + @Builder.Default + private String delimiter = "|"; @Builder.Default @Getter private int retryTimes = 0; diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java index 24c5dddecd..7670879925 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java @@ -17,7 +17,6 @@ package org.apache.inlong.sort.base.dirty; -import org.apache.inlong.sort.base.dirty.sink.DirtyServerType; import org.apache.inlong.sort.base.util.PatternReplaceUtils; import org.apache.flink.table.types.logical.LogicalType; @@ -65,7 +64,7 @@ public class DirtyData<T> { */ private final DirtyType dirtyType; - private final DirtyServerType serverType; + private final String serverType; /** * Dirty describe message, it is the cause of dirty data */ @@ -88,7 +87,7 @@ public class DirtyData<T> { private final T data; public DirtyData(T data, String identifier, String labels, - String logTag, DirtyType dirtyType, DirtyServerType serverType, String dirtyMessage, + String logTag, DirtyType dirtyType, String serverType, String dirtyMessage, @Nullable LogicalType rowType, long dataTime, String extParams) { this.data = data; this.dirtyType = dirtyType; @@ -131,7 +130,7 @@ public class DirtyData<T> { return dirtyType; } - public DirtyServerType getServerType() { + public String getServerType() { return serverType; } @@ -162,7 +161,7 @@ public class DirtyData<T> { private String labels; private String logTag; private DirtyType dirtyType = DirtyType.UNDEFINED; - private DirtyServerType serverType = DirtyServerType.UNDEFINED; + private String serverType; private String dirtyMessage; private LogicalType rowType; private long dataTime; @@ -184,7 +183,7 @@ public class DirtyData<T> { return this; } - public Builder<T> setServerType(DirtyServerType serverType) { + public Builder<T> setServerType(String serverType) { this.serverType = serverType; return this; } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java deleted file mode 100644 index 63f993c146..0000000000 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.base.dirty.sink; - -public enum DirtyServerType { - - UNDEFINED("Undefined"), - TUBE_MQ("TubeMQ"), - ICEBERG("Iceberg") - - ; - - private final String format; - - DirtyServerType(String format) { - this.format = format; - } - - public String format() { - return format; - } -} diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index 8513f841bc..8e692a4c10 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -67,7 +67,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> { .inlongStreamId(dataStreamId) .dataflowId(dataflowId) .dataTime(dirtyData.getDataTime()) - .serverType(dirtyData.getServerType().format()) + .serverType(dirtyData.getServerType()) .dirtyType(dirtyData.getDirtyType().format()) .dirtyMessage(dirtyData.getDirtyMessage()) .ext(dirtyData.getExtParams()) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index 94631e7cd3..2749213096 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -17,10 +17,10 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.sort.base.dirty.DirtyData; import org.apache.inlong.sort.base.dirty.DirtyOptions; import org.apache.inlong.sort.base.dirty.DirtyType; -import org.apache.inlong.sort.base.dirty.sink.DirtyServerType; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; @@ -144,7 +144,7 @@ public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDes builder.setData(message.getData()) .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR) - .setServerType(DirtyServerType.TUBE_MQ) + .setServerType(MQType.TUBEMQ) .setDirtyDataTime(dataTime) .setExtParams(message.getAttribute()) .setLabels(dirtyOptions.getLabels())