This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d51687fac [INLONG-5615][Manager] Add template id and sub task status 
for file source (#5616)
d51687fac is described below

commit d51687facde131a0a0e96af9fff8951d7e9a1fc1
Author: woofyzhao <490467...@qq.com>
AuthorDate: Mon Aug 22 19:09:34 2022 +0800

    [INLONG-5615][Manager] Add template id and sub task status for file source 
(#5616)
---
 .../inlong/manager/client/cli/ListCommand.java     |  2 +-
 .../inlong/manager/client/cli/util/PrintUtils.java |  2 +-
 .../manager/client/cli/validator/GroupStatus.java  |  2 +-
 .../inlong/manager/client/api/InlongClient.java    |  4 +-
 .../manager/client/api/InlongGroupContext.java     |  4 +-
 .../manager/client/api/impl/InlongClientImpl.java  | 16 ++++--
 .../manager/client/api/impl/InlongGroupImpl.java   |  2 +-
 .../client/api/inner/client/InlongGroupClient.java |  2 +-
 .../manager/common}/enums/SimpleGroupStatus.java   |  4 +-
 .../manager/common}/enums/SimpleSourceStatus.java  |  4 +-
 .../manager/dao/entity/StreamSourceEntity.java     |  1 +
 .../dao/mapper/StreamSourceEntityMapper.java       |  5 ++
 .../resources/mappers/StreamSourceEntityMapper.xml | 19 +++++--
 .../manager/pojo/group/InlongGroupStatusInfo.java  | 53 ++++++++++++++++++
 .../inlong/manager/pojo/source/StreamSource.java   |  4 ++
 .../inlong/manager/pojo/source/SubSourceDTO.java   | 64 ++++++++++++++++++++++
 .../service/core/impl/AgentServiceImpl.java        |  4 +-
 .../service/source/file/FileSourceOperator.java    | 14 +++++
 .../main/resources/h2/apache_inlong_manager.sql    |  1 +
 .../manager-web/sql/apache_inlong_manager.sql      |  1 +
 20 files changed, 183 insertions(+), 25 deletions(-)

diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index 84e239fab..84a259a96 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
 import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
index 945abaa43..0627e732f 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 
 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
index 70b72aa78..9de4bb46a 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli.validator;
 import com.beust.jcommander.IParameterValidator;
 import com.beust.jcommander.ParameterException;
 import org.apache.commons.lang3.EnumUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 
 /**
  * Class for inlong group status verification.
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index e1c26e971..801b66523 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.client.api;
 
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -30,6 +29,7 @@ import 
org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
 
 import java.util.List;
 import java.util.Map;
@@ -101,7 +101,7 @@ public interface InlongClient {
      * @return map of inlong group status list
      * @throws Exception the exception
      */
-    Map<String, SimpleGroupStatus> listGroupStatus(List<String> groupIds) 
throws Exception;
+    Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds) 
throws Exception;
 
     /**
      * Gets group.
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 14fc38c76..c930d4846 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -23,8 +23,8 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 9ab335cb0..c373bfb11 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -29,8 +29,8 @@ import 
org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongCluster;
 import org.apache.inlong.manager.client.api.InlongGroup;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleSourceStatus;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
@@ -47,6 +47,7 @@ import 
org.apache.inlong.manager.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -121,21 +122,26 @@ public class InlongClientImpl implements InlongClient {
     }
 
     @Override
-    public Map<String, SimpleGroupStatus> listGroupStatus(List<String> 
groupIds) {
+    public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> 
groupIds) {
         InlongGroupPageRequest request = new InlongGroupPageRequest();
         request.setGroupIdList(groupIds);
         request.setListSources(true);
 
         PageInfo<InlongGroupBriefInfo> pageInfo = 
groupClient.listGroups(request);
         List<InlongGroupBriefInfo> briefInfos = pageInfo.getList();
-        Map<String, SimpleGroupStatus> groupStatusMap = Maps.newHashMap();
+        Map<String, InlongGroupStatusInfo> groupStatusMap = Maps.newHashMap();
         if (CollectionUtils.isNotEmpty(briefInfos)) {
             briefInfos.forEach(briefInfo -> {
                 String groupId = briefInfo.getInlongGroupId();
                 SimpleGroupStatus groupStatus = 
SimpleGroupStatus.parseStatusByCode(briefInfo.getStatus());
                 List<StreamSource> sources = briefInfo.getStreamSources();
                 groupStatus = recheckGroupStatus(groupStatus, sources);
-                groupStatusMap.put(groupId, groupStatus);
+                InlongGroupStatusInfo statusInfo = 
InlongGroupStatusInfo.builder()
+                        .inlongGroupId(briefInfo.getInlongGroupId())
+                        .originalStatus(briefInfo.getStatus())
+                        .simpleGroupStatus(groupStatus)
+                        .streamSources(sources).build();
+                groupStatusMap.put(groupId, statusInfo);
             });
         }
         return groupStatusMap;
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 857680fff..403ff81b2 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index f5569ad50..148e9b8cc 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -21,7 +21,7 @@ import com.github.pagehelper.PageInfo;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.service.InlongGroupApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.pojo.common.Response;
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
similarity index 97%
rename from 
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java
rename to 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
index 63a017488..29edeec89 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api.enums;
-
-import org.apache.inlong.manager.common.enums.GroupStatus;
+package org.apache.inlong.manager.common.enums;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
similarity index 94%
rename from 
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java
rename to 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
index 53ed23e36..175b3ce49 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.client.api.enums;
-
-import org.apache.inlong.manager.common.enums.SourceStatus;
+package org.apache.inlong.manager.common.enums;
 
 /**
  * The simple stream source status, more readable for users
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 5a0047c36..2d7a1aa9d 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -34,6 +34,7 @@ public class StreamSourceEntity implements Serializable {
     private String inlongStreamId;
     private String sourceType;
     private String sourceName;
+    private Integer templateId;
     private String agentIp;
     private String uuid;
 
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index a55c26f4d..eabd61220 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -93,6 +93,11 @@ public interface StreamSourceEntityMapper {
      */
     List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList") 
List<String> groupIdList);
 
+    /**
+     * Select all sub sources by template id
+     */
+    List<StreamSourceEntity> selectByTemplateId(@Param("templateId") Integer 
templateId);
+
     /**
      * Get the distinct source type from the given groupId and streamId
      */
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 6c5d36be9..32f8c56ef 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -26,6 +26,7 @@
         <result column="inlong_stream_id" jdbcType="VARCHAR" 
property="inlongStreamId"/>
         <result column="source_type" jdbcType="VARCHAR" property="sourceType"/>
         <result column="source_name" jdbcType="VARCHAR" property="sourceName"/>
+        <result column="template_id" jdbcType="INTEGER" property="templateId"/>
         <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
         <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
         <result column="data_node_name" jdbcType="VARCHAR" 
property="dataNodeName"/>
@@ -44,7 +45,7 @@
         <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, inlong_group_id, inlong_stream_id, source_type, source_name, 
agent_ip, uuid,
+        id, inlong_group_id, inlong_stream_id, source_type, source_name, 
template_id, agent_ip, uuid,
         data_node_name, inlong_cluster_name, serialization_type, snapshot, 
report_time, ext_params,
         version, status, previous_status, is_deleted, creator, modifier, 
create_time, modify_time
     </sql>
@@ -52,14 +53,15 @@
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         insert into stream_source (inlong_group_id, inlong_stream_id,
-                                   source_type, source_name, agent_ip,
+                                   source_type, source_name, template_id, 
agent_ip,
                                    uuid, data_node_name, inlong_cluster_name,
                                    serialization_type, snapshot,
                                    report_time, ext_params, status,
                                    previous_status, creator, modifier)
         values (#{inlongGroupId,jdbcType=VARCHAR}, 
#{inlongStreamId,jdbcType=VARCHAR},
-                #{sourceType,jdbcType=VARCHAR}, 
#{sourceName,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR},
-                #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, 
#{inlongClusterName,jdbcType=VARCHAR},
+                #{sourceType,jdbcType=VARCHAR}, 
#{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
+                #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR},
+                #{dataNodeName,jdbcType=VARCHAR}, 
#{inlongClusterName,jdbcType=VARCHAR},
                 #{serializationType,jdbcType=VARCHAR}, 
#{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, 
#{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
                 #{previousStatus,jdbcType=INTEGER}, 
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
@@ -72,6 +74,13 @@
         where id = #{id,jdbcType=INTEGER}
         and is_deleted = 0
     </select>
+    <select id="selectByTemplateId" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        where template_id = #{templateId,jdbcType=INTEGER}
+        and is_deleted = 0
+    </select>
     <select id="selectByIdForUpdate" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
@@ -125,6 +134,7 @@
             <if test="request.status != null and request.status != ''">
                 and status = #{request.status, jdbcType=INTEGER}
             </if>
+            and template_id is NULL
         </where>
         <choose>
             <when test="request.orderField != null and request.orderField != 
'' and request.orderType != null and request.orderType != ''">
@@ -244,6 +254,7 @@
                     #{item}
                 </foreach>
             </if>
+            and template_id is NULL
         </where>
     </select>
     <select id="selectSourceType" resultType="java.lang.String">
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
new file mode 100644
index 000000000..c0cf4e678
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import java.util.List;
+
+/**
+ * Inlong group status info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong group status info")
+public class InlongGroupStatusInfo {
+
+    @ApiModelProperty(value = "Inlong group id")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Inlong group original status")
+    private Integer originalStatus;
+
+    @ApiModelProperty(value = "Inlong group simple status")
+    private SimpleGroupStatus simpleGroupStatus;
+
+    @ApiModelProperty(value = "Stream sources in the inlong group")
+    private List<StreamSource> streamSources;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 00a6a9d28..9c07d7969 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.stream.StreamNode;
 
 import java.util.Date;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -103,6 +104,9 @@ public abstract class StreamSource extends StreamNode {
     @Builder.Default
     private Map<String, Object> properties = new LinkedHashMap<>();
 
+    @ApiModelProperty("Sub source information of existing agents")
+    private List<SubSourceDTO> subSourceList;
+
     public SourceRequest genSourceRequest() {
         return null;
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
new file mode 100644
index 000000000..372f0d71a
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sub source information data per agent
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class SubSourceDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("stream source id")
+    private Integer id;
+
+    @ApiModelProperty("Template source id this sub source belongs to")
+    private Integer templateId;
+
+    @ApiModelProperty("Agent ip of sub source")
+    private String agentIp;
+
+    @ApiModelProperty("Status of sub source")
+    private Integer status;
+
+    public static SubSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+            return OBJECT_MAPPER.readValue(extParams, SubSourceDTO.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 3b31262b4..94ec64d61 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -237,7 +237,8 @@ public class AgentServiceImpl implements AgentService {
 
             // Cluster name is not blank, split task if necessary
             // The agent ip field of the entity holds the ip list of the 
agents that has already been issued
-            if (StringUtils.isNotBlank(destClusterName) && 
destClusterName.equals(agentClusterName)) {
+            if (StringUtils.isNotBlank(destClusterName) && 
destClusterName.equals(agentClusterName)
+                    && Objects.isNull(sourceEntity.getTemplateId())) {
 
                 // Is the task already fetched by this agent ?
                 if (StringUtils.isNotBlank(sourceEntity.getAgentIp())) {
@@ -256,6 +257,7 @@ public class AgentServiceImpl implements AgentService {
                 fileEntity.setAgentIp(agentIp);
                 fileEntity.setUuid(uuid);
                 fileEntity.setSourceName(fileEntity.getSourceName() + "-" + 
RandomStringUtils.randomAlphanumeric(10));
+                fileEntity.setTemplateId(sourceEntity.getId());
                 int op = getOp(fileEntity.getStatus());
                 int nextStatus = getNextStatus(fileEntity.getStatus());
                 fileEntity.setStatus(nextStatus);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 0c2e5c23a..1eaa99740 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -23,17 +23,20 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.file.FileSource;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceRequest;
+import org.apache.inlong.manager.pojo.source.SubSourceDTO;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * File source operator, such as get or set file source info.
@@ -44,6 +47,9 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
     @Autowired
     private ObjectMapper objectMapper;
 
+    @Autowired
+    private StreamSourceEntityMapper sourceMapper;
+
     @Override
     public Boolean accept(String sourceType) {
         return SourceType.FILE.equals(sourceType);
@@ -79,6 +85,14 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
 
         List<StreamField> sourceFields = super.getSourceFields(entity.getId());
         source.setFieldList(sourceFields);
+
+        List<StreamSourceEntity> subSourceList = 
sourceMapper.selectByTemplateId(entity.getId());
+        source.setSubSourceList(subSourceList.stream().map(subEntity -> 
SubSourceDTO.builder()
+                        .id(subEntity.getId())
+                        .templateId(entity.getId())
+                        .agentIp(subEntity.getAgentIp())
+                        .status(subEntity.getStatus()).build())
+                .collect(Collectors.toList()));
         return source;
     }
 
diff --git 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 53662cf3f..2d7753dfb 100644
--- 
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -381,6 +381,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `inlong_stream_id`    varchar(256) NOT NULL COMMENT 'Inlong stream id',
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 
'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source 
type, including: FILE, DB, etc',
+    `template_id`         int(11)      DEFAULT NULL COMMENT 'Id of the 
template task this agent belongs to',
     `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of 
the agent running the task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid 
of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node 
name, which links to data_node table',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql 
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 1ae2850fa..0c249a2ff 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -402,6 +402,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `inlong_stream_id`    varchar(256) NOT NULL COMMENT 'Inlong stream id',
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 
'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source 
type, including: FILE, DB, etc',
+    `template_id`         int(11)      DEFAULT NULL COMMENT 'Id of the 
template task this agent belongs to',
     `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of 
the agent running the task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid 
of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node 
name, which links to data_node table',

Reply via email to