This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dba59007b6 [INLONG-9962][Manager] Data preview supports returning header and specific field information (#9967)
dba59007b6 is described below

commit dba59007b6641d31d10a6157a8e07f628082a8f7
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Apr 10 21:39:37 2024 +0800

    [INLONG-9962][Manager] Data preview supports returning header and specific field information (#9967)
    
    * [INLONG-9962][Manager] Data preview supports returning header and specific field information
    
    * [INLONG-9962][Manager] Data preview supports returning header and specific field information
    
    * [INLONG-9962][Manager] Fix error
    
    * [INLONG-9962][Manager] Fix error
    
    * [INLONG-9962][Manager] Fix error
---
 .../apache/inlong/common/enums/DataTypeEnum.java   | 20 ++++++-
 .../dataproxy/config/holder/MetaConfigHolder.java  |  6 +-
 .../dataproxy/config/pojo/IdTopicConfig.java       |  7 ++-
 .../manager/pojo/consume/BriefMQMessage.java       | 11 ++++
 .../service/datatype/CsvDataTypeOperator.java      | 63 +++++++++++++++++++
 .../manager/service/datatype/DataTypeOperator.java | 56 +++++------------
 .../service/datatype/DataTypeOperatorFactory.java  | 49 +++++++++++++++
 .../message/InlongMsgDeserializeOperator.java      | 33 +++++++++-
 .../service/message/RawMsgDeserializeOperator.java | 32 +++++++++-
 .../resource/queue/kafka/KafkaOperator.java        | 10 ++--
 .../resource/queue/pulsar/PulsarOperator.java      |  4 ++
 .../resource/queue/tubemq/TubeMQOperator.java      | 10 ++--
 .../resource/queue/kafka/KafkaOperatorTest.java    |  4 ++
 .../sort/standalone/config/pojo/type/DataType.java | 70 ----------------------
 .../sort/standalone/sink/kafka/KafkaIdConfig.java  | 12 ++--
 .../standalone/sink/pulsar/PulsarIdConfig.java     | 12 ++--
 16 files changed, 256 insertions(+), 143 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 2d2d8d1997..f327508660 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -30,8 +30,10 @@ public enum DataTypeEnum {
     CANAL("canal"),
     DEBEZIUM_JSON("debezium_json"),
     RAW("raw"),
-
-    ;
+    TEXT("text"),
+    PB("pb"),
+    JCE("jce"),
+    UNKNOWN("n");
 
     private final String type;
 
@@ -48,6 +50,20 @@ public enum DataTypeEnum {
         throw new IllegalArgumentException("Unsupported data type for " + 
type);
     }
 
+    public static DataTypeEnum convert(String value) {
+        for (DataTypeEnum v : values()) {
+            if (v.getType().equals(value)) {
+                return v;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    @Override
+    public String toString() {
+        return this.name() + ":" + this.type;
+    }
+
     public String getType() {
         return type;
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 369e7b2212..96c5bd820a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.config.holder;
 
 import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
 import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
@@ -29,7 +30,6 @@ import org.apache.inlong.dataproxy.config.ConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.CacheType;
-import org.apache.inlong.dataproxy.config.pojo.DataType;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
@@ -407,8 +407,8 @@ public class MetaConfigHolder extends ConfigHolder {
             tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
             tmpConfig.setTopicName(topicName);
             tmpConfig.setParams(idObject.getParams());
-            tmpConfig.setDataType(DataType.convert(
-                    idObject.getParams().getOrDefault("dataType", 
DataType.TEXT.value())));
+            tmpConfig.setDataType(DataTypeEnum.convert(
+                    idObject.getParams().getOrDefault("dataType", 
DataTypeEnum.TEXT.getType())));
             
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", 
"|"));
             
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", 
"\n"));
             tmpConfig.setUseExtendedFields(Boolean.valueOf(
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index 31646fd3b5..48b19d446d 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.config.pojo;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
 import org.apache.commons.lang.StringUtils;
@@ -37,7 +38,7 @@ public class IdTopicConfig {
     private String topicName;
     private String tenant;
     private String nameSpace;
-    private DataType dataType = DataType.TEXT;
+    private DataTypeEnum dataType = DataTypeEnum.TEXT;
     private String fieldDelimiter = "|";
     private String fileDelimiter = "\n";
     private Boolean useExtendedFields = false;
@@ -142,7 +143,7 @@ public class IdTopicConfig {
      * get dataType
      * @return the dataType
      */
-    public DataType getDataType() {
+    public DataTypeEnum getDataType() {
         return dataType;
     }
 
@@ -150,7 +151,7 @@ public class IdTopicConfig {
      * set dataType
      * @param dataType the dataType to set
      */
-    public void setDataType(DataType dataType) {
+    public void setDataType(DataTypeEnum dataType) {
         this.dataType = dataType;
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
index 51085c5570..72dd4f6303 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.pojo.consume;
 
+import org.apache.inlong.manager.pojo.stream.StreamField;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -24,6 +26,9 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * Brief Message info for MQ
  */
@@ -49,7 +54,13 @@ public class BriefMQMessage {
     @ApiModelProperty(value = "Client ip")
     private String clientIp;
 
+    @ApiModelProperty(value = "Message header")
+    private Map<String, String> headers;
+
     @ApiModelProperty(value = "Message body")
     private String body;
 
+    @ApiModelProperty(value = "List of field info")
+    private List<StreamField> fieldList;
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
new file mode 100644
index 0000000000..5bb0b969f6
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Slf4j
+@Service
+public class CsvDataTypeOperator implements DataTypeOperator {
+
+    @Override
+    public boolean accept(DataTypeEnum type) {
+        return DataTypeEnum.CSV.equals(type);
+    }
+
+    @Override
+    public List<StreamField> parseFields(String str, InlongStreamInfo 
streamInfo) throws Exception {
+        List<StreamField> streamFields = 
CommonBeanUtils.copyListProperties(streamInfo.getFieldList(),
+                StreamField::new);
+        try {
+            char separator = '|';
+            if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+                separator = (char) 
Integer.parseInt(streamInfo.getDataSeparator());
+            }
+            String[] bodys = StringUtils.split(str, separator);
+            if (bodys.length != streamFields.size()) {
+                return streamFields;
+            }
+            for (int i = 0; i < bodys.length; i++) {
+                streamFields.get(i).setFieldValue(bodys[i]);
+            }
+            return streamFields;
+        } catch (Exception e) {
+            log.warn("parse fields failed for groupId = {}, streamId = {}", 
streamInfo.getInlongGroupId(),
+                    streamInfo.getInlongStreamId(), e);
+        }
+        return streamFields;
+    }
+}
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
similarity index 51%
rename from 
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
index 18491e7343..f2e42b2171 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
@@ -15,56 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.dataproxy.config.pojo;
+package org.apache.inlong.manager.service.datatype;
 
-/**
- * data content type
- */
-public enum DataType {
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 
-    TEXT("text"), PB("pb"), JCE("jce"), N("n"), CSV("csv"), KV("kv");
+import java.util.List;
 
-    private final String value;
+/**
+ * Data type operator
+ */
+public interface DataTypeOperator {
 
     /**
-     * 
-     * Constructor
-     * 
-     * @param value
+     * Determines whether the current instance matches the specified type.
      */
-    private DataType(String value) {
-        this.value = value;
-    }
+    boolean accept(DataTypeEnum type);
 
     /**
-     * value
+     * Parse fields from message
      *
-     * @return
+     * @param streamInfo inlong stream info
+     * @return list of field info
      */
-    public String value() {
-        return this.value;
+    default List<StreamField> parseFields(String message, InlongStreamInfo 
streamInfo) throws Exception {
+        return streamInfo.getFieldList();
     }
 
-    /**
-     * toString
-     */
-    @Override
-    public String toString() {
-        return this.name() + ":" + this.value;
-    }
-
-    /**
-     * convert
-     *
-     * @param  value
-     * @return
-     */
-    public static DataType convert(String value) {
-        for (DataType v : values()) {
-            if (v.value().equals(value)) {
-                return v;
-            }
-        }
-        return N;
-    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java
new file mode 100644
index 0000000000..8b36803ffb
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link DataTypeOperator}.
+ */
+@Service
+public class DataTypeOperatorFactory {
+
+    @Autowired
+    private List<DataTypeOperator> operatorList;
+
+    /**
+     * Get a data type operator instance via the given data type
+     */
+    public DataTypeOperator getInstance(DataTypeEnum type) {
+        return operatorList.stream()
+                .filter(inst -> inst.accept(type))
+                .findFirst()
+                .orElseThrow(() -> new 
BusinessException(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED,
+                        
String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), type)));
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index 0e1b2a0a56..16a268428d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -17,14 +17,20 @@
 
 package org.apache.inlong.manager.service.message;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.util.StringUtil;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.nio.charset.Charset;
@@ -38,6 +44,9 @@ import java.util.Objects;
 @Service
 public class InlongMsgDeserializeOperator implements DeserializeOperator {
 
+    @Autowired
+    public DataTypeOperatorFactory dataTypeOperatorFactory;
+
     @Override
     public boolean accept(MessageWrapType type) {
         return MessageWrapType.INLONG_MSG_V0.equals(type);
@@ -71,9 +80,27 @@ public class InlongMsgDeserializeOperator implements 
DeserializeOperator {
                 if (Objects.isNull(bodyBytes)) {
                     continue;
                 }
-                BriefMQMessage message = new BriefMQMessage(index, groupId, 
streamId, msgTime, attrMap.get(CLIENT_IP),
-                        new String(bodyBytes, 
Charset.forName(streamInfo.getDataEncoding())));
-                messageList.add(message);
+                try {
+                    String body = new String(bodyBytes, 
Charset.forName(streamInfo.getDataEncoding()));
+                    DataTypeOperator dataTypeOperator =
+                            
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
+                    List<StreamField> streamFieldList = 
dataTypeOperator.parseFields(body, streamInfo);
+                    BriefMQMessage message = BriefMQMessage.builder()
+                            .id(index)
+                            .inlongGroupId(groupId)
+                            .inlongStreamId(streamId)
+                            .dt(msgTime)
+                            .clientIp(attrMap.get(CLIENT_IP))
+                            .headers(headers)
+                            .body(body)
+                            .fieldList(streamFieldList)
+                            .build();
+                    messageList.add(message);
+                } catch (Exception e) {
+                    String errMsg = String.format("decode msg failed for 
groupId=%s, streamId=%s", groupId, streamId);
+                    log.error(errMsg, e);
+                    throw new BusinessException(errMsg);
+                }
             }
         }
         return messageList;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index 361afe46d6..6c6e2840e3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -17,12 +17,18 @@
 
 package org.apache.inlong.manager.service.message;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.nio.charset.Charset;
@@ -34,6 +40,9 @@ import java.util.Map;
 @Service
 public class RawMsgDeserializeOperator implements DeserializeOperator {
 
+    @Autowired
+    public DataTypeOperatorFactory dataTypeOperatorFactory;
+
     @Override
     public boolean accept(MessageWrapType type) {
         return MessageWrapType.RAW.equals(type);
@@ -45,8 +54,27 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
         String groupId = headers.get(AttributeConstants.GROUP_ID);
         String streamId = headers.get(AttributeConstants.STREAM_ID);
         long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
-        return Collections.singletonList(new BriefMQMessage(index, groupId, 
streamId, msgTime,
-                headers.get(CLIENT_IP), new String(msgBytes, 
Charset.forName(streamInfo.getDataEncoding()))));
+        String body = new String(msgBytes, 
Charset.forName(streamInfo.getDataEncoding()));
+        try {
+            DataTypeOperator dataTypeOperator =
+                    
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
+            List<StreamField> streamFieldList = 
dataTypeOperator.parseFields(body, streamInfo);
+            BriefMQMessage briefMQMessage = BriefMQMessage.builder()
+                    .id(index)
+                    .inlongGroupId(groupId)
+                    .inlongStreamId(streamId)
+                    .dt(msgTime)
+                    .clientIp(headers.get(CLIENT_IP))
+                    .headers(headers)
+                    .body(body)
+                    .fieldList(streamFieldList)
+                    .build();
+            return Collections.singletonList(briefMQMessage);
+        } catch (Exception e) {
+            String errMsg = String.format("decode msg failed for groupId=%s, 
streamId=%s", groupId, streamId);
+            log.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index deb6601b63..763a654c26 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -156,10 +156,12 @@ public class KafkaOperator {
                     headers.put(header.key(), new String(header.value(), 
StandardCharsets.UTF_8));
                 }
 
-                int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                        
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
-                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                        MessageWrapType.valueOf(wrapTypeId));
+                MessageWrapType messageWrapType = 
MessageWrapType.forType(streamInfo.getWrapType());
+                if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
+                    messageWrapType =
+                            
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
+                }
+                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
                 messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
record.value(), headers, index));
                 if (messageList.size() >= messageCount) {
                     break;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index ccabb716f1..45e6112ce3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -48,6 +48,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -453,6 +454,9 @@ public class PulsarOperator {
                             messagePosition);
             PulsarMessageInfo messageInfo = 
PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
             Map<String, String> headers = messageInfo.getProperties();
+            if (headers == null) {
+                headers = new HashMap<>();
+            }
             MessageWrapType messageWrapType = 
MessageWrapType.forType(streamInfo.getWrapType());
             if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
                 messageWrapType =
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index e13b4cbe2c..cdbc4da86d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -303,11 +303,13 @@ public class TubeMQOperator {
                     map.put(kv.split(InlongConstants.EQUAL)[0], 
kv.split(InlongConstants.EQUAL)[1]);
                 }
 
-                int wrapTypeId = 
Integer.parseInt(map.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                        
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
+                MessageWrapType messageWrapType = 
MessageWrapType.forType(streamInfo.getWrapType());
+                if (map.get(InlongConstants.MSG_ENCODE_VER) != null) {
+                    messageWrapType =
+                            
MessageWrapType.valueOf(Integer.parseInt(map.get(InlongConstants.MSG_ENCODE_VER)));
+                }
                 byte[] messageData = 
Base64.getDecoder().decode(tubeDataInfo.getData());
-                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                        MessageWrapType.valueOf(wrapTypeId));
+                DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
                 messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
messageData, map, index));
             }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
index d2cf618b3d..acc6ebb3f5 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -93,6 +95,8 @@ public class KafkaOperatorTest extends ServiceBaseTest {
     @BeforeEach
     public void setUp() {
         streamInfo.setDataEncoding("UTF-8");
+        streamInfo.setDataType(DataTypeEnum.CSV.getType());
+        streamInfo.setWrapType(MessageWrapType.INLONG_MSG_V0.getName());
 
         List<TopicPartition> topicPartitions = IntStream.range(0, 
PARTITION_NUM)
                 .mapToObj(i -> new TopicPartition(TOPIC_NAME, 
i)).collect(Collectors.toList());
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
deleted file mode 100644
index 61a1a4ac54..0000000000
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.config.pojo.type;
-
-/**
- * data content type
- */
-public enum DataType {
-
-    TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n");
-
-    private final String value;
-
-    /**
-     * 
-     * Constructor
-     * 
-     * @param value
-     */
-    private DataType(String value) {
-        this.value = value;
-    }
-
-    /**
-     * value
-     *
-     * @return
-     */
-    public String value() {
-        return this.value;
-    }
-
-    /**
-     * toString
-     */
-    @Override
-    public String toString() {
-        return this.name() + ":" + this.value;
-    }
-
-    /**
-     * convert
-     *
-     * @param  value
-     * @return
-     */
-    public static DataType convert(String value) {
-        for (DataType v : values()) {
-            if (v.value().equals(value)) {
-                return v;
-            }
-        }
-        return UNKNOWN;
-    }
-}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index 3a527b23be..73d58afde0 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.sink.kafka;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.type.DataType;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
 import java.util.Map;
@@ -38,7 +38,7 @@ public class KafkaIdConfig {
     private String uid;
     private String separator = "|";
     private String topic;
-    private DataType dataType = DataType.TEXT;
+    private DataTypeEnum dataType = DataTypeEnum.TEXT;
 
     /**
      * Constructor
@@ -58,8 +58,8 @@ public class KafkaIdConfig {
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
         this.separator = idParam.getOrDefault(KafkaIdConfig.KEY_SEPARATOR, KafkaIdConfig.DEFAULT_SEPARATOR);
         this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
-        this.dataType = DataType
-                .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE, DataType.TEXT.value()));
+        this.dataType = DataTypeEnum
+                .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType()));
     }
 
     /**
@@ -157,7 +157,7 @@ public class KafkaIdConfig {
      * 
      * @return the dataType
      */
-    public DataType getDataType() {
+    public DataTypeEnum getDataType() {
         return dataType;
     }
 
@@ -166,7 +166,7 @@ public class KafkaIdConfig {
      * 
      * @param dataType the dataType to set
      */
-    public void setDataType(DataType dataType) {
+    public void setDataType(DataTypeEnum dataType) {
         this.dataType = dataType;
     }
 
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 52377d2461..4ef3aece80 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
+import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.type.DataType;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
 import java.util.Map;
@@ -40,7 +40,7 @@ public class PulsarIdConfig {
     private String uid;
     private String separator = "|";
     private String topic;
-    private DataType dataType = DataType.TEXT;
+    private DataTypeEnum dataType = DataTypeEnum.TEXT;
 
     /**
      * Constructor
@@ -60,8 +60,8 @@ public class PulsarIdConfig {
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
         this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, PulsarIdConfig.DEFAULT_SEPARATOR);
         this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
-        this.dataType = DataType
-                .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, DataType.TEXT.value()));
+        this.dataType = DataTypeEnum
+                .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType()));
     }
 
     /**
@@ -159,7 +159,7 @@ public class PulsarIdConfig {
      *
      * @return the dataType
      */
-    public DataType getDataType() {
+    public DataTypeEnum getDataType() {
         return dataType;
     }
 
@@ -168,7 +168,7 @@ public class PulsarIdConfig {
      *
      * @param dataType the dataType to set
      */
-    public void setDataType(DataType dataType) {
+    public void setDataType(DataTypeEnum dataType) {
         this.dataType = dataType;
     }
 


Reply via email to