This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new dba59007b6 [INLONG-9962][Manager] Data preview supports returning header and specific field information (#9967) dba59007b6 is described below commit dba59007b6641d31d10a6157a8e07f628082a8f7 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Apr 10 21:39:37 2024 +0800 [INLONG-9962][Manager] Data preview supports returning header and specific field information (#9967) * [INLONG-9962][Manager] Data preview supports returning header and specific field information * [INLONG-9962][Manager] Data preview supports returning header and specific field information * [INLONG-9962][Manager] Fix error * [INLONG-9962][Manager] Fix error * [INLONG-9962][Manager] Fix error --- .../apache/inlong/common/enums/DataTypeEnum.java | 20 ++++++- .../dataproxy/config/holder/MetaConfigHolder.java | 6 +- .../dataproxy/config/pojo/IdTopicConfig.java | 7 ++- .../manager/pojo/consume/BriefMQMessage.java | 11 ++++ .../service/datatype/CsvDataTypeOperator.java | 63 +++++++++++++++++++ .../manager/service/datatype/DataTypeOperator.java | 56 +++++------------ .../service/datatype/DataTypeOperatorFactory.java | 49 +++++++++++++++ .../message/InlongMsgDeserializeOperator.java | 33 +++++++++- .../service/message/RawMsgDeserializeOperator.java | 32 +++++++++- .../resource/queue/kafka/KafkaOperator.java | 10 ++-- .../resource/queue/pulsar/PulsarOperator.java | 4 ++ .../resource/queue/tubemq/TubeMQOperator.java | 10 ++-- .../resource/queue/kafka/KafkaOperatorTest.java | 4 ++ .../sort/standalone/config/pojo/type/DataType.java | 70 ---------------------- .../sort/standalone/sink/kafka/KafkaIdConfig.java | 12 ++-- .../standalone/sink/pulsar/PulsarIdConfig.java | 12 ++-- 16 files changed, 256 insertions(+), 143 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java index 2d2d8d1997..f327508660 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java @@ -30,8 +30,10 @@ public enum DataTypeEnum { CANAL("canal"), DEBEZIUM_JSON("debezium_json"), RAW("raw"), - - ; + TEXT("text"), + PB("pb"), + JCE("jce"), + UNKNOWN("n"); private final String type; @@ -48,6 +50,20 @@ public enum DataTypeEnum { throw new IllegalArgumentException("Unsupported data type for " + type); } + public static DataTypeEnum convert(String value) { + for (DataTypeEnum v : values()) { + if (v.getType().equals(value)) { + return v; + } + } + return UNKNOWN; + } + + @Override + public String toString() { + return this.name() + ":" + this.type; + } + public String getType() { return type; } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java index 369e7b2212..96c5bd820a 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java @@ -18,6 +18,7 @@ package org.apache.inlong.dataproxy.config.holder; import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject; import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject; import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; @@ -29,7 +30,6 @@ import org.apache.inlong.dataproxy.config.ConfigHolder; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; import org.apache.inlong.dataproxy.config.pojo.CacheType; -import org.apache.inlong.dataproxy.config.pojo.DataType; import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.sdk.commons.protocol.InlongId; @@ -407,8 +407,8 @@ public class MetaConfigHolder extends ConfigHolder { tmpConfig.setTenantAndNameSpace(tenant, nameSpace); tmpConfig.setTopicName(topicName); tmpConfig.setParams(idObject.getParams()); - tmpConfig.setDataType(DataType.convert( - idObject.getParams().getOrDefault("dataType", DataType.TEXT.value()))); + tmpConfig.setDataType(DataTypeEnum.convert( + idObject.getParams().getOrDefault("dataType", DataTypeEnum.TEXT.getType()))); tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter", "|")); tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter", "\n")); tmpConfig.setUseExtendedFields(Boolean.valueOf( diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java index 31646fd3b5..48b19d446d 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java @@ -17,6 +17,7 @@ package org.apache.inlong.dataproxy.config.pojo; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.sdk.commons.protocol.InlongId; import org.apache.commons.lang.StringUtils; @@ -37,7 +38,7 @@ public class IdTopicConfig { private String topicName; private String tenant; private String nameSpace; - private DataType dataType = DataType.TEXT; + private DataTypeEnum dataType = DataTypeEnum.TEXT; private String fieldDelimiter = "|"; private String fileDelimiter = "\n"; private Boolean useExtendedFields = false; @@ -142,7 +143,7 @@ public class IdTopicConfig { * get dataType * @return the dataType */ - public DataType getDataType() { + public DataTypeEnum getDataType() { return dataType; } @@ -150,7 +151,7 @@ public class IdTopicConfig { * set dataType * @param dataType the dataType to set */ - public void setDataType(DataType dataType) { + public void setDataType(DataTypeEnum dataType) { this.dataType = dataType; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java index 51085c5570..72dd4f6303 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.pojo.consume; +import org.apache.inlong.manager.pojo.stream.StreamField; + import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; @@ -24,6 +26,9 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.List; +import java.util.Map; + /** * Brief Message info for MQ */ @@ -49,7 +54,13 @@ public class BriefMQMessage { @ApiModelProperty(value = "Client ip") private String clientIp; + @ApiModelProperty(value = "Message header") + private Map<String, String> headers; + @ApiModelProperty(value = "Message body") private String body; + @ApiModelProperty(value = "List of field info") + private List<StreamField> fieldList; + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java new file mode 100644 index 0000000000..5bb0b969f6 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.datatype; + +import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.StreamField; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Slf4j +@Service +public class CsvDataTypeOperator implements DataTypeOperator { + + @Override + public boolean accept(DataTypeEnum type) { + return DataTypeEnum.CSV.equals(type); + } + + @Override + public List<StreamField> parseFields(String str, InlongStreamInfo streamInfo) throws Exception { + List<StreamField> streamFields = CommonBeanUtils.copyListProperties(streamInfo.getFieldList(), + StreamField::new); + try { + char separator = '|'; + if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) { + separator = (char) Integer.parseInt(streamInfo.getDataSeparator()); + } + String[] bodys = StringUtils.split(str, separator); + if (bodys.length != streamFields.size()) { + return streamFields; + } + for (int i = 0; i < bodys.length; i++) { + streamFields.get(i).setFieldValue(bodys[i]); + } + return streamFields; + } catch (Exception e) { + log.warn("parse fields failed for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(), + streamInfo.getInlongStreamId(), e); + } + return streamFields; + } +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java similarity index 51% rename from inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java index 18491e7343..f2e42b2171 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java @@ -15,56 +15,32 @@ * limitations under the License. */ -package org.apache.inlong.dataproxy.config.pojo; +package org.apache.inlong.manager.service.datatype; -/** - * data content type - */ -public enum DataType { +import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.StreamField; - TEXT("text"), PB("pb"), JCE("jce"), N("n"), CSV("csv"), KV("kv"); +import java.util.List; - private final String value; +/** + * Data type operator + */ +public interface DataTypeOperator { /** - * - * Constructor - * - * @param value + * Determines whether the current instance matches the specified type. */ - private DataType(String value) { - this.value = value; - } + boolean accept(DataTypeEnum type); /** - * value + * Parse fields from message * - * @return + * @param streamInfo inlong stream info + * @return list of field info */ - public String value() { - return this.value; + default List<StreamField> parseFields(String message, InlongStreamInfo streamInfo) throws Exception { + return streamInfo.getFieldList(); } - /** - * toString - */ - @Override - public String toString() { - return this.name() + ":" + this.value; - } - - /** - * convert - * - * @param value - * @return - */ - public static DataType convert(String value) { - for (DataType v : values()) { - if (v.value().equals(value)) { - return v; - } - } - return N; - } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java new file mode 100644 index 0000000000..8b36803ffb --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.datatype; + +import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * Factory for {@link DataTypeOperator}. + */ +@Service +public class DataTypeOperatorFactory { + + @Autowired + private List<DataTypeOperator> operatorList; + + /** + * Get a data type operator instance via the given data type + */ + public DataTypeOperator getInstance(DataTypeEnum type) { + return operatorList.stream() + .filter(inst -> inst.accept(type)) + .findFirst() + .orElseThrow(() -> new BusinessException(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED, + String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), type))); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java index 0e1b2a0a56..16a268428d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java @@ -17,14 +17,20 @@ package org.apache.inlong.manager.service.message; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.common.util.StringUtil; +import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.datatype.DataTypeOperator; +import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.nio.charset.Charset; @@ -38,6 +44,9 @@ import java.util.Objects; @Service public class InlongMsgDeserializeOperator implements DeserializeOperator { + @Autowired + public DataTypeOperatorFactory dataTypeOperatorFactory; + @Override public boolean accept(MessageWrapType type) { return MessageWrapType.INLONG_MSG_V0.equals(type); @@ -71,9 +80,27 @@ public class InlongMsgDeserializeOperator implements DeserializeOperator { if (Objects.isNull(bodyBytes)) { continue; } - BriefMQMessage message = new BriefMQMessage(index, groupId, streamId, msgTime, attrMap.get(CLIENT_IP), - new String(bodyBytes, Charset.forName(streamInfo.getDataEncoding()))); - messageList.add(message); + try { + String body = new String(bodyBytes, Charset.forName(streamInfo.getDataEncoding())); + DataTypeOperator dataTypeOperator = + dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType())); + List<StreamField> streamFieldList = dataTypeOperator.parseFields(body, streamInfo); + BriefMQMessage message = BriefMQMessage.builder() + .id(index) + .inlongGroupId(groupId) + .inlongStreamId(streamId) + .dt(msgTime) + .clientIp(attrMap.get(CLIENT_IP)) + .headers(headers) + .body(body) + .fieldList(streamFieldList) + .build(); + messageList.add(message); + } catch (Exception e) { + String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s", groupId, streamId); + log.error(errMsg, e); + throw new BusinessException(errMsg); + } } } return messageList; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java index 361afe46d6..6c6e2840e3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java @@ -17,12 +17,18 @@ package org.apache.inlong.manager.service.message; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.datatype.DataTypeOperator; +import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.nio.charset.Charset; @@ -34,6 +40,9 @@ import java.util.Map; @Service public class RawMsgDeserializeOperator implements DeserializeOperator { + @Autowired + public DataTypeOperatorFactory dataTypeOperatorFactory; + @Override public boolean accept(MessageWrapType type) { return MessageWrapType.RAW.equals(type); @@ -45,8 +54,27 @@ public class RawMsgDeserializeOperator implements DeserializeOperator { String groupId = headers.get(AttributeConstants.GROUP_ID); String streamId = headers.get(AttributeConstants.STREAM_ID); long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0")); - return Collections.singletonList(new BriefMQMessage(index, groupId, streamId, msgTime, - headers.get(CLIENT_IP), new String(msgBytes, Charset.forName(streamInfo.getDataEncoding())))); + String body = new String(msgBytes, Charset.forName(streamInfo.getDataEncoding())); + try { + DataTypeOperator dataTypeOperator = + dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType())); + List<StreamField> streamFieldList = dataTypeOperator.parseFields(body, streamInfo); + BriefMQMessage briefMQMessage = BriefMQMessage.builder() + .id(index) + .inlongGroupId(groupId) + .inlongStreamId(streamId) + .dt(msgTime) + .clientIp(headers.get(CLIENT_IP)) + .headers(headers) + .body(body) + .fieldList(streamFieldList) + .build(); + return Collections.singletonList(briefMQMessage); + } catch (Exception e) { + String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s", groupId, streamId); + log.error(errMsg, e); + throw new BusinessException(errMsg); + } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java index deb6601b63..763a654c26 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java @@ -156,10 +156,12 @@ public class KafkaOperator { headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)); } - int wrapTypeId = Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER, - Integer.toString(MessageWrapType.INLONG_MSG_V0.getId()))); - DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance( - MessageWrapType.valueOf(wrapTypeId)); + MessageWrapType messageWrapType = MessageWrapType.forType(streamInfo.getWrapType()); + if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) { + messageWrapType = + MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER))); + } + DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType); messageList.addAll(deserializeOperator.decodeMsg(streamInfo, record.value(), headers, index)); if (messageList.size() >= messageCount) { break; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index ccabb716f1..45e6112ce3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -48,6 +48,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -453,6 +454,9 @@ public class PulsarOperator { messagePosition); PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition); Map<String, String> headers = messageInfo.getProperties(); + if (headers == null) { + headers = new HashMap<>(); + } MessageWrapType messageWrapType = MessageWrapType.forType(streamInfo.getWrapType()); if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) { messageWrapType = diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java index e13b4cbe2c..cdbc4da86d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java @@ -303,11 +303,13 @@ public class TubeMQOperator { map.put(kv.split(InlongConstants.EQUAL)[0], kv.split(InlongConstants.EQUAL)[1]); } - int wrapTypeId = Integer.parseInt(map.getOrDefault(InlongConstants.MSG_ENCODE_VER, - Integer.toString(MessageWrapType.INLONG_MSG_V0.getId()))); + MessageWrapType messageWrapType = MessageWrapType.forType(streamInfo.getWrapType()); + if (map.get(InlongConstants.MSG_ENCODE_VER) != null) { + messageWrapType = + MessageWrapType.valueOf(Integer.parseInt(map.get(InlongConstants.MSG_ENCODE_VER))); + } byte[] messageData = Base64.getDecoder().decode(tubeDataInfo.getData()); - DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance( - MessageWrapType.valueOf(wrapTypeId)); + DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType); messageList.addAll(deserializeOperator.decodeMsg(streamInfo, messageData, map, index)); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java index d2cf618b3d..acc6ebb3f5 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.service.resource.queue.kafka; +import org.apache.inlong.common.enums.DataTypeEnum; +import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; @@ -93,6 +95,8 @@ public class KafkaOperatorTest extends ServiceBaseTest { @BeforeEach public void setUp() { streamInfo.setDataEncoding("UTF-8"); + streamInfo.setDataType(DataTypeEnum.CSV.getType()); + streamInfo.setWrapType(MessageWrapType.INLONG_MSG_V0.getName()); List<TopicPartition> topicPartitions = IntStream.range(0, PARTITION_NUM) .mapToObj(i -> new TopicPartition(TOPIC_NAME, i)).collect(Collectors.toList()); diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java deleted file mode 100644 index 61a1a4ac54..0000000000 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sort.standalone.config.pojo.type; - -/** - * data content type - */ -public enum DataType { - - TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n"); - - private final String value; - - /** - * - * Constructor - * - * @param value - */ - private DataType(String value) { - this.value = value; - } - - /** - * value - * - * @return - */ - public String value() { - return this.value; - } - - /** - * toString - */ - @Override - public String toString() { - return this.name() + ":" + this.value; - } - - /** - * convert - * - * @param value - * @return - */ - public static DataType convert(String value) { - for (DataType v : values()) { - if (v.value().equals(value)) { - return v; - } - } - return UNKNOWN; - } -} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java index 3a527b23be..73d58afde0 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java @@ -17,8 +17,8 @@ package org.apache.inlong.sort.standalone.sink.kafka; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.sort.standalone.config.pojo.InlongId; -import org.apache.inlong.sort.standalone.config.pojo.type.DataType; import org.apache.inlong.sort.standalone.utils.Constants; import java.util.Map; @@ -38,7 +38,7 @@ public class KafkaIdConfig { private String uid; private String separator = "|"; private String topic; - private DataType dataType = DataType.TEXT; + private DataTypeEnum dataType = DataTypeEnum.TEXT; /** * Constructor @@ -58,8 +58,8 @@ public class KafkaIdConfig { this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); this.separator = idParam.getOrDefault(KafkaIdConfig.KEY_SEPARATOR, KafkaIdConfig.DEFAULT_SEPARATOR); this.topic = idParam.getOrDefault(Constants.TOPIC, uid); - this.dataType = DataType - .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE, DataType.TEXT.value())); + this.dataType = DataTypeEnum + .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType())); } /** @@ -157,7 +157,7 @@ public class KafkaIdConfig { * * @return the dataType */ - public DataType getDataType() { + public DataTypeEnum getDataType() { return dataType; } @@ -166,7 +166,7 @@ public class KafkaIdConfig { * * @param dataType the dataType to set */ - public void setDataType(DataType dataType) { + public void setDataType(DataTypeEnum dataType) { this.dataType = dataType; } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java index 52377d2461..4ef3aece80 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java @@ -17,8 +17,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar; +import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.sort.standalone.config.pojo.InlongId; -import org.apache.inlong.sort.standalone.config.pojo.type.DataType; import org.apache.inlong.sort.standalone.utils.Constants; import java.util.Map; @@ -40,7 +40,7 @@ public class PulsarIdConfig { private String uid; private String separator = "|"; private String topic; - private DataType dataType = DataType.TEXT; + private DataTypeEnum dataType = DataTypeEnum.TEXT; /** * Constructor @@ -60,8 +60,8 @@ public class PulsarIdConfig { this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, PulsarIdConfig.DEFAULT_SEPARATOR); this.topic = idParam.getOrDefault(Constants.TOPIC, uid); - this.dataType = DataType - .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, DataType.TEXT.value())); + this.dataType = DataTypeEnum + .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType())); } /** @@ -159,7 +159,7 @@ public class PulsarIdConfig { * * @return the dataType */ - public DataType getDataType() { + public DataTypeEnum getDataType() { return dataType; } @@ -168,7 +168,7 @@ public class PulsarIdConfig { * * @param dataType the dataType to set */ - public void setDataType(DataType dataType) { + public void setDataType(DataTypeEnum dataType) { this.dataType = dataType; }