This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ff41d6aa2b [INLONG-9209][Manager] Support configuring predefined fields and issuing agents (#9210) ff41d6aa2b is described below commit ff41d6aa2bd7e4ec21f5bae53b51b27a0d568d5d Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri Nov 3 16:22:30 2023 +0800 [INLONG-9209][Manager] Support configuring predefined fields and issuing agents (#9210) * [INLONG-9209][Manager] Support configuring predefined fields and issuing agents * [INLONG-9209][Manager] Fix UT --- .../java/org/apache/inlong/common/pojo/agent/DataConfig.java | 1 + .../org/apache/inlong/manager/pojo/stream/BaseInlongStream.java | 3 +++ .../apache/inlong/manager/pojo/stream/InlongStreamExtParam.java | 3 +++ .../inlong/manager/service/core/impl/AgentServiceImpl.java | 9 ++++++++- 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java index 975f74d128..30bbafab5e 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java @@ -48,6 +48,7 @@ public class DataConfig { private Integer syncSend; private String syncPartitionKey; private Integer state; + private String predefinedFields; private String extParams; /** * The task version. 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java index 1327c9ac7b..4abd2f836f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java @@ -20,15 +20,18 @@ package org.apache.inlong.manager.pojo.stream; import io.swagger.annotations.ApiModel; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; /** * The base parameter class of InlongStream, support user extend their own business params. */ @Data @AllArgsConstructor +@NoArgsConstructor @ApiModel("Base info of inlong stream") public class BaseInlongStream { // you can add extend parameters in this class + private String predefinedFields; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java index 0dba8034af..ad69b997c9 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java @@ -48,6 +48,9 @@ public class InlongStreamExtParam implements Serializable { @ApiModelProperty(value = "If use extended fields") private Boolean useExtendedFields = false; + @ApiModelProperty(value = "Predefined fields") + private String predefinedFields; + /** * Pack extended attributes into ExtParams * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java 
index 0d4ff83a37..d4731d4e55 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.core.AgentService; import org.apache.inlong.manager.service.source.SourceSnapshotOperator; @@ -101,6 +102,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.inlong.manager.common.consts.InlongConstants.DOT; +import static org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams; /** * Agent service layer implementation @@ -161,7 +163,7 @@ public class AgentServiceImpl implements AgentService { // because the eviction handler needs to query cluster info cache long expireTime = 10 * 5; taskCache = Caffeine.newBuilder() - .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS) + .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS) .build(this::fetchTask); if (updateTaskTimeoutEnabled) { @@ -601,6 +603,11 @@ public class AgentServiceImpl implements AgentService { extParams = (null != dataSeparator ? 
getExtParams(extParams, dataSeparator) : extParams); } + InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new); + // Processing extParams + unpackExtParams(streamEntity.getExtParams(), streamInfo); + dataConfig.setPredefinedFields(streamInfo.getPredefinedFields()); + int dataReportType = groupEntity.getDataReportType(); dataConfig.setDataReportType(dataReportType); if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {