This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ff41d6aa2b [INLONG-9209][Manager] Support configuring predefined 
fields and issuing agents (#9210)
ff41d6aa2b is described below

commit ff41d6aa2bd7e4ec21f5bae53b51b27a0d568d5d
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Fri Nov 3 16:22:30 2023 +0800

    [INLONG-9209][Manager] Support configuring predefined fields and issuing 
agents (#9210)
    
    * [INLONG-9209][Manager] Support configuring predefined fields and issuing 
agents
    
    * [INLONG-9209][Manager] Fix UT
---
 .../java/org/apache/inlong/common/pojo/agent/DataConfig.java     | 1 +
 .../org/apache/inlong/manager/pojo/stream/BaseInlongStream.java  | 3 +++
 .../apache/inlong/manager/pojo/stream/InlongStreamExtParam.java  | 3 +++
 .../inlong/manager/service/core/impl/AgentServiceImpl.java       | 9 ++++++++-
 4 files changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index 975f74d128..30bbafab5e 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -48,6 +48,7 @@ public class DataConfig {
     private Integer syncSend;
     private String syncPartitionKey;
     private Integer state;
+    private String predefinedFields;
     private String extParams;
     /**
      * The task version.
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
index 1327c9ac7b..4abd2f836f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
@@ -20,15 +20,18 @@ package org.apache.inlong.manager.pojo.stream;
 import io.swagger.annotations.ApiModel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 /**
  * The base parameter class of InlongStream, support user extend their own 
business params.
  */
 @Data
 @AllArgsConstructor
+@NoArgsConstructor
 @ApiModel("Base info of inlong stream")
 public class BaseInlongStream {
 
     // you can add extend parameters in this class
+    private String predefinedFields;
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 0dba8034af..ad69b997c9 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -48,6 +48,9 @@ public class InlongStreamExtParam implements Serializable {
     @ApiModelProperty(value = "If use extended fields")
     private Boolean useExtendedFields = false;
 
+    @ApiModelProperty(value = "Predefined fields")
+    private String predefinedFields;
+
     /**
      * Pack extended attributes into ExtParams
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0d4ff83a37..d4731d4e55 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -55,6 +55,7 @@ import 
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
 
@@ -101,6 +102,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
+import static 
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
 
 /**
  * Agent service layer implementation
@@ -161,7 +163,7 @@ public class AgentServiceImpl implements AgentService {
         // because the eviction handler needs to query cluster info cache
         long expireTime = 10 * 5;
         taskCache = Caffeine.newBuilder()
-                .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
+                .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
                 .build(this::fetchTask);
 
         if (updateTaskTimeoutEnabled) {
@@ -601,6 +603,11 @@ public class AgentServiceImpl implements AgentService {
                 extParams = (null != dataSeparator ? getExtParams(extParams, 
dataSeparator) : extParams);
             }
 
+            InlongStreamInfo streamInfo = 
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+            // Processing extParams
+            unpackExtParams(streamEntity.getExtParams(), streamInfo);
+            dataConfig.setPredefinedFields(streamInfo.getPredefinedFields());
+
             int dataReportType = groupEntity.getDataReportType();
             dataConfig.setDataReportType(dataReportType);
             if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {

Reply via email to