This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new e6ec72c684 [INLONG-10512][Manager] Support preview of data in kv data type (#10513)
e6ec72c684 is described below

commit e6ec72c684ae51656014b9e38f83cacc62a1f680
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jun 26 14:07:21 2024 +0800

    [INLONG-10512][Manager] Support preview of data in kv data type (#10513)
    
    * [INLONG-10512][Manager] Support preview of data in kv data type
---
 .../manager/pojo/stream/BaseInlongStream.java      |   2 +
 .../manager/pojo/stream/InlongStreamExtParam.java  |   6 ++
 .../service/datatype/KvDataTypeOperator.java       | 100 +++++++++++++++++++++
 3 files changed, 108 insertions(+)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
index 4abd2f836f..d992b8141f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
@@ -33,5 +33,7 @@ public class BaseInlongStream {
 
     // you can add extend parameters in this class
     private String predefinedFields;
+    private String kvSeparator;
+    private String lineSeparator;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 3358ab91eb..1af56c2769 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -45,6 +45,12 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError;
 
+    @ApiModelProperty(value = "Kv separator")
+    private String kvSeparator;
+
+    @ApiModelProperty(value = "Line separator")
+    private String lineSeparator;
+
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
new file mode 100644
index 0000000000..79177f3d70
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * {@link DataTypeOperator} for the KV data type: fills stream fields
+ * from a raw key/value message and builds the stream's
+ * {@link KvConfig}.
+ *
+ * <p>Separators are read from the stream as decimal character codes
+ * (they are passed through {@link Integer#parseInt(String)} and cast
+ * to {@code char}).
+ */
+@Slf4j
+@Service
+public class KvDataTypeOperator implements DataTypeOperator {
+
+    /** Key/value separator used when the stream configures none. */
+    private static final char DEFAULT_KV_SEPARATOR = '=';
+
+    /** Entry separator used when the stream configures none. */
+    private static final char DEFAULT_ENTRY_SEPARATOR = 0;
+
+    /**
+     * Tells whether this operator handles the given data type.
+     *
+     * @param type data type to check, may be {@code null}
+     * @return {@code true} only for {@link DataTypeEnum#KV}
+     */
+    @Override
+    public boolean accept(DataTypeEnum type) {
+        return DataTypeEnum.KV.equals(type);
+    }
+
+    /**
+     * Copies the stream's field list and fills each copy with the key
+     * and value of the entry at the same position in {@code str}.
+     *
+     * <p>Parsing is best effort: when the number of entries differs
+     * from the number of stream fields, or when parsing fails, a
+     * warning is logged and the copied fields are returned as they
+     * are at that point.
+     *
+     * @param str raw message body; {@code null} is treated as a
+     *        message without entries
+     * @param streamInfo stream supplying the field list and the
+     *        separator codes
+     * @return the copied fields, filled where parsing succeeded
+     * @throws Exception declared by the interface
+     */
+    @Override
+    public List<FieldInfo> parseFields(String str,
+            InlongStreamInfo streamInfo) throws Exception {
+        List<FieldInfo> fields = CommonBeanUtils.copyListProperties(
+                streamInfo.getFieldList(), FieldInfo::new);
+        try {
+            char separator = parseSeparator(
+                    streamInfo.getDataSeparator(),
+                    DEFAULT_ENTRY_SEPARATOR);
+            char kvSeparator = parseSeparator(
+                    streamInfo.getKvSeparator(),
+                    DEFAULT_KV_SEPARATOR);
+            String[] bodys = StringUtils.split(str, separator);
+            // StringUtils.split returns null for a null input.
+            int reported = bodys == null ? 0 : bodys.length;
+            if (reported != fields.size()) {
+                log.warn(
+                        "The number of reported fields does not match "
+                                + "the number of stream fields for "
+                                + "groupId={}, streamId={}, "
+                                + "reported field size ={}, "
+                                + "stream field size ={}",
+                        streamInfo.getInlongGroupId(),
+                        streamInfo.getInlongStreamId(),
+                        reported, fields.size());
+                return fields;
+            }
+            String kvSeparators = String.valueOf(kvSeparator);
+            for (int i = 0; i < reported; i++) {
+                // Split at the first separator only, so a value that
+                // itself contains the separator is kept whole.
+                String[] values =
+                        StringUtils.split(bodys[i], kvSeparators, 2);
+                if (values.length == 0) {
+                    // Entry holds nothing but separators: no key.
+                    continue;
+                }
+                FieldInfo field = fields.get(i);
+                field.setFieldName(values[0]);
+                // "key=" or a bare "key" yields no second token;
+                // report an empty value instead of failing the
+                // whole message.
+                field.setFieldValue(
+                        values.length > 1 ? values[1] : "");
+            }
+        } catch (Exception e) {
+            log.warn(
+                    "parse fields failed for groupId = {}, "
+                            + "streamId = {}",
+                    streamInfo.getInlongGroupId(),
+                    streamInfo.getInlongStreamId(), e);
+        }
+        return fields;
+    }
+
+    /**
+     * Builds the {@link KvConfig} for the given stream.
+     *
+     * @param streamInfo stream supplying separators and escape char
+     * @return the populated {@link KvConfig}
+     * @throws NumberFormatException if a configured separator is not
+     *         a decimal integer
+     */
+    @Override
+    public DataTypeConfig getDataTypeConfig(
+            InlongStreamInfo streamInfo) {
+        char separator = parseSeparator(
+                streamInfo.getDataSeparator(),
+                DEFAULT_ENTRY_SEPARATOR);
+        Character escape = null;
+        // Guard against "" as well as null: charAt(0) on an empty
+        // string throws StringIndexOutOfBoundsException.
+        if (StringUtils.isNotEmpty(streamInfo.getDataEscapeChar())) {
+            escape = streamInfo.getDataEscapeChar().charAt(0);
+        }
+        char kvSeparator = parseSeparator(
+                streamInfo.getKvSeparator(), DEFAULT_KV_SEPARATOR);
+        Character lineSeparator = null;
+        // row separator, which must be a field separator in the
+        // data flow
+        if (StringUtils.isNotBlank(streamInfo.getLineSeparator())) {
+            lineSeparator = (char) Integer.parseInt(
+                    streamInfo.getLineSeparator());
+        }
+        KvConfig kvConfig = new KvConfig();
+        kvConfig.setLineSeparator(lineSeparator);
+        kvConfig.setKvSplitter(kvSeparator);
+        kvConfig.setEntrySplitter(separator);
+        kvConfig.setEscapeChar(escape);
+        return kvConfig;
+    }
+
+    /**
+     * Converts a separator given as a decimal character code.
+     *
+     * @param code decimal character code, may be blank
+     * @param defaultValue value returned when {@code code} is blank
+     * @return the character for {@code code}, or {@code defaultValue}
+     * @throws NumberFormatException if {@code code} is not blank and
+     *         not a decimal integer
+     */
+    private static char parseSeparator(String code,
+            char defaultValue) {
+        if (StringUtils.isBlank(code)) {
+            return defaultValue;
+        }
+        return (char) Integer.parseInt(code);
+    }
+}

Reply via email to