This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b020fd2f65 [INLONG-9280][Manager] Support different size of extended fields of InlongStream (#9283) b020fd2f65 is described below commit b020fd2f657f850143c2c8af59cad9be57ecc0a9 Author: vernedeng <verned...@apache.org> AuthorDate: Wed Nov 15 10:46:21 2023 +0800 [INLONG-9280][Manager] Support different size of extended fields of InlongStream (#9283) --- .../inlong/manager/pojo/sink/cls/ClsSinkDTO.java | 9 +++++++++ .../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 2 +- .../pojo/sink/es/ElasticsearchSinkRequest.java | 9 --------- .../manager/pojo/stream/InlongStreamExtParam.java | 3 +++ .../manager/pojo/stream/InlongStreamInfo.java | 3 +++ .../manager/pojo/stream/InlongStreamRequest.java | 3 +++ .../service/core/impl/SortClusterServiceImpl.java | 15 --------------- .../manager/service/sink/cls/ClsSinkOperator.java | 21 ++++++++++++++++----- .../service/sink/es/ElasticsearchSinkOperator.java | 13 ++++++++++++- 9 files changed, 47 insertions(+), 31 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java index f76f5bfb01..87508b5e1c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java @@ -55,6 +55,15 @@ public class ClsSinkDTO { @ApiModelProperty("Cloud log service index tokenizer") private String tokenizer; + @ApiModelProperty("contentOffset") + private Integer contentOffset = 0; + + @ApiModelProperty("fieldOffset") + private Integer fieldOffset; + + @ApiModelProperty("separator") + private String separator; + /** * Get the dto instance from the request */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java index 35565f2cd5..4045231768 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java @@ -44,7 +44,7 @@ public class ElasticsearchSinkDTO { private String indexNamePattern; @ApiModelProperty("contentOffset") - private Integer contentOffset; + private Integer contentOffset = 0; @ApiModelProperty("fieldOffset") private Integer fieldOffset; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java index 182308fe0a..0f8d756c5a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java @@ -40,13 +40,4 @@ public class ElasticsearchSinkRequest extends SinkRequest { @ApiModelProperty("indexNamePattern") private String indexNamePattern; - @ApiModelProperty("contentOffset") - private Integer contentOffset; - - @ApiModelProperty("fieldOffset") - private Integer fieldOffset; - - @ApiModelProperty("separator") - private String separator; - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java index ad69b997c9..2690576aad 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java @@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable { @ApiModelProperty(value = "Predefined fields") private String predefinedFields; + @ApiModelProperty(value = "Extended field size") + private Integer extendedFieldSize = 0; + /** * Pack extended attributes into ExtParams * diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java index f1c305c475..06a441336f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java @@ -136,6 +136,9 @@ public class InlongStreamInfo extends BaseInlongStream { @ApiModelProperty(value = "If use extended fields") private Boolean useExtendedFields = false; + @ApiModelProperty(value = "Extended field size") + private Integer extendedFieldSize = 0; + @ApiModelProperty(value = "Whether to ignore the parse errors of field value") private Boolean ignoreParseError = true; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java index 4ad91f17af..bb6a0d2964 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java @@ -124,6 +124,9 @@ public class InlongStreamRequest extends BaseInlongStream { @ApiModelProperty(value = "If use extended fields") private Boolean useExtendedFields = false; + @ApiModelProperty(value = "Extended field size") + private Integer extendedFieldSize = 0; + @ApiModelProperty(value = "The message body wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, PB, etc") private String wrapType; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java index 170b295249..24ee49f757 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java @@ -20,14 +20,12 @@ package org.apache.inlong.manager.service.core.impl; import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig; import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse; import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig; -import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo; import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo; -import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.core.SortClusterService; import org.apache.inlong.manager.service.core.SortConfigLoader; import org.apache.inlong.manager.service.node.DataNodeOperator; @@ -37,7 +35,6 @@ import org.apache.inlong.manager.service.sink.StreamSinkOperator; import com.google.gson.Gson; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -281,7 +278,6 @@ public class SortClusterServiceImpl implements SortClusterService { StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType()); List<String> fields = fieldMap.get(streamSink.getId()); Map<String, String> params = operator.parse2IdParams(streamSink, fields, dataNodeInfo); - setFiledOffset(streamSink, params); return params; } catch (Exception e) { LOGGER.error("fail to parse id params of groupId={}, streamId={} name={}, type={}}", @@ -294,17 +290,6 @@ public class SortClusterServiceImpl implements SortClusterService { .collect(Collectors.toList()); } - private void setFiledOffset(StreamSinkEntity streamSink, Map<String, String> params) { - - SortSourceStreamInfo sortSourceStreamInfo = allStreams.get(streamSink.getInlongGroupId()) - .get(streamSink.getInlongStreamId()); - InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject( - sortSourceStreamInfo.getExtParams(), InlongStreamExtParam.class); - if (ObjectUtils.anyNotNull(inlongStreamExtParam) && !inlongStreamExtParam.getUseExtendedFields()) { - params.put(FILED_OFFSET, String.valueOf(0)); - } - } - private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) { DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType()); return operator.parse2SinkParams(nodeInfo); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java index a2ec9e1958..cc0dafc91c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java @@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; import org.apache.inlong.manager.pojo.node.DataNodeInfo; @@ -36,6 +37,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.sink.cls.ClsSink; import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO; import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -74,6 +76,15 @@ public class ClsSinkOperator extends AbstractSinkOperator { ClsSinkRequest sinkRequest = (ClsSinkRequest) request; try { ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + + InlongStreamEntity stream = inlongStreamEntityMapper + .selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId()); + dto.setSeparator(String.valueOf((char) (Integer.parseInt(stream.getDataSeparator())))); + + InlongStreamExtParam streamExt = + JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class); + dto.setFieldOffset(streamExt.getExtendedFieldSize()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, @@ -116,16 +127,16 @@ public class ClsSinkOperator extends AbstractSinkOperator { DataNodeInfo dataNodeInfo) { Map<String, String> params = super.parse2IdParams(streamSink, fields, dataNodeInfo); ClsSinkDTO clsSinkDTO = JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class); - params.put(TOPIC_ID, clsSinkDTO.getTopicId()); + params.computeIfAbsent(TOPIC_ID, k -> clsSinkDTO.getTopicId()); ClsDataNodeInfo clsDataNodeInfo = (ClsDataNodeInfo) dataNodeInfo; - params.put(SECRET_ID, clsDataNodeInfo.getSendSecretId()); - params.put(SECRET_KEY, clsDataNodeInfo.getSendSecretKey()); - params.put(END_POINT, clsDataNodeInfo.getEndpoint()); + params.computeIfAbsent(SECRET_ID, k -> clsDataNodeInfo.getSendSecretId()); + params.computeIfAbsent(SECRET_KEY, k -> clsDataNodeInfo.getSendSecretKey()); + params.computeIfAbsent(END_POINT, k -> clsDataNodeInfo.getEndpoint()); StringBuilder fieldNames = new StringBuilder(); for (String field : fields) { fieldNames.append(field).append(InlongConstants.BLANK); } - params.put(KEY_FIELDS, fieldNames.toString()); + params.computeIfAbsent(KEY_FIELDS, k -> fieldNames.toString()); return params; } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java index fba388029f..7b2109c352 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java @@ -22,6 +22,8 @@ import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; @@ -32,6 +34,7 @@ import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -78,6 +81,14 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator { try { ElasticsearchSinkDTO dto = ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + InlongStreamEntity stream = inlongStreamEntityMapper + .selectByIdentifier(request.getInlongGroupId(), request.getInlongStreamId()); + dto.setSeparator(String.valueOf((char) (Integer.parseInt(stream.getDataSeparator())))); + + InlongStreamExtParam streamExt = + JsonUtils.parseObject(stream.getExtParams(), InlongStreamExtParam.class); + dto.setFieldOffset(streamExt.getExtendedFieldSize()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, @@ -108,7 +119,7 @@ public class ElasticsearchSinkOperator extends AbstractSinkOperator { for (String field : fields) { sb.append(field).append(" "); } - idParams.put(KEY_FIELDS, sb.toString()); + idParams.computeIfAbsent(KEY_FIELDS, k -> sb.toString()); return idParams; }