This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b020fd2f65 [INLONG-9280][Manager] Support different size of extended fields of InlongStream (#9283)
b020fd2f65 is described below

commit b020fd2f657f850143c2c8af59cad9be57ecc0a9
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Nov 15 10:46:21 2023 +0800

    [INLONG-9280][Manager] Support different size of extended fields of InlongStream (#9283)
---
 .../inlong/manager/pojo/sink/cls/ClsSinkDTO.java    |  9 +++++++++
 .../manager/pojo/sink/es/ElasticsearchSinkDTO.java  |  2 +-
 .../pojo/sink/es/ElasticsearchSinkRequest.java      |  9 ---------
 .../manager/pojo/stream/InlongStreamExtParam.java   |  3 +++
 .../manager/pojo/stream/InlongStreamInfo.java       |  3 +++
 .../manager/pojo/stream/InlongStreamRequest.java    |  3 +++
 .../service/core/impl/SortClusterServiceImpl.java   | 15 ---------------
 .../manager/service/sink/cls/ClsSinkOperator.java   | 21 ++++++++++++++++-----
 .../service/sink/es/ElasticsearchSinkOperator.java  | 13 ++++++++++++-
 9 files changed, 47 insertions(+), 31 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
index f76f5bfb01..87508b5e1c 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
@@ -55,6 +55,15 @@ public class ClsSinkDTO {
     @ApiModelProperty("Cloud log service index tokenizer")
     private String tokenizer;
 
+    @ApiModelProperty("contentOffset")
+    private Integer contentOffset = 0;
+
+    @ApiModelProperty("fieldOffset")
+    private Integer fieldOffset;
+
+    @ApiModelProperty("separator")
+    private String separator;
+
     /**
      * Get the dto instance from the request
      */
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 35565f2cd5..4045231768 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -44,7 +44,7 @@ public class ElasticsearchSinkDTO {
     private String indexNamePattern;
 
     @ApiModelProperty("contentOffset")
-    private Integer contentOffset;
+    private Integer contentOffset = 0;
 
     @ApiModelProperty("fieldOffset")
     private Integer fieldOffset;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
index 182308fe0a..0f8d756c5a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
@@ -40,13 +40,4 @@ public class ElasticsearchSinkRequest extends SinkRequest {
     @ApiModelProperty("indexNamePattern")
     private String indexNamePattern;
 
-    @ApiModelProperty("contentOffset")
-    private Integer contentOffset;
-
-    @ApiModelProperty("fieldOffset")
-    private Integer fieldOffset;
-
-    @ApiModelProperty("separator")
-    private String separator;
-
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index ad69b997c9..2690576aad 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "Predefined fields")
     private String predefinedFields;
 
+    @ApiModelProperty(value = "Extended field size")
+    private Integer extendedFieldSize = 0;
+
     /**
      * Pack extended attributes into ExtParams
      *
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index f1c305c475..06a441336f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -136,6 +136,9 @@ public class InlongStreamInfo extends BaseInlongStream {
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
+    @ApiModelProperty(value = "Extended field size")
+    private Integer extendedFieldSize = 0;
+
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value")
     private Boolean ignoreParseError = true;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index 4ad91f17af..bb6a0d2964 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -124,6 +124,9 @@ public class InlongStreamRequest extends BaseInlongStream {
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
+    @ApiModelProperty(value = "Extended field size")
+    private Integer extendedFieldSize = 0;
+
     @ApiModelProperty(value = "The message body  wrap type, including: RAW, 
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
     private String wrapType;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index 170b295249..24ee49f757 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -20,14 +20,12 @@ package org.apache.inlong.manager.service.core.impl;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
 import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
 import org.apache.inlong.manager.service.core.SortClusterService;
 import org.apache.inlong.manager.service.core.SortConfigLoader;
 import org.apache.inlong.manager.service.node.DataNodeOperator;
@@ -37,7 +35,6 @@ import 
org.apache.inlong.manager.service.sink.StreamSinkOperator;
 
 import com.google.gson.Gson;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -281,7 +278,6 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                         StreamSinkOperator operator = 
sinkOperatorFactory.getInstance(streamSink.getSinkType());
                         List<String> fields = fieldMap.get(streamSink.getId());
                         Map<String, String> params = 
operator.parse2IdParams(streamSink, fields, dataNodeInfo);
-                        setFiledOffset(streamSink, params);
                         return params;
                     } catch (Exception e) {
                         LOGGER.error("fail to parse id params of groupId={}, 
streamId={} name={}, type={}}",
@@ -294,17 +290,6 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 .collect(Collectors.toList());
     }
 
-    private void setFiledOffset(StreamSinkEntity streamSink, Map<String, 
String> params) {
-
-        SortSourceStreamInfo sortSourceStreamInfo = 
allStreams.get(streamSink.getInlongGroupId())
-                .get(streamSink.getInlongStreamId());
-        InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(
-                sortSourceStreamInfo.getExtParams(), 
InlongStreamExtParam.class);
-        if (ObjectUtils.anyNotNull(inlongStreamExtParam) && 
!inlongStreamExtParam.getUseExtendedFields()) {
-            params.put(FILED_OFFSET, String.valueOf(0));
-        }
-    }
-
     private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
         DataNodeOperator operator = 
dataNodeOperatorFactory.getInstance(nodeInfo.getType());
         return operator.parse2SinkParams(nodeInfo);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index a2ec9e1958..cc0dafc91c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -25,6 +25,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@@ -36,6 +37,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.cls.ClsSink;
 import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
 import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -74,6 +76,15 @@ public class ClsSinkOperator extends AbstractSinkOperator {
         ClsSinkRequest sinkRequest = (ClsSinkRequest) request;
         try {
             ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest, 
targetEntity.getExtParams());
+
+            InlongStreamEntity stream = inlongStreamEntityMapper
+                    .selectByIdentifier(request.getInlongGroupId(), 
request.getInlongStreamId());
+            dto.setSeparator(String.valueOf((char) 
(Integer.parseInt(stream.getDataSeparator()))));
+
+            InlongStreamExtParam streamExt =
+                    JsonUtils.parseObject(stream.getExtParams(), 
InlongStreamExtParam.class);
+            dto.setFieldOffset(streamExt.getExtendedFieldSize());
+
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
@@ -116,16 +127,16 @@ public class ClsSinkOperator extends AbstractSinkOperator 
{
             DataNodeInfo dataNodeInfo) {
         Map<String, String> params = super.parse2IdParams(streamSink, fields, 
dataNodeInfo);
         ClsSinkDTO clsSinkDTO = 
JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class);
-        params.put(TOPIC_ID, clsSinkDTO.getTopicId());
+        params.computeIfAbsent(TOPIC_ID, k -> clsSinkDTO.getTopicId());
         ClsDataNodeInfo clsDataNodeInfo = (ClsDataNodeInfo) dataNodeInfo;
-        params.put(SECRET_ID, clsDataNodeInfo.getSendSecretId());
-        params.put(SECRET_KEY, clsDataNodeInfo.getSendSecretKey());
-        params.put(END_POINT, clsDataNodeInfo.getEndpoint());
+        params.computeIfAbsent(SECRET_ID, k -> 
clsDataNodeInfo.getSendSecretId());
+        params.computeIfAbsent(SECRET_KEY, k -> 
clsDataNodeInfo.getSendSecretKey());
+        params.computeIfAbsent(END_POINT, k -> clsDataNodeInfo.getEndpoint());
         StringBuilder fieldNames = new StringBuilder();
         for (String field : fields) {
             fieldNames.append(field).append(InlongConstants.BLANK);
         }
-        params.put(KEY_FIELDS, fieldNames.toString());
+        params.computeIfAbsent(KEY_FIELDS, k -> fieldNames.toString());
         return params;
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index fba388029f..7b2109c352 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -22,6 +22,8 @@ import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@@ -32,6 +34,7 @@ import 
org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -78,6 +81,14 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
         try {
             ElasticsearchSinkDTO dto = 
ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
 
+            InlongStreamEntity stream = inlongStreamEntityMapper
+                    .selectByIdentifier(request.getInlongGroupId(), 
request.getInlongStreamId());
+            dto.setSeparator(String.valueOf((char) 
(Integer.parseInt(stream.getDataSeparator()))));
+
+            InlongStreamExtParam streamExt =
+                    JsonUtils.parseObject(stream.getExtParams(), 
InlongStreamExtParam.class);
+            dto.setFieldOffset(streamExt.getExtendedFieldSize());
+
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
@@ -108,7 +119,7 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
         for (String field : fields) {
             sb.append(field).append(" ");
         }
-        idParams.put(KEY_FIELDS, sb.toString());
+        idParams.computeIfAbsent(KEY_FIELDS, k -> sb.toString());
         return idParams;
     }
 

Reply via email to