This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e42f5ec276 [INLONG-9873][Manager] Support adding data add tasks for file collection (#9874) e42f5ec276 is described below commit e42f5ec27688089eb50ae48695c8fe39684eefb7 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Mar 28 21:42:59 2024 +0800 [INLONG-9873][Manager] Support adding data add tasks for file collection (#9874) * [INLONG-9873][Manager] Support adding supplementary recording tasks for file collection * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix code style * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error * [INLONG-9873][Manager] Fix error --- .../manager/client/api/InlongGroupContext.java | 2 +- .../manager/dao/entity/StreamSourceEntity.java | 4 +- .../dao/mapper/StreamSourceEntityMapper.java | 15 +++++--- .../resources/mappers/StreamSourceEntityMapper.xml | 37 +++++++++++------- .../{SubSourceDTO.java => DataAddTaskDTO.java} | 16 ++++---- .../manager/pojo/source/DataAddTaskRequest.java | 44 ++++++++++++++++++++++ .../inlong/manager/pojo/source/SourceRequest.java | 4 +- .../inlong/manager/pojo/source/StreamSource.java | 8 ++-- .../pojo/source/file/FileDataAddTaskRequest.java} | 27 ++++++++++--- .../manager/pojo/source/pulsar/PulsarSource.java | 3 ++ .../pojo/source/pulsar/PulsarSourceDTO.java | 3 ++ .../service/core/impl/AgentServiceImpl.java | 33 +++++++++++++--- .../source/AbstractSourceOperateListener.java | 2 +- .../listener/source/SourceRestartListener.java | 2 +- .../listener/source/SourceStopListener.java | 2 +- .../service/source/AbstractSourceOperator.java | 7 ++++ .../service/source/StreamSourceOperator.java | 10 +++++ .../service/source/StreamSourceService.java | 10 +++++ .../service/source/StreamSourceServiceImpl.java | 15 
+++++++- .../service/source/file/FileSourceOperator.java | 43 +++++++++++++++++++-- .../manager/service/task/DataCleansingTask.java | 8 ++-- .../service/task/DeleteStreamSourceTask.java | 2 +- .../service/core/impl/AgentServiceTest.java | 8 ++-- .../main/resources/h2/apache_inlong_manager.sql | 4 +- .../manager-web/sql/apache_inlong_manager.sql | 4 +- inlong-manager/manager-web/sql/changes-1.12.0.sql | 3 ++ .../web/controller/StreamSourceController.java | 7 ++++ .../src/main/resources/application-dev.properties | 21 +++++++---- .../src/main/resources/application-prod.properties | 21 +++++++---- .../src/main/resources/application-test.properties | 21 +++++++---- 30 files changed, 294 insertions(+), 92 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java index 8600ecdafa..fe2c5ed04c 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java @@ -102,7 +102,7 @@ public class InlongGroupContext implements Serializable { StreamSource source = entry.getValue(); // when template id is null it is considered as normal source other than template source // sub sources are filtered because they are already collected in template source's sub source list - if (source != null && source.getTemplateId() == null) { + if (source != null && source.getTaskMapId() == null) { groupSources.add(source); } } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java index 4b28525333..b577fd48da 100644 --- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java @@ -37,7 +37,7 @@ public class StreamSourceEntity implements Serializable { private String inlongStreamId; private String sourceType; private String sourceName; - private Integer templateId; + private Integer taskMapId; private String agentIp; private String uuid; @@ -74,7 +74,7 @@ public class StreamSourceEntity implements Serializable { + ", inlongStreamId='" + inlongStreamId + '\'' + ", sourceType='" + sourceType + '\'' + ", sourceName='" + sourceName + '\'' - + ", templateId=" + templateId + + ", templateId=" + taskMapId + ", agentIp='" + agentIp + '\'' + ", uuid='" + uuid + '\'' + ", dataNodeName='" + dataNodeName + '\'' diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index ca984fb04b..53d28bbbab 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -43,13 +43,13 @@ public interface StreamSourceEntityMapper { StreamSourceEntity selectForAgentTask(Integer id); /** - * Select one sub source by template id and agent ip. + * Select one data add task by task map id and agent ip. 
* - * @param templateId template id + * @param taskMapId template id * @param agentIp agent ip * @return stream source info */ - StreamSourceEntity selectOneByTemplatedIdAndAgentIp(@Param("templateId") Integer templateId, + StreamSourceEntity selectOneByTaskMapIdAndAgentIp(@Param("taskMapId") Integer taskMapId, @Param("agentIp") String agentIp); /** @@ -111,9 +111,9 @@ public interface StreamSourceEntityMapper { List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList") List<String> groupIdList); /** - * Select all sub sources by template id + * Select all data add task by task map id */ - List<StreamSourceEntity> selectByTemplateId(@Param("templateId") Integer templateId); + List<StreamSourceEntity> selectByTaskMapId(@Param("taskMapId") Integer taskMapId); /** * Get the distinct source type from the given groupId and streamId @@ -190,6 +190,11 @@ public interface StreamSourceEntityMapper { */ void updateStatusByDeleted(); + /** + * Logic delete the data add task by modifiy time + */ + void logicalDeleteByTimeout(@Param("retentionDays") Integer retentionDays); + int logicalDeleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId, @Param("status") Integer status); diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 59d97d8a7f..c1101ed2f6 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -26,7 +26,7 @@ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/> <result column="source_type" jdbcType="VARCHAR" property="sourceType"/> <result column="source_name" jdbcType="VARCHAR" property="sourceName"/> - <result column="template_id" jdbcType="INTEGER" property="templateId"/> + <result column="task_map_id" jdbcType="INTEGER" 
property="taskMapId"/> <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/> <result column="uuid" jdbcType="VARCHAR" property="uuid"/> <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/> @@ -47,7 +47,7 @@ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/> </resultMap> <sql id="Base_Column_List"> - id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid, + id, inlong_group_id, inlong_stream_id, source_type, source_name, task_map_id, agent_ip, uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time, data_time_zone, ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time </sql> @@ -55,13 +55,13 @@ <insert id="insert" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> insert into stream_source (inlong_group_id, inlong_stream_id, - source_type, source_name, template_id, agent_ip, + source_type, source_name, task_map_id, agent_ip, uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time, data_time_zone, ext_params, status, previous_status, creator, modifier) values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, - #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER}, + #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{taskMapId,jdbcType=INTEGER}, #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeGroup,jdbcType=VARCHAR}, #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},#{modifyTime,jdbcType=TIMESTAMP}, @@ -76,11 +76,11 @@ where id = #{id,jdbcType=INTEGER} and is_deleted = 0 </select> - <select id="selectByTemplateId" 
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> + <select id="selectByTaskMapId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> select <include refid="Base_Column_List"/> from stream_source - where template_id = #{templateId,jdbcType=INTEGER} + where task_map_id = #{taskMapId,jdbcType=INTEGER} and is_deleted = 0 </select> <select id="selectByIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> @@ -98,11 +98,11 @@ where id = #{id,jdbcType=INTEGER} for update </select> - <select id="selectOneByTemplatedIdAndAgentIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> + <select id="selectOneByTaskMapIdAndAgentIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> select <include refid="Base_Column_List"/> from stream_source - where template_id = #{templateId,jdbcType=INTEGER} + where task_map_id = #{taskMapId,jdbcType=INTEGER} and agent_ip = #{agentIp, jdbcType=VARCHAR} and is_deleted = 0 limit 1 @@ -159,7 +159,7 @@ #{status} </foreach> </if> - and template_id is NULL + and task_map_id is NULL </where> <choose> <when test="request.orderField != null and request.orderField != '' and request.orderType != null and request.orderType != ''"> @@ -271,7 +271,7 @@ </foreach> </if> and agent_ip is NULL - and template_id is NULL + and task_map_id is NULL and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR} </where> </select> @@ -303,7 +303,7 @@ #{item} </foreach> </if> - and template_id is NULL + and task_map_id is NULL </where> </select> <select id="selectSourceType" resultType="java.lang.String"> @@ -549,7 +549,18 @@ and status not in (99, 201, 301) </where> </update> - + <update id="logicalDeleteByTimeout"> + update stream_source + <set> + is_deleted = id, + status = 99 + </set> + <where> + is_deleted = 0 + and task_map_id is not null + and modify_time &lt;= DATE_ADD(NOW(), INTERVAL -#{retentionDays, jdbcType=INTEGER} DAY) + </where> + </update> <update
id="logicalDeleteByRelatedId"> update stream_source <set> @@ -586,7 +597,7 @@ </set> where is_deleted = 0 and agent_ip = #{agentIp, jdbcType=VARCHAR} - and template_id is not null + and task_map_id is not null <if test="targetStatus != null"> and status = #{targetStatus, jdbcType=INTEGER} </if> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java similarity index 79% rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java index 50d46d6e8a..544627db34 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java @@ -30,29 +30,29 @@ import lombok.NoArgsConstructor; import javax.validation.constraints.NotNull; /** - * Sub source information data per agent + * Data add task information */ @Builder @AllArgsConstructor @NoArgsConstructor @Data -public class SubSourceDTO { +public class DataAddTaskDTO { @ApiModelProperty("stream source id") private Integer id; - @ApiModelProperty("Template source id this sub source belongs to") - private Integer templateId; + @ApiModelProperty("Main source id this data add task belongs to") + private Integer taskMapId; - @ApiModelProperty("Agent ip of sub source") + @ApiModelProperty("Agent ip of data add task") private String agentIp; - @ApiModelProperty("Status of sub source") + @ApiModelProperty("Status of data add task") private Integer status; - public static SubSourceDTO getFromJson(@NotNull String extParams) { + public static DataAddTaskDTO getFromJson(@NotNull String extParams) { try { - return JsonUtils.parseObject(extParams, SubSourceDTO.class); + return 
JsonUtils.parseObject(extParams, DataAddTaskDTO.class); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java new file mode 100644 index 0000000000..521551825a --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.source; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import org.hibernate.validator.constraints.Length; + +import javax.validation.constraints.NotBlank; + +/** + * Data add task information + */ +@Data +@ApiModel("Data add task request") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "sourceType") +public class DataAddTaskRequest { + + @ApiModelProperty(value = "Source ID") + private Integer sourceId; + + @ApiModelProperty("Source type, including: FILE, KAFKA, etc.") + @NotBlank(message = "sourceType cannot be blank") + @Length(min = 1, max = 20, message = "length must be between 1 and 20") + private String sourceType; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index 0bcfade77b..6a31f9039f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -118,7 +118,7 @@ public class SourceRequest { private Map<String, Object> properties = new LinkedHashMap<>(); @JsonIgnore - @ApiModelProperty("Sub source information of existing agents") - private List<SubSourceDTO> subSourceList; + @ApiModelProperty("Data add task information of existing agents") + private List<DataAddTaskDTO> dataAddTaskList; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java index 9793189d26..cc56ecb6a7 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java +++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java @@ -111,11 +111,11 @@ public abstract class StreamSource extends StreamNode { @ApiModelProperty("Properties for source") private Map<String, Object> properties = new LinkedHashMap<>(); - @ApiModelProperty("Null if not a sub source") - private Integer templateId; + @ApiModelProperty("Null if not a data add task") + private Integer taskMapId; - @ApiModelProperty("Sub source information of existing agents") - private List<SubSourceDTO> subSourceList; + @ApiModelProperty("Data add task information of existing agents") + private List<DataAddTaskDTO> dataAddTaskList; @ApiModelProperty(value = "Whether to ignore the parse errors of field value, true as default") private Boolean ignoreParseError; diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java similarity index 51% copy from inlong-manager/manager-web/sql/changes-1.12.0.sql copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java index 82e8af66e5..bcf292c1f3 100644 --- a/inlong-manager/manager-web/sql/changes-1.12.0.sql +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java @@ -15,14 +15,29 @@ * limitations under the License. */ --- This is the SQL change file from version 1.9.0 to the current version 1.10.0. --- When upgrading to version 1.10.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module. 
+package org.apache.inlong.manager.pojo.source.file; -SET NAMES utf8mb4; -SET FOREIGN_KEY_CHECKS = 0; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; -USE `apache_inlong_manager`; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; -ALTER TABLE `stream_source` ADD COLUMN `data_time_zone` varchar(256) DEFAULT NULL COMMENT 'Data time zone'; +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = SourceType.FILE) +@ApiModel(value = "File data add task request") +public class FileDataAddTaskRequest extends DataAddTaskRequest { + @ApiModelProperty("Start time") + private Long startTime; + @ApiModelProperty("End time") + private Long endTime; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java index cdab4d59cf..8c14d8118b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java @@ -88,6 +88,9 @@ public class PulsarSource extends StreamSource { @Builder.Default private String wrapType = MessageWrapType.INLONG_MSG_V0.getName(); + @ApiModelProperty("Reset subscription time") + private Long resetTime; + public PulsarSource() { this.setSourceType(SourceType.PULSAR); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java index 6fc1751d72..6c0ba66208 100644 --- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java @@ -80,6 +80,9 @@ public class PulsarSourceDTO { @ApiModelProperty(value = "The message body wrap wrap type, including: RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc") private String wrapType; + @ApiModelProperty("Reset subscription time") + private Long resetTime; + @ApiModelProperty("Properties for Pulsar") private Map<String, Object> properties; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 953df33e96..3fa86b02fe 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -136,10 +136,17 @@ public class AgentServiceImpl implements AgentService { private Integer beforeSeconds; @Value("${source.update.interval:60}") private Integer updateTaskInterval; - @Value("${source.cleansing.enabled:false}") + @Value("${source.clean.enabled:false}") private Boolean sourceCleanEnabled; - @Value("${source.cleansing.interval:600}") + @Value("${source.clean.interval.seconds:600}") private Integer cleanInterval; + @Value("${add.task.clean.enabled:false}") + private Boolean dataAddTaskCleanEnabled; + @Value("${add.task.clean.interval.seconds:10}") + private Integer dataAddTaskCleanInterval; + @Value("${add.task.retention.days:7}") + private Integer retentionDays; + @Autowired private StreamSourceEntityMapper sourceMapper; @Autowired @@ -202,6 +209,22 @@ public class AgentServiceImpl implements AgentService { }, 0, cleanInterval, TimeUnit.SECONDS); LOGGER.info("clean task started successfully"); 
} + if (dataAddTaskCleanEnabled) { + ThreadFactory factory = new ThreadFactoryBuilder() + .setNameFormat("scheduled-subSource-deleted-%d") + .setDaemon(true) + .build(); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(factory); + executor.scheduleWithFixedDelay(() -> { + try { + sourceMapper.logicalDeleteByTimeout(retentionDays); + LOGGER.info("clean sub task successfully"); + } catch (Throwable t) { + LOGGER.error("clean sub task error", t); + } + }, 0, dataAddTaskCleanInterval, TimeUnit.SECONDS); + LOGGER.info("clean sub task started successfully"); + } } @Override @@ -441,7 +464,7 @@ public class AgentServiceImpl implements AgentService { /** * Add subtasks to template tasks. - * (Template task are agent_ip is null and template_id is null) + * (Template task are agent_ip is null and task_map_id is null) */ private void preProcessTemplateFileTask(TaskRequest taskRequest) { List<Integer> needCopiedStatusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(), @@ -463,7 +486,7 @@ public class AgentServiceImpl implements AgentService { if (groupEntity != null && noNeedAddTask.contains(GroupStatus.forCode(groupEntity.getStatus()))) { return; } - StreamSourceEntity subSource = sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(), + StreamSourceEntity subSource = sourceMapper.selectOneByTaskMapIdAndAgentIp(sourceEntity.getId(), agentIp); if (subSource == null) { InlongClusterNodeEntity clusterNodeEntity = selectByIpAndCluster(agentClusterName, agentIp); @@ -474,7 +497,7 @@ public class AgentServiceImpl implements AgentService { CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT)); - fileEntity.setTemplateId(sourceEntity.getId()); + fileEntity.setTaskMapId(sourceEntity.getId()); fileEntity.setAgentIp(agentIp); fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode()); // 
create new sub source task diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java index cb955decd9..ccefd5cf10 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java @@ -106,7 +106,7 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList // template sources are filtered and processed in corresponding subclass listeners if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_STOP || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT - || CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) { + || CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) { return true; } else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) { return false; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java index 7575d8c25a..238b6bc1bc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java @@ -51,7 +51,7 @@ public class SourceRestartListener extends AbstractSourceOperateListener { public void operateStreamSource(SourceRequest sourceRequest, String operator) { // if a source has sub-sources, it is considered a template source. 
// template sources do not need to be restarted, its sub-sources will be processed in this method later. - if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) { + if (CollectionUtils.isNotEmpty(sourceRequest.getDataAddTaskList())) { return; } streamSourceService.restart(sourceRequest.getId(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java index e3636dcb89..e1328bf6e2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java @@ -51,7 +51,7 @@ public class SourceStopListener extends AbstractSourceOperateListener { public void operateStreamSource(SourceRequest sourceRequest, String operator) { // if a source has sub-sources, it is considered a template source. // template sources do not need to be stopped, its sub-sources will be processed in this method later. 
- if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) { + if (CollectionUtils.isNotEmpty(sourceRequest.getDataAddTaskList())) { return; } streamSourceService.stop(sourceRequest.getId(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 00f85052fd..9428101c40 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -32,6 +32,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper; import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.stream.StreamField; @@ -338,4 +339,10 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { public void syncSourceFieldInfo(SourceRequest request, String operator) { LOGGER.info("not support sync source field info for type ={}", request.getSourceType()); } + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public Integer addDataAddTask(DataAddTaskRequest request, String operator) { + throw new BusinessException(String.format("not support data add task for type =%s", request.getSourceType())); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index 5e7168879b..997f39b867 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; @@ -126,4 +127,13 @@ public interface StreamSourceOperator { */ void syncSourceFieldInfo(SourceRequest request, String operator); + /** + * Save the data add task info. 
+ * + * @param request request of data add task + * @param operator name of operator + * @return source id after saving + */ + Integer addDataAddTask(DataAddTaskRequest request, String operator); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index 1bcb9f9966..0dd4decd28 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.source; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourcePageRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; @@ -229,4 +230,13 @@ public interface StreamSourceService { return true; } + /** + * Save the data add task information + * + * @param request Source request. + * @param operator Operator's name. + * @return source id after saving. 
+ */ + Integer addDataAddTask(DataAddTaskRequest request, String operator); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index a8a01224b8..2d3855b05e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourcePageRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; @@ -384,7 +385,7 @@ public class StreamSourceServiceImpl implements StreamSourceService { StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id); Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage()); - boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id)); + boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id)); // Check if it can be delete InlongGroupEntity groupEntity = groupMapper.selectByGroupId(entity.getInlongGroupId()); @@ -436,7 +437,7 @@ public class StreamSourceServiceImpl implements StreamSourceService { String.format("InlongGroup does not exist with InlongGroupId=%s", entity.getInlongGroupId())); } // check record status - boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id)); + 
boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id)); SourceStatus curStatus = SourceStatus.forCode(entity.getStatus()); SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE; // if source is frozen|failed|new, or if it is a template source or auto push source, delete directly @@ -629,4 +630,18 @@ public class StreamSourceServiceImpl implements StreamSourceService { request.setInlongStreamId(entity.getInlongStreamId()); request.setSourceName(entity.getSourceName()); } + + @Override + public Integer addDataAddTask(DataAddTaskRequest request, String operator) { + LOGGER.info("begin to add data add task info: {}", request); + StreamSourceEntity entity = sourceMapper.selectById(request.getSourceId()); + // reject an unknown source id with a business error instead of an NPE on entity.getSourceType() + Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, + ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage()); + StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType()); + // keep the boxed type: unboxing into int would throw an NPE if an operator returned null + Integer id = sourceOperator.addDataAddTask(request, operator); + LOGGER.info("success to add data add task info: {}", request); + return id; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java index 05b22ae51c..5d4329c7ff 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java @@ -23,9 +23,11 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; +import org.apache.inlong.manager.pojo.source.DataAddTaskDTO; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import 
org.apache.inlong.manager.pojo.source.StreamSource; -import org.apache.inlong.manager.pojo.source.SubSourceDTO; +import org.apache.inlong.manager.pojo.source.file.FileDataAddTaskRequest; import org.apache.inlong.manager.pojo.source.file.FileSource; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceRequest; @@ -33,8 +35,13 @@ import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.service.source.AbstractSourceOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.stream.Collectors; @@ -45,6 +52,8 @@ import java.util.stream.Collectors; @Service public class FileSourceOperator extends AbstractSourceOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceOperator.class); + @Autowired private ObjectMapper objectMapper; @@ -88,14 +97,48 @@ public class FileSourceOperator extends AbstractSourceOperator { List<StreamField> sourceFields = super.getSourceFields(entity.getId()); source.setFieldList(sourceFields); - List<StreamSourceEntity> subSourceList = sourceMapper.selectByTemplateId(entity.getId()); - source.setSubSourceList(subSourceList.stream().map(subEntity -> SubSourceDTO.builder() + List<StreamSourceEntity> dataAddTaskList = sourceMapper.selectByTaskMapId(entity.getId()); + source.setDataAddTaskList(dataAddTaskList.stream().map(subEntity -> DataAddTaskDTO.builder() .id(subEntity.getId()) - .templateId(entity.getId()) + .taskMapId(entity.getId()) .agentIp(subEntity.getAgentIp()) .status(subEntity.getStatus()).build())
.collect(Collectors.toList())); return source; } + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public Integer addDataAddTask(DataAddTaskRequest request, String operator) { + FileDataAddTaskRequest sourceRequest = (FileDataAddTaskRequest) request; + StreamSourceEntity sourceEntity = sourceMapper.selectById(request.getSourceId()); + // fail with a clear business error (outside the try, so it is not re-labelled as a serialize failure) + if (sourceEntity == null) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND, + String.format("source not found by id=%s", request.getSourceId())); + } + try { + List<StreamSourceEntity> dataAddTaskList = sourceMapper.selectByTaskMapId(sourceEntity.getId()); + int dataAddTaskSize = CollectionUtils.isNotEmpty(dataAddTaskList) ? dataAddTaskList.size() : 0; + FileSourceDTO dto = FileSourceDTO.getFromJson(sourceEntity.getExtParams()); + dto.setStartTime(sourceRequest.getStartTime()); + dto.setEndTime(sourceRequest.getEndTime()); + dto.setRetry(true); + StreamSourceEntity dataAddTaskEntity = + CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); + dataAddTaskEntity.setId(null); + dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1)); + dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto)); + dataAddTaskEntity.setTaskMapId(sourceEntity.getId()); + // the mapper's insert returns the affected row count, not the key; read the id from the entity + // NOTE(review): assumes the insert mapping fills the generated key into `id` - confirm in the mapper XML + sourceMapper.insert(dataAddTaskEntity); + return dataAddTaskEntity.getId(); + } catch (Exception e) { + LOGGER.error("serialize extParams of File SourceDTO failure: ", e); + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("serialize extParams of File SourceDTO failure: %s", e.getMessage())); + } + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java index 871700c5b3..2758c80ce8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java @@ -59,13 +59,13 @@ public class 
DataCleansingTask extends TimerTask implements InitializingBean { */ private static final int INITIAL_DELAY = 60; - @Value("${data.cleansing.enabled:false}") + @Value("${data.clean.enabled:false}") private Boolean enabled; - @Value("${data.cleansing.interval.seconds:1800}") + @Value("${data.clean.interval.seconds:1800}") private Integer interval; - @Value("${data.cleansing.before.days:10}") + @Value("${data.clean.before.days:10}") private Integer before; - @Value("${data.cleansing.batchSize:100}") + @Value("${data.clean.batchSize:100}") private Integer batchSize; @Autowired diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java index 9d3a83dff9..5e4b4e724e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java @@ -56,7 +56,7 @@ public class DeleteStreamSourceTask extends TimerTask implements InitializingBea @Value("${group.deleted.enabled:false}") private Boolean enabled; - @Value("${group.deleted.batchSize:100}") + @Value("${group.deleted.batch.size:100}") private Integer batchSize; @Value("${group.deleted.latest.hours:10}") private Integer latestHours; diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java index 70034803e8..41acbdfd86 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java @@ -141,7 +141,7 @@ class AgentServiceTest 
extends ServiceBaseTest { public void suspendSource(String groupId, String streamId) { List<StreamSource> sources = sourceService.listSource(groupId, streamId); sources.stream() - .filter(source -> source.getTemplateId() != null) + .filter(source -> source.getTaskMapId() != null) .forEach(source -> sourceService.stop(source.getId(), GLOBAL_OPERATOR)); groupMapper.updateStatus(groupId, GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR); streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR); @@ -153,7 +153,7 @@ class AgentServiceTest extends ServiceBaseTest { public void restartSource(String groupId, String streamId) { List<StreamSource> sources = sourceService.listSource(groupId, streamId); sources.stream() - .filter(source -> source.getTemplateId() != null) + .filter(source -> source.getTaskMapId() != null) .forEach(source -> sourceService.restart(source.getId(), GLOBAL_OPERATOR)); groupMapper.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR); streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.RESTARTED.getCode(), GLOBAL_OPERATOR); @@ -230,7 +230,7 @@ class AgentServiceTest extends ServiceBaseTest { agent.pullTask(); // report last success status final int sourceId = sourceService.listSource(groupStream.getLeft(), groupStream.getRight()).stream() - .filter(source -> source.getTemplateId() != null) + .filter(source -> source.getTaskMapId() != null) .findAny() .get() .getId(); @@ -256,7 +256,7 @@ class AgentServiceTest extends ServiceBaseTest { // update group to config success final String groupId = sourceService.listSource(groupStream.getLeft(), groupStream.getRight()).stream() - .filter(source -> source.getTemplateId() != null) + .filter(source -> source.getTaskMapId() != null) .findAny() .get() .getInlongGroupId(); diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql 
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index edd9abf88d..1a2d28ad2f 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -331,7 +331,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', - `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to', + `task_map_id` int(11) DEFAULT NULL COMMENT 'Id of the task this agent belongs to', `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', @@ -354,7 +354,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`), INDEX `source_status_index` (`status`, `is_deleted`), INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`), - INDEX `source_template_id_index` (`template_id`) + INDEX `source_task_map_id_index` (`task_map_id`) ); -- ---------------------------- diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 025b53190e..5451516614 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -351,7 +351,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source 
type, including: FILE, DB, etc', - `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to', + `task_map_id` int(11) DEFAULT NULL COMMENT 'Id of the task this agent belongs to', `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', @@ -374,7 +374,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`), INDEX `source_status_index` (`status`, `is_deleted`), INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`), - INDEX `source_template_id_index` (`template_id`) + INDEX `source_task_map_id_index` (`task_map_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table'; diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql b/inlong-manager/manager-web/sql/changes-1.12.0.sql index 82e8af66e5..a92011164f 100644 --- a/inlong-manager/manager-web/sql/changes-1.12.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.12.0.sql @@ -24,5 +24,9 @@ SET FOREIGN_KEY_CHECKS = 0; USE `apache_inlong_manager`; ALTER TABLE `stream_source` ADD COLUMN `data_time_zone` varchar(256) DEFAULT NULL COMMENT 'Data time zone'; +-- Rename the column first: the new index references `task_map_id`, which does not exist until the rename +ALTER TABLE `stream_source` CHANGE template_id task_map_id int(11) DEFAULT NULL COMMENT 'Id of the task this agent belongs to'; +DROP INDEX `source_template_id_index` ON `stream_source`; +CREATE INDEX source_task_map_id_index ON `stream_source` (`task_map_id`); diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index f0cbbd56da..8e7645f993 100644 --- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.validation.SaveValidation; import org.apache.inlong.manager.common.validation.UpdateValidation; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourcePageRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; @@ -118,4 +119,10 @@ public class StreamSourceController { sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/source/addDataAddTask", method = RequestMethod.POST) + @ApiOperation(value = "Add supplementary recording task for stream source") + public Response<Integer> addSub(@RequestBody DataAddTaskRequest request) { + return Response.success(sourceService.addDataAddTask(request, LoginUserUtils.getLoginUser().getName())); + } + } diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 0bcebdc06e..8a9032d5ec 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -73,15 +73,15 @@ audit.ck.username=default # ClickHouse password audit.ck.password= -# Database cleansing +# Database clean # If turned on, logically deleted data will be collected and permanently deleted periodically -data.cleansing.enabled=false +data.clean.enabled=false # The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 
hour) -data.cleansing.interval.seconds=1800 +data.clean.interval.seconds=1800 # Select the data whose latest modify time is some days before, default is 10 days -data.cleansing.before.days=10 +data.clean.before.days=10 # The maximum size of data to be deleted in batch, default is 100 -data.cleansing.batchSize=100 +data.clean.batchSize=100 # Whether to use ZooKeeper to manage the Sort task config, default is false, which means not using ZooKeeper sort.enable.zookeeper=false @@ -97,14 +97,19 @@ source.update.enabled=false source.update.before.seconds=60 source.update.interval=60 +# If turned on, regularly clear expired data add tasks +add.task.clean.enabled=false +add.task.clean.interval.seconds=10 +add.task.retention.days=7 + # If turned on, tasks in the incorrect state are periodically deleted -source.cleansing.enabled=false -source.cleansing.interval=600 +source.clean.enabled=false +source.clean.interval.seconds=600 # Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours group.deleted.latest.hours=10 # The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources. -group.deleted.batchSize=100 +group.deleted.batch.size=100 # If turned on, the groups could be deleted periodically. 
group.deleted.enabled=false diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index a9c55b39b3..835822bf84 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -72,15 +72,15 @@ audit.ck.username=default # ClickHouse password audit.ck.password= -# Database cleansing +# Database clean # If turned on, logically deleted data will be collected and permanently deleted periodically -data.cleansing.enabled=false +data.clean.enabled=false # The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour) -data.cleansing.interval.seconds=1800 +data.clean.interval.seconds=1800 # Select the data whose latest modify time is some days before, default is 10 days -data.cleansing.before.days=10 +data.clean.before.days=10 # The maximum size of data to be deleted in batch, default is 100 -data.cleansing.batchSize=100 +data.clean.batchSize=100 # Whether to use ZooKeeper to manage the Sort task config, default is false, which means not using ZooKeeper sort.enable.zookeeper=false @@ -96,14 +96,19 @@ source.update.enabled=false source.update.before.seconds=60 source.update.interval=60 +# If turned on, regularly clear expired data add tasks +add.task.clean.enabled=false +add.task.clean.interval.seconds=10 +add.task.retention.days=7 + # If turned on, tasks in the incorrect state are periodically deleted -source.cleansing.enabled=false -source.cleansing.interval=600 +source.clean.enabled=false +source.clean.interval.seconds=600 # Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours group.deleted.latest.hours=10 # The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources. 
-group.deleted.batchSize=100 +group.deleted.batch.size=100 # If turned on, the groups could be deleted periodically. group.deleted.enabled=false diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 0bcebdc06e..8a9032d5ec 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -73,15 +73,15 @@ audit.ck.username=default # ClickHouse password audit.ck.password= -# Database cleansing +# Database clean # If turned on, logically deleted data will be collected and permanently deleted periodically -data.cleansing.enabled=false +data.clean.enabled=false # The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour) -data.cleansing.interval.seconds=1800 +data.clean.interval.seconds=1800 # Select the data whose latest modify time is some days before, default is 10 days -data.cleansing.before.days=10 +data.clean.before.days=10 # The maximum size of data to be deleted in batch, default is 100 -data.cleansing.batchSize=100 +data.clean.batchSize=100 # Whether to use ZooKeeper to manage the Sort task config, default is false, which means not using ZooKeeper sort.enable.zookeeper=false @@ -97,14 +97,19 @@ source.update.enabled=false source.update.before.seconds=60 source.update.interval=60 +# If turned on, regularly clear expired data add tasks +add.task.clean.enabled=false +add.task.clean.interval.seconds=10 +add.task.retention.days=7 + # If turned on, tasks in the incorrect state are periodically deleted -source.cleansing.enabled=false -source.cleansing.interval=600 +source.clean.enabled=false +source.clean.interval.seconds=600 # Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours group.deleted.latest.hours=10 # The maximum size when querying 
InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources. -group.deleted.batchSize=100 +group.deleted.batch.size=100 # If turned on, the groups could be deleted periodically. group.deleted.enabled=false