healchow commented on code in PR #5837:
URL: https://github.com/apache/inlong/pull/5837#discussion_r966556356

##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java:
##########
@@ -90,4 +90,7 @@ public class SourceRequest {
     @ApiModelProperty("Other properties if needed")
     private Map<String, Object> properties = new LinkedHashMap<>();
 
+    @ApiModelProperty("Sub source information of existing agents")

Review Comment:
   Add the `JsonIgnore` annotation.

##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java:
##########
@@ -78,13 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
     @Transactional(rollbackFor = Throwable.class)
     public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
         StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
-        if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
-            if (request.getSourceType().equals(SourceType.AUTO_PUSH)) {
-                // auto push task needs not be issued to agent
-                entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
-            } else {
-                entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-            }
+        if (request.getSourceType().equals(SourceType.AUTO_PUSH)) {

Review Comment:
   Suggest changing this to `SourceType.AUTO_PUSH.equals(request.getSourceType())`.

##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java:
##########
@@ -99,15 +99,11 @@ protected void operateStreamSources(String groupId, String streamId, String oper
      */
     @SneakyThrows
     public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
-        // if a source has sub-sources, it is considered a template source.
-        // template sources do not need to be operated, its sub-sources will be processed in this method later.
-        if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
-            return false;
-        }
         for (int retry = 0; retry < 60; retry++) {
             int status = streamSource.getStatus();
             SourceStatus sourceStatus = SourceStatus.forCode(status);
-            if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN) {

Review Comment:
   Suggest adding a comment somewhere noting that the sub-sources need to be operated.

##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -215,15 +216,15 @@ private List<DataConfig> fetchFileTasks(TaskRequest taskRequest) {
         Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
                 "both agent ip and cluster name are blank when fetching file task");
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
-                Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10);
+                Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,UNLIMITED_FETCH_SIZE);

Review Comment:
   Suggest removing the limit in the SQL.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
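
Note on the `AUTO_PUSH` comparison in `AbstractSourceOperator.saveOpt`: the likely reason for putting the constant first is null safety. The sketch below is only an illustration, not code from the PR. It assumes the source type is a `String`, and `AUTO_PUSH`, `receiverFirst`, and `constantFirst` are hypothetical stand-ins for `SourceType.AUTO_PUSH` and the two call orders.

```java
class NullSafeEqualsSketch {

    // Hypothetical stand-in for SourceType.AUTO_PUSH (assumed here to be a String constant).
    static final String AUTO_PUSH = "AUTO_PUSH";

    // Order used in the diff: calls equals() on the request value,
    // so a null source type throws NullPointerException.
    static boolean receiverFirst(String sourceType) {
        return sourceType.equals(AUTO_PUSH);
    }

    // Suggested order: the constant is never null, so a null source type
    // simply compares as "not equal" and returns false.
    static boolean constantFirst(String sourceType) {
        return AUTO_PUSH.equals(sourceType);
    }

    // Helper for the demo: reports whether receiverFirst throws for the given input.
    static boolean receiverFirstThrows(String sourceType) {
        try {
            receiverFirst(sourceType);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(constantFirst("AUTO_PUSH"));
        System.out.println(constantFirst(null));
        System.out.println(receiverFirstThrows(null));
    }
}
```

Both orders give the same result for non-null input, so the change only affects requests that arrive without a source type.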