This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ab23b608e [INLONG-5836][Manager] Fix the source status related behavior problems (#5837) ab23b608e is described below commit ab23b608e8faa9c1127a3766972b3e8b63c5c056 Author: woofyzhao <490467...@qq.com> AuthorDate: Fri Sep 9 12:45:51 2022 +0800 [INLONG-5836][Manager] Fix the source status related behavior problems (#5837) --- .../inlong/manager/common/enums/SourceStatus.java | 54 +++++++++++----------- .../dao/mapper/StreamSourceEntityMapper.java | 2 +- .../resources/mappers/StreamSourceEntityMapper.xml | 2 +- .../inlong/manager/pojo/source/SourceRequest.java | 5 ++ .../service/core/impl/AgentServiceImpl.java | 6 +-- .../source/AbstractSourceOperateListener.java | 9 ++-- .../listener/source/SourceRestartListener.java | 6 +++ .../listener/source/SourceStopListener.java | 6 +++ .../service/source/AbstractSourceOperator.java | 24 ++++++---- .../service/source/StreamSourceServiceImpl.java | 7 ++- 10 files changed, 71 insertions(+), 50 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java index 1561af0ec..5d5182d41 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java @@ -104,33 +104,33 @@ public enum SourceStatus { SOURCE_STATE_AUTOMATON.put(SOURCE_FROZEN, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE)); // [xxx] bo be issued - HashSet<SourceStatus> tobeAdd = Sets.newHashSet(BEEN_ISSUED_ADD); - tobeAdd.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAdd); - HashSet<SourceStatus> tobeDelete = Sets.newHashSet(BEEN_ISSUED_DELETE); - tobeDelete.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDelete)); - HashSet<SourceStatus> tobeRetry = Sets.newHashSet(BEEN_ISSUED_RETRY); - tobeRetry.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetry)); - HashSet<SourceStatus> tobeBacktrack = Sets.newHashSet(BEEN_ISSUED_BACKTRACK); - tobeBacktrack.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrack)); - HashSet<SourceStatus> tobeFrozen = Sets.newHashSet(BEEN_ISSUED_FROZEN); - tobeFrozen.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozen)); - HashSet<SourceStatus> tobeActive = Sets.newHashSet(BEEN_ISSUED_ACTIVE); - tobeActive.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActive)); - HashSet<SourceStatus> tobeCheck = Sets.newHashSet(BEEN_ISSUED_CHECK); - tobeCheck.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheck)); - HashSet<SourceStatus> tobeRedoMetric = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC); - tobeRedoMetric.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetric)); - HashSet<SourceStatus> tobeMakeup = Sets.newHashSet(BEEN_ISSUED_MAKEUP); - tobeMakeup.addAll(TOBE_ISSUED_SET); - SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeup)); + HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE); + tobeAddNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext); + HashSet<SourceStatus> tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE); + tobeDeleteNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDeleteNext)); + HashSet<SourceStatus> tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY); + tobeRetryNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetryNext)); + HashSet<SourceStatus> tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK); + tobeBacktrackNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrackNext)); + HashSet<SourceStatus> tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_FROZEN); + tobeFrozenNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozenNext)); + HashSet<SourceStatus> tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE); + tobeActiveNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActiveNext)); + HashSet<SourceStatus> tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK); + tobeCheckNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheckNext)); + HashSet<SourceStatus> tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC); + tobeRedoMetricNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetricNext)); + HashSet<SourceStatus> tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP); + tobeMakeupNext.addAll(TOBE_ISSUED_SET); + SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeupNext)); // [xxx] been issued SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED)); diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index 3b0144d3d..8cc63e4e3 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -78,7 +78,7 @@ public interface StreamSourceEntityMapper { */ List<StreamSourceEntity> selectByAgentIpOrCluster(@Param("statusList") List<Integer> statusList, @Param("sourceTypeList") List<String> sourceTypeList, @Param("agentIp") String agentIp, - @Param("clusterName") String clusterName, @Param("limit") int limit); + @Param("clusterName") String clusterName); /** * Query the sources with status 20x by the given agent IP and agent UUID. diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index c1b955f57..07bb4e8d4 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -222,7 +222,6 @@ </foreach> </if> and (agent_ip = #{agentIp, jdbcType=VARCHAR} or inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}) - limit #{limit, jdbcType=INTEGER} </where> </select> <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> @@ -372,6 +371,7 @@ <if test="streamId != null"> and inlong_stream_id = #{streamId, jdbcType=VARCHAR} </if> + and source_type != 'AUTO_PUSH' </where> </update> <update id="updateIpAndUuid"> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java index 42735478e..46b7c338d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.pojo.source; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -90,4 +91,8 @@ public class SourceRequest { @ApiModelProperty("Other properties if needed") private Map<String, Object> properties = new LinkedHashMap<>(); + @JsonIgnore + @ApiModelProperty("Sub source information of existing agents") + private List<SubSourceDTO> subSourceList; + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 91b3f7238..5e56c385d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -215,15 +215,15 @@ public class AgentServiceImpl implements AgentService { Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName), "both agent ip and cluster name are blank when fetching file task"); List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList, - Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10); + Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName); List<DataConfig> fileTasks = Lists.newArrayList(); for (StreamSourceEntity sourceEntity : sourceEntities) { FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams()); final String destIp = sourceEntity.getAgentIp(); final String destClusterName = sourceEntity.getInlongClusterName(); - // Cluster name is blank - if (StringUtils.isNotBlank(destIp) && StringUtils.isBlank(destClusterName)) { + // Use ip directly if it is not empty + if (StringUtils.isNotBlank(destIp)) { if (destIp.equals(agentIp)) { int op = getOp(sourceEntity.getStatus()); int nextStatus = getNextStatus(sourceEntity.getStatus()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java index c07c86905..01eca1ba7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java @@ -99,15 +99,12 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList */ @SneakyThrows public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) { - // if a source has sub-sources, it is considered a template source. - // template sources do not need to be operated, its sub-sources will be processed in this method later. - if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) { - return false; - } for (int retry = 0; retry < 60; retry++) { int status = streamSource.getStatus(); SourceStatus sourceStatus = SourceStatus.forCode(status); - if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN) { + // template sources are filtered and processed in corresponding subclass listeners + if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN + || CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) { return true; } else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) { return false; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java index ec7170f48..54229273d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.source; +import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.pojo.source.SourceRequest; @@ -47,6 +48,11 @@ public class SourceRestartListener extends AbstractSourceOperateListener { @Override public void operateStreamSource(SourceRequest sourceRequest, String operator) { + // if a source has sub-sources, it is considered a template source. + // template sources do not need to be restarted, its sub-sources will be processed in this method later. + if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) { + return; + } streamSourceService.restart(sourceRequest.getId(), operator); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java index 0ccdbc5b3..7e2aa08f8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.listener.source; +import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.pojo.source.SourceRequest; @@ -47,6 +48,11 @@ public class SourceStopListener extends AbstractSourceOperateListener { @Override public void operateStreamSource(SourceRequest sourceRequest, String operator) { + // if a source has sub-sources, it is considered a template source. + // template sources do not need to be stopped, its sub-sources will be processed in this method later. + if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) { + return; + } streamSourceService.stop(sourceRequest.getId(), operator); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 096a57deb..0d4a3f0b3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -78,13 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { @Transactional(rollbackFor = Throwable.class) public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) { StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new); - if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { - if (request.getSourceType().equals(SourceType.AUTO_PUSH)) { - // auto push task needs not be issued to agent - entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode()); - } else { - entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode()); - } + if (SourceType.AUTO_PUSH.equals(request.getSourceType())) { + // auto push task needs not be issued to agent + entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode()); + } else if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { + entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode()); } else { entity.setStatus(SourceStatus.SOURCE_NEW.getCode()); } @@ -121,6 +119,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { public void updateOpt(SourceRequest request, Integer groupStatus, String operator) { StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId()); Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage()); + + if (SourceType.AUTO_PUSH.equals(entity.getSourceType())) { + LOGGER.warn("auto push source {} can not be updated", entity.getSourceName()); + return; + } if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) { throw new BusinessException(String.format("source=%s is not allowed to update, " + "please wait until its changed to final status or stop / frozen / delete it firstly", entity)); @@ -132,7 +135,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } - // Source type cannot be changed + // source type cannot be changed if (!Objects.equals(entity.getSourceType(), request.getSourceType())) { throw new BusinessException(String.format("source type=%s cannot change to %s", entity.getSourceType(), request.getSourceType())); @@ -150,11 +153,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { } } - // Setting updated parameters of stream source entity. + // setting updated parameters of stream source entity. setTargetEntity(request, entity); entity.setModifier(operator); - // Re-issue task if necessary + // re-issue task if necessary entity.setPreviousStatus(entity.getStatus()); if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) { entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode()); @@ -171,6 +174,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { break; } } + int rowCount = sourceMapper.updateByPrimaryKeySelective(entity); if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { LOGGER.warn(errMsg); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 7ef31f223..4bbc65b89 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -24,6 +24,7 @@ import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.SourceStatus; @@ -237,12 +238,14 @@ public class StreamSourceServiceImpl implements StreamSourceService { StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id); Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage()); + boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id)); SourceStatus curStatus = SourceStatus.forCode(entity.getStatus()); SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE; - // if source is frozen|failed|new , delete directly + // if source is frozen|failed|new, or if it is a template source or auto push source, delete directly if (curStatus == SourceStatus.SOURCE_FROZEN || curStatus == SourceStatus.SOURCE_FAILED - || curStatus == SourceStatus.SOURCE_NEW) { + || curStatus == SourceStatus.SOURCE_NEW || isTemplateSource + || SourceType.AUTO_PUSH.equals(entity.getSourceType())) { nextStatus = SourceStatus.SOURCE_DISABLE; } if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {