This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d51687fac [INLONG-5615][Manager] Add template id and sub task status for file source (#5616) d51687fac is described below commit d51687facde131a0a0e96af9fff8951d7e9a1fc1 Author: woofyzhao <490467...@qq.com> AuthorDate: Mon Aug 22 19:09:34 2022 +0800 [INLONG-5615][Manager] Add template id and sub task status for file source (#5616) --- .../inlong/manager/client/cli/ListCommand.java | 2 +- .../inlong/manager/client/cli/util/PrintUtils.java | 2 +- .../manager/client/cli/validator/GroupStatus.java | 2 +- .../inlong/manager/client/api/InlongClient.java | 4 +- .../manager/client/api/InlongGroupContext.java | 4 +- .../manager/client/api/impl/InlongClientImpl.java | 16 ++++-- .../manager/client/api/impl/InlongGroupImpl.java | 2 +- .../client/api/inner/client/InlongGroupClient.java | 2 +- .../manager/common}/enums/SimpleGroupStatus.java | 4 +- .../manager/common}/enums/SimpleSourceStatus.java | 4 +- .../manager/dao/entity/StreamSourceEntity.java | 1 + .../dao/mapper/StreamSourceEntityMapper.java | 5 ++ .../resources/mappers/StreamSourceEntityMapper.xml | 19 +++++-- .../manager/pojo/group/InlongGroupStatusInfo.java | 53 ++++++++++++++++++ .../inlong/manager/pojo/source/StreamSource.java | 4 ++ .../inlong/manager/pojo/source/SubSourceDTO.java | 64 ++++++++++++++++++++++ .../service/core/impl/AgentServiceImpl.java | 4 +- .../service/source/file/FileSourceOperator.java | 14 +++++ .../main/resources/h2/apache_inlong_manager.sql | 1 + .../manager-web/sql/apache_inlong_manager.sql | 1 + 20 files changed, 183 insertions(+), 25 deletions(-) diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java index 84e239fab..84a259a96 100644 --- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java +++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java @@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.github.pagehelper.PageInfo; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient; import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient; import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient; diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java index 945abaa43..0627e732f 100644 --- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java +++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; import java.lang.reflect.Field; import java.text.SimpleDateFormat; diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java index 70b72aa78..9de4bb46a 100644 --- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java +++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java @@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli.validator; import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.ParameterException; import org.apache.commons.lang3.EnumUtils; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; /** * Class for inlong group status verification. diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java index e1c26e971..801b66523 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java @@ -18,7 +18,6 @@ package org.apache.inlong.manager.client.api; import com.github.pagehelper.PageInfo; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; import org.apache.inlong.manager.client.api.impl.InlongClientImpl; import org.apache.inlong.manager.pojo.cluster.BindTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; @@ -30,6 +29,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagPageRequest; import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest; import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo; import java.util.List; import java.util.Map; @@ -101,7 +101,7 @@ public interface InlongClient { * @return map of inlong group status list * @throws Exception the exception */ - Map<String, SimpleGroupStatus> listGroupStatus(List<String> groupIds) throws Exception; + Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds) throws Exception; /** * Gets group. diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java index 14fc38c76..c930d4846 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java @@ -23,8 +23,8 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; -import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleSourceStatus; import org.apache.inlong.manager.client.api.inner.InnerGroupContext; import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java index 9ab335cb0..c373bfb11 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java @@ -29,8 +29,8 @@ import org.apache.inlong.manager.client.api.ClientConfiguration; import org.apache.inlong.manager.client.api.InlongClient; import org.apache.inlong.manager.client.api.InlongCluster; import org.apache.inlong.manager.client.api.InlongGroup; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; -import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleSourceStatus; import org.apache.inlong.manager.client.api.inner.client.ClientFactory; import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient; import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient; @@ -47,6 +47,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterTagResponse; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest; +import org.apache.inlong.manager.pojo.group.InlongGroupStatusInfo; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.common.util.HttpUtils; import org.apache.inlong.manager.common.util.Preconditions; @@ -121,21 +122,26 @@ public class InlongClientImpl implements InlongClient { } @Override - public Map<String, SimpleGroupStatus> listGroupStatus(List<String> groupIds) { + public Map<String, InlongGroupStatusInfo> listGroupStatus(List<String> groupIds) { InlongGroupPageRequest request = new InlongGroupPageRequest(); request.setGroupIdList(groupIds); request.setListSources(true); PageInfo<InlongGroupBriefInfo> pageInfo = groupClient.listGroups(request); List<InlongGroupBriefInfo> briefInfos = pageInfo.getList(); - Map<String, SimpleGroupStatus> groupStatusMap = Maps.newHashMap(); + Map<String, InlongGroupStatusInfo> groupStatusMap = Maps.newHashMap(); if (CollectionUtils.isNotEmpty(briefInfos)) { briefInfos.forEach(briefInfo -> { String groupId = briefInfo.getInlongGroupId(); SimpleGroupStatus groupStatus = SimpleGroupStatus.parseStatusByCode(briefInfo.getStatus()); List<StreamSource> sources = briefInfo.getStreamSources(); groupStatus = recheckGroupStatus(groupStatus, sources); - groupStatusMap.put(groupId, groupStatus); + InlongGroupStatusInfo statusInfo = InlongGroupStatusInfo.builder() + .inlongGroupId(briefInfo.getInlongGroupId()) + .originalStatus(briefInfo.getStatus()) + .simpleGroupStatus(groupStatus) + .streamSources(sources).build(); + groupStatusMap.put(groupId, statusInfo); }); } return groupStatusMap; diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java index 857680fff..403ff81b2 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java @@ -24,7 +24,7 @@ import org.apache.inlong.manager.client.api.InlongGroup; import org.apache.inlong.manager.client.api.InlongGroupContext; import org.apache.inlong.manager.client.api.InlongStream; import org.apache.inlong.manager.client.api.InlongStreamBuilder; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; import org.apache.inlong.manager.client.api.inner.InnerGroupContext; import org.apache.inlong.manager.client.api.inner.client.ClientFactory; import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient; diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java index f5569ad50..148e9b8cc 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java @@ -21,7 +21,7 @@ import com.github.pagehelper.PageInfo; import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.apache.inlong.manager.client.api.ClientConfiguration; -import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; import org.apache.inlong.manager.client.api.service.InlongGroupApi; import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.pojo.common.Response; diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java similarity index 97% rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java index 63a017488..29edeec89 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleGroupStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java @@ -15,9 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.client.api.enums; - -import org.apache.inlong.manager.common.enums.GroupStatus; +package org.apache.inlong.manager.common.enums; import java.util.ArrayList; import java.util.List; diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java similarity index 94% rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java index 53ed23e36..175b3ce49 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/enums/SimpleSourceStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleSourceStatus.java @@ -15,9 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.client.api.enums; - -import org.apache.inlong.manager.common.enums.SourceStatus; +package org.apache.inlong.manager.common.enums; /** * The simple stream source status, more readable for users diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java index 5a0047c36..2d7a1aa9d 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java @@ -34,6 +34,7 @@ public class StreamSourceEntity implements Serializable { private String inlongStreamId; private String sourceType; private String sourceName; + private Integer templateId; private String agentIp; private String uuid; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index a55c26f4d..eabd61220 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -93,6 +93,11 @@ public interface StreamSourceEntityMapper { */ List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList") List<String> groupIdList); + /** + * Select all sub sources by template id + */ + List<StreamSourceEntity> selectByTemplateId(@Param("templateId") Integer templateId); + /** * Get the distinct source type from the given groupId and streamId */ diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 6c5d36be9..32f8c56ef 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -26,6 +26,7 @@ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/> <result column="source_type" jdbcType="VARCHAR" property="sourceType"/> <result column="source_name" jdbcType="VARCHAR" property="sourceName"/> + <result column="template_id" jdbcType="INTEGER" property="templateId"/> <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/> <result column="uuid" jdbcType="VARCHAR" property="uuid"/> <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/> @@ -44,7 +45,7 @@ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/> </resultMap> <sql id="Base_Column_List"> - id, inlong_group_id, inlong_stream_id, source_type, source_name, agent_ip, uuid, + id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid, data_node_name, inlong_cluster_name, serialization_type, snapshot, report_time, ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time </sql> @@ -52,14 +53,15 @@ <insert id="insert" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> insert into stream_source (inlong_group_id, inlong_stream_id, - source_type, source_name, agent_ip, + source_type, source_name, template_id, agent_ip, uuid, data_node_name, inlong_cluster_name, serialization_type, snapshot, report_time, ext_params, status, previous_status, creator, modifier) values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, - #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR}, - #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR}, + #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER}, + #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR}, + #{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR}, #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}) @@ -72,6 +74,13 @@ where id = #{id,jdbcType=INTEGER} and is_deleted = 0 </select> + <select id="selectByTemplateId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> + select + <include refid="Base_Column_List"/> + from stream_source + where template_id = #{templateId,jdbcType=INTEGER} + and is_deleted = 0 + </select> <select id="selectByIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> select <include refid="Base_Column_List"/> @@ -125,6 +134,7 @@ <if test="request.status != null and request.status != ''"> and status = #{request.status, jdbcType=INTEGER} </if> + and template_id is NULL </where> <choose> <when test="request.orderField != null and request.orderField != '' and request.orderType != null and request.orderType != ''"> @@ -244,6 +254,7 @@ #{item} </foreach> </if> + and template_id is NULL </where> </select> <select id="selectSourceType" resultType="java.lang.String"> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java new file mode 100644 index 000000000..c0cf4e678 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupStatusInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.group; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.SimpleGroupStatus; +import org.apache.inlong.manager.pojo.source.StreamSource; + +import java.util.List; + +/** + * Inlong group status info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Inlong group status info") +public class InlongGroupStatusInfo { + + @ApiModelProperty(value = "Inlong group id") + private String inlongGroupId; + + @ApiModelProperty(value = "Inlong group original status") + private Integer originalStatus; + + @ApiModelProperty(value = "Inlong group simple status") + private SimpleGroupStatus simpleGroupStatus; + + @ApiModelProperty(value = "Stream sources in the inlong group") + private List<StreamSource> streamSources; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java index 00a6a9d28..9c07d7969 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java @@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.stream.StreamNode; import java.util.Date; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; /** @@ -103,6 +104,9 @@ public abstract class StreamSource extends StreamNode { @Builder.Default private Map<String, Object> properties = new LinkedHashMap<>(); + @ApiModelProperty("Sub source information of existing agents") + private List<SubSourceDTO> subSourceList; + public SourceRequest genSourceRequest() { return null; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java new file mode 100644 index 000000000..372f0d71a --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; + +import javax.validation.constraints.NotNull; + +/** + * Sub source information data per agent + */ +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Data +public class SubSourceDTO { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @ApiModelProperty("stream source id") + private Integer id; + + @ApiModelProperty("Template source id this sub source belongs to") + private Integer templateId; + + @ApiModelProperty("Agent ip of sub source") + private String agentIp; + + @ApiModelProperty("Status of sub source") + private Integer status; + + public static SubSourceDTO getFromJson(@NotNull String extParams) { + try { + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return OBJECT_MAPPER.readValue(extParams, SubSourceDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 3b31262b4..94ec64d61 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -237,7 +237,8 @@ public class AgentServiceImpl implements AgentService { // Cluster name is not blank, split task if necessary // The agent ip field of the entity holds the ip list of the agents that has already been issued - if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)) { + if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName) + && Objects.isNull(sourceEntity.getTemplateId())) { // Is the task already fetched by this agent ? if (StringUtils.isNotBlank(sourceEntity.getAgentIp())) { @@ -256,6 +257,7 @@ public class AgentServiceImpl implements AgentService { fileEntity.setAgentIp(agentIp); fileEntity.setUuid(uuid); fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10)); + fileEntity.setTemplateId(sourceEntity.getId()); int op = getOp(fileEntity.getStatus()); int nextStatus = getNextStatus(fileEntity.getStatus()); fileEntity.setStatus(nextStatus); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java index 0c2e5c23a..1eaa99740 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java @@ -23,17 +23,20 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.source.file.FileSource; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceRequest; +import org.apache.inlong.manager.pojo.source.SubSourceDTO; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.service.source.AbstractSourceOperator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.stream.Collectors; /** * File source operator, such as get or set file source info. @@ -44,6 +47,9 @@ public class FileSourceOperator extends AbstractSourceOperator { @Autowired private ObjectMapper objectMapper; + @Autowired + private StreamSourceEntityMapper sourceMapper; + @Override public Boolean accept(String sourceType) { return SourceType.FILE.equals(sourceType); @@ -79,6 +85,14 @@ public class FileSourceOperator extends AbstractSourceOperator { List<StreamField> sourceFields = super.getSourceFields(entity.getId()); source.setFieldList(sourceFields); + + List<StreamSourceEntity> subSourceList = sourceMapper.selectByTemplateId(entity.getId()); + source.setSubSourceList(subSourceList.stream().map(subEntity -> SubSourceDTO.builder() + .id(subEntity.getId()) + .templateId(entity.getId()) + .agentIp(subEntity.getAgentIp()) + .status(subEntity.getStatus()).build()) + .collect(Collectors.toList())); return source; } diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 53662cf3f..2d7753dfb 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -381,6 +381,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', + `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to', `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 1ae2850fa..0c249a2ff 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -402,6 +402,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', + `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to', `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',